Git Product home page Git Product logo

kyu's Introduction

kyu - Simplified Erlang AMQP client

kyu is an AMQP client application that provides a simplified abstraction layer above the official amqp_client. This of course also means that the module isn't suited to handle every possible use case (check out the features section below).

kyu is heavily inspired by the great turtle application. In some areas it provides more functionality over turtle, in others it lacks. The one feature that it definitely lacks is built in rpc support. If you don't want to implement that yourself I suggest taking a look at turtle.

Features

kyu...

  • Can setup multiple AMQP connections and also maintains them (if a connection is lost it will try to reconnect based on the connection's configuration). If the connection is down you will receive errors when making AMQP dependent calls.
  • Uses names for connections, publishers, and consumers, this way you can access them from anywhere in your application. The name can be any Erlang term (except pid()s), but has to be unique per handler type (a connection can have the same name as a publisher but two connections with the same name will conflict).
  • Provides an easy way to setup publishers or publisher pools that can be used to send messages from anywhere in your application.
  • Supports a special publish execution mode that handles returned messages from the AMQP channel and replies with an error to the caller (can be used to make sure the published message was successfully routed to at least one queue).
  • Provides an easy way to setup consumers. Messages are handled by a stateful callback module defined in the consumer's configuration.
  • Provides an interface to communicate with the RabbitMQ management HTTP API.
  • Adds special declaration commands using the RabbitMQ management HTTP API, that can be used to setup publishers and consumers.

Build

kyu can be built using rebar3:

rebar3 compile

Installation

Simply add kyu to your rebar3 dependecies and to the applications list (for example in <yourapp>.app.src).

%% rebar.config
{deps, [
    kyu,
    %% or
    {kyu, "~> 2.0"},
    %% or
    {kyu, {git, "git://github.com/bajankristof/kyu.git"}}
]}
%% <yourapp>.app.src
{application, yourapp, [
    {mod, {yourapp_app, []}},
    {applications, [
        kernel,
        stdlib,
        kyu
    ]}
]}.

Usage

One way to use the application is by providing connection configurations in environment variables (for example in your sys.config or sys.config.src file). This will use kyu's supervisor to start the connections.

Another way is to start your application and then attach connections to your supervision tree using kyu_connection:child_spec/1 (check out the docs).

%% example sys.config
[
    {kyu, [
        {connections, [#{
            name => <<"rabbitmq_cluster">>, %% required
            url => "amqp://user:[email protected]", %% optional
            host => "rabbitmq.cluster", %% optional - default: "localhost"
            port => 5672, %% optional - default: 5672
            username => <<"user">>, %% optional - default: <<"guest">>
            password => <<"password">>, %% optional - default: <<"guest">>
            retry_delay => 5000, %% (ms) optional - default: 1000
            retry_attempts => 99, %% optional - default: infinity
            management_host => "rabbitmq.management", %% optional - default: connection host
            management_port => 443 %% optional - default: 15672
        }]}
    ]}
].

If the retry_attempts option is 0 or below the server will try to connect infinitely.

For the full set of configuration options check out the docs.

Publishers

-include_lib("kyu/include/amqp.hrl"). %% amqp commands

kyu_publisher:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_publisher">>, %% required
    confirms => true, %% optional - default: true
    commands => [ %% optional
        #'exchange.declare'{
            exchange = <<"my_exchange">>,
            type = <<"topic">>,
            durable = false
        }
    ]
}).

After introducing the returned child spec to your supervision tree, you can start publishing messages.

Publisher = <<"my_publisher">>,
Message = #{
    routing_key => <<"my.routing.key">>,
    exchange => <<"my_exchange">>,
    payload => <<"hello world">>,
    execution => async %% check out the explanation below
},
kyu:publish(Publisher, Message).

This will try to publish the provided message.

For a full set of possible message properties, check out the docs.

Execution

