rabbitmq-amqp1.0-client's Introduction

Erlang AMQP 1.0 client

This repository has been moved to the main unified RabbitMQ "monorepo", including all open issues. You can find the source under /deps/amqp10_client. All issues have been transferred.

Overview

This is an Erlang client for the AMQP 1.0 protocol.

Its primary purpose is to be used in RabbitMQ-related projects, but it is a generic client that was tested with at least 4 implementations of AMQP 1.0.

If you are looking for an Erlang client for AMQP 0-9-1, a completely different protocol despite the name, consider this one.

Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100% feature complete but is moderately mature and was tested against at least three AMQP 1.0 servers: RabbitMQ, Azure ServiceBus, and ActiveMQ.

This client library is not officially supported by VMware at this time.

Usage

Connection Settings

The connection_config map contains various configuration properties.

-type address() :: inet:socket_address() | inet:hostname().

-type connection_config() ::
    #{container_id => binary(), % mandatory
      %% must provide a list of addresses or a single address
      addresses => [address()],
      address => address(),
      %% defaults to 5672, mandatory for TLS
      port => inet:port_number(),
      % the dns name of the target host
      % required by some vendors such as Azure ServiceBus
      hostname => binary(),
      tls_opts => {secure_port, [ssl:ssl_option()]}, % optional
      notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
      max_frame_size => non_neg_integer(), % incoming max frame size
      idle_time_out => non_neg_integer(), % heartbeat
      sasl => none | anon | {plain, User :: binary(), Password :: binary()},
      % set this to a negative value to allow a sender to "overshoot" the flow
      % control by this margin
      transfer_limit_margin => 0 | neg_integer()
     }.
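As a rough illustration of the type above, the following sketch builds a configuration map for a broker on localhost. The module name, container id, credentials and idle timeout value are placeholders chosen for this example, not values prescribed by the library.

```erlang
-module(conn_conf_example).
-export([plain_conf/0]).

%% Builds a connection_config() map for a broker on localhost.
%% All concrete values here are illustrative placeholders.
plain_conf() ->
    #{container_id => <<"example-container">>,    % mandatory
      address => "localhost",                     % single address form
      port => 5672,                               % the documented default
      sasl => {plain, <<"guest">>, <<"guest">>},
      notify => self(),                           % receive events in this process
      idle_time_out => 30000}.                    % heartbeat, value is arbitrary
```

The resulting map would be passed to amqp10_client:open_connection/1, as in the basic example further down.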

TLS

TLS is enabled by setting the tls_opts connection configuration property. Currently the only valid value is {secure_port, [ssl_option]}, where the port specified only accepts TLS. It is possible that TLS negotiation as described in the AMQP 1.0 protocol will be supported in the future. If no value is provided for tls_opts then a plain socket will be used.
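A minimal sketch of a TLS-enabled configuration follows. The host name, module name and CA file argument are hypothetical; the options inside the list are ordinary OTP ssl options, and which ones a given broker needs is an assumption to verify against that broker.

```erlang
-module(tls_conf_example).
-export([tls_conf/1]).

%% Builds a connection_config() map that uses a TLS-only port.
%% CaCertFile is the path to a PEM file with trusted CA certificates.
tls_conf(CaCertFile) ->
    Host = "broker.example.com",                  % hypothetical host
    #{container_id => <<"example-container">>,
      address => Host,
      hostname => list_to_binary(Host),           % some vendors require this
      port => 5671,                               % port must be given for TLS
      tls_opts => {secure_port,
                   [{verify, verify_peer},
                    {cacertfile, CaCertFile},
                    {server_name_indication, Host}]},
      sasl => anon}.
```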

Basic Example

%% this will connect to a localhost node
{ok, Hostname} = inet:gethostname(),
%% 5672 is the default port
Port = 5672,
User = <<"guest">>,
Password = <<"guest">>,
%% create a configuration map
OpnConf = #{address => Hostname,
            port => Port,
            container_id => <<"test-container">>,
            sasl => {plain, User, Password}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session, SenderLinkName, <<"a-queue-maybe">>),

%% wait for credit to be received
receive
    {amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
      exit(credited_timeout)
end,

%% create a new message using a delivery-tag, body and indicate
%% its settlement status (true meaning no disposition confirmation
%% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),
ok = amqp10_client:detach_link(Sender),

%% create a receiver link
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"a-queue-maybe">>),

%% grant some credit to the remote sender but don't auto-renew it
ok = amqp10_client:flow_link_credit(Receiver, 5, never),

%% wait for a delivery
receive
    {amqp10_msg, Receiver, InMsg} -> ok
after 2000 ->
      exit(delivery_timeout)
end,

ok = amqp10_client:close_connection(Connection).

Events

The amqp10_client API is mostly asynchronous with respect to the AMQP 1.0 protocol. Functions such as amqp10_client:open_connection typically return after the Open frame has been successfully written to the socket rather than waiting until the remote end replies with its own Open frame. The client notifies the caller of various internal/async events using amqp10_event messages. In the example above, when the remote replies with its Open frame, a message of the following form is sent:

{amqp10_event, {connection, ConnectionPid, opened}}

When the connection is closed an event is issued as such:

{amqp10_event, {connection, ConnectionPid, {closed, Why}}}

Why could be normal or contain a description of an error that occurred and resulted in the closure of the connection.

Likewise sessions and links have similar events using a similar format.

%% success events
{amqp10_event, {connection, ConnectionPid, opened}}
{amqp10_event, {session, SessionPid, begun}}
{amqp10_event, {link, LinkRef, attached}}
%% error events
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
{amqp10_event, {session, SessionPid, {ended, Why}}}
{amqp10_event, {link, LinkRef, {detached, Why}}}
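Because open_connection can return before the peer has answered, a caller may want to block until the connection is confirmed. The helper below is one possible sketch using only the event shapes listed above; the module and function names are made up for this example and are not part of the client.

```erlang
-module(await_example).
-export([await_connection/2]).

%% Blocks until the connection reports `opened`, reports `closed`,
%% or Timeout milliseconds pass without either event arriving.
await_connection(ConnectionPid, Timeout) ->
    receive
        {amqp10_event, {connection, ConnectionPid, opened}} ->
            ok;
        {amqp10_event, {connection, ConnectionPid, {closed, Why}}} ->
            {error, {closed, Why}}
    after Timeout ->
        {error, timeout}
    end.
```

The same pattern would apply to session (begun / ended) and link (attached / detached) events. It only works in the process that receives the notifications, which is the one named in the notify setting.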

In addition the client may notify the initiator of certain protocol events such as a receiver running out of credit or credit being available to a sender.

%% no more credit available to sender
{amqp10_event, {link, Sender, credit_exhausted}}
%% sender credit received
{amqp10_event, {link, Sender, credited}}

Other events may be declared as necessary. Hence it makes sense for a user of the client to handle all {amqp10_event, _} events to ensure unexpected messages aren't kept around in the mailbox.
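One way to follow this advice in a plain process is to drain every pending amqp10_event message at a convenient point. This is a sketch only; the module and function names are hypothetical, and in a gen_server a catch-all handle_info clause matching {amqp10_event, _} would serve the same purpose.

```erlang
-module(event_drain_example).
-export([drain_events/0]).

%% Removes every pending {amqp10_event, _} message from the caller's
%% mailbox and returns the event payloads in arrival order.
%% Messages of any other shape are left in the mailbox.
drain_events() ->
    drain_events([]).

drain_events(Acc) ->
    receive
        {amqp10_event, Event} ->
            drain_events([Event | Acc])
    after 0 ->
        %% nothing further is waiting right now
        lists:reverse(Acc)
    end.
```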

rabbitmq-amqp1.0-client's People

Contributors

acogoluegnes, antoinegagne, bougueil, dcorbacho, dumbbell, gerhard, hommeabeil, kjnilsson, lukebakken, michaelklishin, patrickdet, pjk25, sircinek, spring-operator, tstorck

rabbitmq-amqp1.0-client's Issues

Continuation frames without delivery_id are not handled correctly.

Hi,

I am using this rabbitmq-amqp1.0-client in an elixir project to read AMQP messages from an Azure Event Hub.
Yesterday a broken message appeared in the queue and the client crashed while handling it. The message looks something like this: "":null,"CorrelationId":null,..." and the client logs the warning "Unhandled session frame {{:"v1_0.transfer", {:uint, 0}, :undefined, :undefined, :undefined, :undefined, false, :undefined, :undefined, :undefined, :undefined, :undefined},", followed by the frame and "in state {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>,".
Afterwards it terminates with "** State machine #PID<0.5511.0> terminating
** Last event in was {{:"v1_0.transfer", {:uint, 0}, {:uint, 30586}, {:binary, ""}, {:uint, 0}, true,
false, :undefined, :undefined, :undefined, :undefined, true},
<<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113,
117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 129, 0, 0, 0, 0, 0,
12, 20, 56, 163, 12, 120, 45, 111, 112, 116, 45, 111, 102, 102, ...>>}
** When State == :mapped
** Data == {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>,
{:ssl,
{:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined},
#PID<0.5503.0>}},
%{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0},
:attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined,
{:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false,
{{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0},
true, true, :undefined, :undefined, :undefined, :undefined, true}," followed by the frame again and something like this: "<<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101,
113, 117, 101, 110, ...>>]}, :never}}, %{}, %{0 => 0}, 1, [],
%{address: 'xxx', container_id: "xxx",
hostname: "xxx", notify: #PID<0.5499.0>,
outgoing_max_frame_size: 65536, port: 5671,
sasl: {:plain, "xxx", "xxx"},
tls_opts: {:secure_port, []}, transfer_limit_margin: 0}, %{}, %{},
#PID<0.5499.0>}
** Reason for termination =
** {:bad_return_value, {:primitive_type_unsupported, 176}}"

I did not find a way to react to this error from within my wrapping code.

Could you please help me figure out what exactly is going wrong and what to do about it? Thanks a lot in advance!

Consider making sasl=plain implicit in URI

When a username and password are present in the AMQP URI we could make the requirement for the query parameter "sasl=plain" implicit, i.e. whenever there is user info present we assume sasl=plain. Then if we ever support other SASL mechanisms they would need an explicit sasl parameter.
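The proposal could be sketched roughly as below, using OTP's uri_string module rather than the client's own URI parser. The module and function names are hypothetical, the sketch ignores any explicit sasl query parameter, and it does not percent-decode the user info.

```erlang
-module(uri_sasl_example).
-export([infer_sasl/1]).

%% Returns {plain, User, Password} when the URI carries "user:password"
%% user info, and anon otherwise. Parse errors are passed through.
infer_sasl(Uri) ->
    case uri_string:parse(Uri) of
        #{userinfo := UserInfo} ->
            %% normalise to a binary whether Uri was a string or a binary
            case binary:split(unicode:characters_to_binary(UserInfo), <<":">>) of
                [User, Password] -> {plain, User, Password};
                [_UserOnly] -> anon
            end;
        #{} ->
            anon;
        {error, _Reason, _Term} = Err ->
            Err
    end.
```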

How to set the node properties?

Hello. I use the client to work with ActiveMQ.
But I can't set the node properties so that a queue is deleted when no consumer link is attached.
As I understand it, I should set "lifetime-policy" in the "dynamic-node-properties" field of the source when attaching a receiver.
But I can't work out how to do it.
Please help me. How do I set it correctly?

consumer

Is the AMQP 0.9 consumer equivalent to the receiver here?

Migrate to gen_statem

gen_fsm is deprecated - it should be migrated to gen_statem. All modules that currently use gen_fsm have to be migrated at once before we can re-enable all warnings in this repo.
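For orientation, a gen_statem module in state_functions mode looks roughly like the sketch below. The state names closed and opened and the open/1 call are invented for illustration and do not mirror the client's actual state machines.

```erlang
-module(statem_example).
-behaviour(gen_statem).

-export([start_link/0, open/1]).
-export([init/1, callback_mode/0, closed/3, opened/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

%% Synchronous request, comparable to gen_fsm:sync_send_event/2.
open(Pid) ->
    gen_statem:call(Pid, open).

%% One callback function per state, the style closest to gen_fsm.
callback_mode() ->
    state_functions.

init([]) ->
    {ok, closed, #{}}.

closed({call, From}, open, Data) ->
    {next_state, opened, Data, [{reply, From, ok}]}.

opened({call, From}, open, Data) ->
    {keep_state, Data, [{reply, From, {error, already_open}}]}.
```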

Connecting to Azure Service Bus

Hi. Firstly, I'm very new to Erlang in general but keen to learn. I know Azure pretty well but AMQP is new to me. I have been working on this for a couple of days and learned a lot.

I have created a new Erlang application and used your test code to try to push a message and receive it. I am now able to connect and authenticate but, no matter what I do, I cannot get a message posted: I get no error but I also see no message on the bus. If I try from Python I can do it.

In addition, the receive section below throws a "no match of right hand side value {error,timeout}" and I can't get past that ... but I'd like to just get a message in to start with.

The code in the README didn't work so I tried the test code. Given it is from May, it's not that old, so I'd have thought it should be OK.

Anyone have any ideas? Is this my code error, a specific config i need to add or an issue with the library?

Appreciate all the work - and any advice.

%%%-------------------------------------------------------------------
%% @doc myapp public API
%% @end
%%%-------------------------------------------------------------------

-module(myapp_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1, connect/0]).

%%====================================================================
%% API
%%====================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application is started using
%% application:start/[1,2], and should start the processes of the
%% application. If the application is structured according to the OTP
%% design principles as a supervision tree, this means starting the
%% top supervisor of the tree.
%%
%% @spec start(StartType, StartArgs) -> {ok, Pid} |
%%                                      {ok, Pid, State} |
%%                                      {error, Reason}
%%      StartType = normal | {takeover, Node} | {failover, Node}
%%      StartArgs = term()
%% @end
%%--------------------------------------------------------------------
start(_StartType, _StartArgs) ->
  io:format("starting~n"),
  application:start(ssl),
  application:ensure_all_started(amqp10_client),
  myapp_sup:start_link().

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application has stopped. It
%% is intended to be the opposite of Module:start/2 and should do
%% any necessary cleaning up. The return value is ignored.
%%
%% @spec stop(State) -> void()
%% @end
%%--------------------------------------------------------------------
stop(_State) ->
  ok.

connect() ->
  io:format("Connecting~n"),
  basic_roundtrip_service_bus(),
  ok.

basic_roundtrip_service_bus() ->
  Hostname = "myazurebroker.servicebus.windows.net",
  Port = 5671,
  User = "RootManageSharedAccessKey",
  Password = "XXXXXXXXXXXXXXXXXXXXXX=",
  OpnConf = #{
    address => Hostname,
    hostname => to_bin(Hostname),
    port => Port,
    notify => self(),
    tls_opts => {secure_port, [{versions, ['tlsv1.1']}]},
    container_id =>  <<"test">>,
    sasl => {plain, to_bin(User), to_bin(Password)}
  },
  roundtrip(OpnConf).

roundtrip(OpenConf) ->
  {ok, Connection} = amqp10_client:open_connection(OpenConf),
  {ok, Session} = amqp10_client:begin_session(Connection),
  {ok, Sender} = amqp10_client:attach_sender_link(Session,
    <<"test-sender">>,
    <<"test">>,
    settled),
  await_link(Sender, credited, link_credit_timeout),

  Now = os:system_time(millisecond),
  Props = #{creation_time => Now},
  Msg0 =  amqp10_msg:set_properties(Props,
    amqp10_msg:new(<<"my-tag">>, <<"banana">>,
      true)),
  Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0),
  Msg = amqp10_msg:set_message_annotations(#{<<"x_key">> => "x_value"}, Msg1),
  % RabbitMQ AMQP 1.0 does not yet support delivery annotations
  % Msg = amqp10_msg:set_delivery_annotations(#{<<"x_key">> => "x_value"}, Msg2),
  ok = amqp10_client:send_msg(Sender, Msg),
  ok = amqp10_client:detach_link(Sender),
  await_link(Sender, {detached, normal}, link_detach_timeout),

  {error, link_not_found} = amqp10_client:detach_link(Sender),
  {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
    <<"banana-receiver">>,
    <<"test">>,
    settled),
  {ok, OutMsg} = amqp10_client:get_msg(Receiver),
  ok = amqp10_client:end_session(Session),
  ok = amqp10_client:close_connection(Connection),
  #{creation_time := Now} = amqp10_msg:properties(OutMsg),
  #{<<"a_key">> := <<"a_value">>} = amqp10_msg:application_properties(OutMsg),
  #{<<"x_key">> := <<"x_value">>} = amqp10_msg:message_annotations(OutMsg),
  ok.


%%====================================================================
%% Internal functions
%====================================================================

await_link(Who, What, Err) ->
  receive
    {amqp10_event, {link, Who, What}} ->
      ok;
    {amqp10_event, {link, Who, {detached, Why}}} ->
      exit(Why)
  after 5000 -> exit(Err)
  end.

to_bin(X) when is_list(X) ->
  list_to_binary(X).

xxxx

Consider defaulting hostname to address.

Azure ServiceBus requires the hostname property to be set in the connection configuration. Other AMQP 1.0 messaging systems may not require this but we should consider setting hostname to address whenever it is not supplied by the user.
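A caller-side version of this idea might look like the following sketch. The module and function names are hypothetical. It only handles addresses given as strings and leaves IP tuples, atoms and address lists untouched, which is a simplification for this example.

```erlang
-module(hostname_default_example).
-export([default_hostname/1]).

%% Copies a string address into hostname when hostname is missing.
default_hostname(#{hostname := _} = Conf) ->
    %% the user supplied a hostname, keep it
    Conf;
default_hostname(#{address := Address} = Conf) when is_list(Address) ->
    Conf#{hostname => list_to_binary(Address)};
default_hostname(Conf) ->
    %% no usable address (for example an IP tuple), leave unchanged
    Conf.
```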

amqp10_binary_generator not exist

exception exit: {function_clause,
[{amqp10_binary_generator,generate,"a",
[{file,"src/amqp10_binary_generator.erl"},
{line,86}]},

Error when processing atoms

As reported here:

{function_clause,
    [{amqp10_msg,wrap_ap_value,
         [false],
         [{file,"src/amqp10_msg.erl"},{line,414}]},
     {amqp10_msg,'-set_application_properties/2-fun-0-',3,
         [{file,"src/amqp10_msg.erl"},{line,375}]},
     {maps,fold_1,3,[{file,"maps.erl"},{line,232}]},
     {amqp10_msg,set_application_properties,2,
         [{file,"src/amqp10_msg.erl"},{line,374}]},
     {maps,fold_1,3,[{file,"maps.erl"},{line,232}]},
     {rabbit_amqp10_shovel,forward,4,
         [{file,"src/rabbit_amqp10_shovel.erl"},{line,324}]},
     {rabbit_shovel_worker,handle_info,2,
         [{file,"src/rabbit_shovel_worker.erl"},{line,101}]},
     {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]}]}

This code needs to take atoms into account.
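Until the library handles atoms itself, a caller could convert atom values before passing the map to amqp10_msg:set_application_properties/2. This is only a workaround sketch with hypothetical names. It turns every atom, including true and false, into a binary, whereas a fix inside the library might prefer to map booleans to an AMQP boolean.

```erlang
-module(ap_atoms_example).
-export([atoms_to_binaries/1]).

%% Replaces atom values in an application-properties map with their
%% UTF-8 binary names; all other values are returned unchanged.
atoms_to_binaries(Props) ->
    maps:map(fun(_Key, Value) when is_atom(Value) ->
                     atom_to_binary(Value, utf8);
                (_Key, Value) ->
                     Value
             end, Props).
```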