This setting will tell the publisher how to act on messages:

  • sync (default): If the publisher is in confirm mode, it will make the caller wait for the AMQP server to confirm the publication. This is only synchronous from the perspective of the caller! Other messages can still be published at the same time.
  • async: The publisher will ignore confirmation or returned messages.
  • supervised: The publisher will make the caller wait for either the AMQP server to confirm or return the message and reply with a value based on what was sent back by the server (for example {error, <<"NO_ROUTE">>}. For this to work the publisher must be in confirm mode and the mandatory flag has to be set to true on the message. To be able to identify a returned message, the publisher overrides the message_id property with a custom value! This is only synchronous from the perspective of the caller! Other messages can still be published at the same time.

Consumers

-include_lib("amqp.hrl").
-include_lib("kyu.hrl").

kyu_consumer:child_spec(#{
    connection => <<"rabbitmq_cluster">>, %% required
    name => <<"my_consumer">>, %% required
    queue => <<"queue_to_consume">>, %% required
    module => my_worker, %% required
    args => #{}, %% optional - default: undefined
    prefetch_count => 2, %% required - will start a channel with global prefetch count of 2, and 2 workers with local prefetch count of 1
    commands => [ %% optional
        #'queue.declare'{queue = <<"queue_to_consume">>},
        #'kyu.queue.bind'{ %% this is one of the special commands introduced by kyu
            routing_key = <<"my.routing.key">>,
            exchange = <<"my_exchange">>,
            queue = <<"queue_to_consume">>,
            exclusive = true
        }
    ]
}).

After introducing the consumer to your supervision tree, it will start to consume messages from the provided queue and make calls to the specified worker module.

-module(my_worker).

-behaviour(kyu_worker).

-exports([
    init/1,
    handle_message/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2
]).

%% this callback is optional and works much like gen_server:init/1
-spec init(Args :: term()) -> {ok, State :: term()} | {stop, Reason :: term()}.
init(Args) ->
    %% initialization
    {ok, Args}.

-spec handle_message(Message :: kyu:message(), State :: term()) ->
    {ack, NewState :: term()}
    | {reject, NewState :: term()}
    | {remove, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_message(Message, State) ->
    %% consumption
    {ack, State}.

%% this callback is optional and works like gen_server:handle_call/3
-spec handle_call(Request :: term(), From :: gen_server:from(), State :: term()) ->
    {reply, Reply :: term(), NewState :: term()}
    | {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_call(_Request, _From, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_cast/2
-spec handle_cast(Request :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_cast(_Request, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:handle_info/2
-spec handle_info(Info :: term(), State :: term()) ->
    {noreply, NewState :: term()}
    | {stop, Reason :: term(), NewState :: term()}.
handle_info(_Info, State) ->
    {noreply, State}.

%% this callback is optional and works like gen_server:terminate/2
-spec terminate(Reason :: term(), State :: term()) -> term().
terminate(_Reason, _State) -> ok.
  • ack - (as you would expect) acks the message
  • reject - rejects and requeues the message
  • remove - rejects and removes the message from the queue
  • stop - crashes the worker process

Commands

kyu supports commands in publisher and consumer configurations. This allows you to declare exchanges, queues and bindings before publishing or consuming.

AMQP commands

The out of the box supported AMQP commands are:

  • exchange.declare
  • exchange.delete
  • exchange.bind
  • exchange.unbind
  • queue.declare
  • queue.bind
  • queue.purge
  • queue.delete
  • queue.unbind

Use these in a module:

-include_lib("kyu/include/amqp.hrl").

Kyu commands

There are two custom commands at the moment:

%% extends the standard 'queue.bind' command
#'kyu.queue.bind'{
    routing_key :: binary(),
    exchange :: binary(),
    queue :: binary(),
    arguments :: list(),

    exclusive :: boolean()
    %% when set to false the command is equivalent to 
    %% a standard 'queue.bind' command

    %% when set to true it will unbind any other routing key
    %% with the same arguments (above)
}
%% provides an alternative to the standard 'queue.unbind' command
#'kyu.queue.unbind'{
    except = <<>> :: binary(), %% an exception routing key
    pattern = <<>> :: binary(), %% a regex pattern
    %% if a routing key bound to the queue
    %% matches the pattern and the arguments (below)
    %% it will be unbound
    %% (except and pattern may not be used together)

    exchange :: binary(),
    queue :: binary(),
    arguments :: list()
}

Use these in a module:

-include_lib("kyu/include/kyu.hrl").

Documentation

Read the full documentation here.

kyu's People

Contributors

bajankristof avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.