rhea's Introduction

rhea

A reactive library for the AMQP protocol, for easy development of both clients and servers.

Hello World!

Brief example of sending and receiving a message through a broker/server listening on port 5672:

var container = require('rhea');
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.once('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
});
var connection = container.connect({'port':5672});
connection.open_receiver('examples');
connection.open_sender('examples');

output:

Hello World!

Dependencies

  • debug (For simple debug logging - may be replaced in the near term. To enable set e.g. DEBUG=rhea* or DEBUG=rhea:events for more qualified debugging)

Examples

There are some examples of using the library under the examples folder. These include:

  • helloworld.js - essentially the code above, which sends and receives a single message through a broker

  • direct_helloworld.js - an example showing the sending of a single message without the use of a broker, by listening on a port and then opening a connection to itself over which the message is transferred.

  • send_raw.js - explicitly set the data section of the message body

  • simple_send.js - connects to a specified port then sends a number of messages to a given address

  • simple_recv.js - connects to a specified port then subscribes to receive a number of messages from a given address

These last two can be used together to demonstrate sending messages from one process to another, using a broker or similar intermediary to which they both connect.

  • direct_recv.js - listens on a given port for incoming connections over which it will then receive a number of messages

The direct_recv.js example can be used in conjunction with the simple_send.js example to demonstrate sending messages between processes without the use of any intermediary. Note however that the default port of one or the other will need to be changed through the -p command line option.

  • client.js and server.js

    • A request-response example where the 'client' sends messages to a 'server' (or service) which converts them to upper case and sends them back. This demonstrates the use of temporary addresses among other things. Using these two together requires a broker or similar intermediary.
  • In durable_subscription, a subscriber and a publisher which demonstrate the notion of a durable subscription when used in conjunction with a broker such as ActiveMQ

  • In selector a receiver that uses a selector - a SQL like query string that restricts the set of messages delivered - and an accompanying sender

  • In sasl a sasl client showing how to authenticate to the service you connect to. This can be used against any broker as well as either of two example servers showing anonymous and plain mechanisms.

  • A tls client and server demonstrating connecting (and possibly authenticating) over a tls secured socket.

  • A client to demonstrate the built in automatic reconnection functionality along with a simple echo server against which it can be run. It can of course also be run against a broker instead (or as well!).

  • Both node based and web based websocket clients along with a server which will echo back any requests received. The clients can also be used against a websocket enabled AMQP broker with a queue or topic called 'examples'. The node based scripts require the 'ws' node module to be installed. The browser based example requires a browserified version of the rhea library (this can be created e.g. by calling npm run-script browserify or make browserify). The browserified and minimized javascript library is stored under the dist/ directory.

To run the examples you will need the dependencies installed: the library itself depends on the 'debug' module, and some of the examples depend on the 'yargs' module for command line option parsing.

The 'rhea' module itself must also be findable by node. You can do this either by checking out the code from git and setting NODE_PATH to include the directory to which you do so (i.e. the directory in which a directory named 'rhea' can be found), or by installing the module using npm.

Some of the examples assume an AMQP compatible broker, such as those offered by the ActiveMQ or Qpid Apache projects, is running.

API

There are five core types of object in the API:

  • Container
  • Connection
  • Session
  • Receiver
  • Sender

Each of these inherits all the methods of EventEmitter, allowing handlers for particular events to be attached. Events that are not handled at sender or receiver scope are then propagated up to possibly be handled at session scope. Events that are not handled at session scope are then propagated up to possibly be handled at connection scope, and if not there then in container scope.
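The scoping described above can be sketched as follows. This is only an illustration: attach_scoped_handlers and the log array are hypothetical names, container is assumed to be the object returned by require('rhea'), and receiver is assumed to come from connection.open_receiver().

```javascript
// Hypothetical helper: one specific handler at link scope and one
// catch-all handler at container scope.
function attach_scoped_handlers(container, receiver, log) {
    // Handles 'message' for this receiver only. Because the event is
    // handled here, it is not propagated further up.
    receiver.on('message', function (context) {
        log.push('receiver: ' + context.message.body);
    });
    // Sees 'message' events from links that have no handler of their own
    // (and none at session or connection scope either).
    container.on('message', function (context) {
        log.push('container: ' + context.message.body);
    });
}
```

Registering the catch-all on the container keeps small programs short, while a handler on an individual link lets that link be treated differently.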

Two other relevant objects are:

  • Message
  • Delivery


Container

An AMQP container from which outgoing connections can be made and/or to which incoming connections can be accepted. The module exports a default instance of a Container which can be used directly. Other instances can be created from that if needed using the create_container method. A container is identified by the id property. By default a uuid is used, but the property can be set to something more specific if desired before making or accepting any connections.

methods:

connect(options)

Connects to the server specified by the host and port supplied in the options and returns a Connection.

The options argument is an object that may contain node library socket.connect and tls.connect options and any of the following fields:

  • host - socket.connect option, defaults to localhost
  • port - socket.connect option, defaults to 5672
  • transport - undefined, 'tcp' or 'tls', determines if socket.connect or tls.connect options are accepted
  • username
  • password
  • sasl_init_hostname
  • reconnect
    • if true (the default), the library will automatically attempt to reconnect if disconnected
    • if false, automatic reconnect will be disabled
    • if it is a numeric value, it is interpreted as the delay between reconnect attempts (in milliseconds)

    When enabled, reconnect can be further controlled via the following options:

    • initial_reconnect_delay (in milliseconds)
    • max_reconnect_delay (in milliseconds)
    • reconnect_limit (maximum number of reconnect attempts)
  • connection_details - a function which if specified will be invoked to get the options to use (e.g. this can be used to alternate between a set of different host/port combinations)

As well as Container options common for both client and server:

  • id - connection name
  • container_id - (overrides the container identifier)
  • hostname - to present to remote in the open frame (defaults to host)
  • max_frame_size
  • channel_max
  • idle_time_out
  • outgoing_locales - in open frame
  • incoming_locales - in open frame
  • offered_capabilities - in open frame
  • desired_capabilities - in open frame
  • properties - in open frame
  • sender_options - defaults for open_sender
  • receiver_options - defaults for open_receiver
  • non_fatal_errors - an array of error conditions which if received on connection close from peer should not prevent reconnect (by default this only includes amqp:connection:forced)
  • all_errors_non_fatal - a boolean which determines if rhea's auto-reconnect should attempt reconnection on all fatal errors
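
As a rough sketch of how the reconnect fields and connection_details might be combined, the helper below builds an options object that rotates through a list of endpoints on each connection attempt. The name make_failover_options, the two endpoints and the specific delay and limit values are assumptions chosen for illustration.

```javascript
// Hypothetical helper: builds options for container.connect() that rotate
// through a list of endpoints on each (re)connect attempt.
function make_failover_options(endpoints) {
    var attempt = 0;
    return {
        reconnect: true,                // the default, shown for clarity
        initial_reconnect_delay: 100,   // milliseconds (illustrative value)
        max_reconnect_delay: 10000,     // milliseconds (illustrative value)
        reconnect_limit: 20,            // illustrative value
        // Invoked to obtain the options to use for a connection attempt.
        connection_details: function () {
            var endpoint = endpoints[attempt++ % endpoints.length];
            return { host: endpoint.host, port: endpoint.port };
        }
    };
}

var options = make_failover_options([
    { host: 'localhost', port: 5672 },
    { host: 'localhost', port: 5673 }
]);
```

The resulting object would then be passed to connect, for example require('rhea').connect(options).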

If options is undefined, the client will attempt to obtain default options from a JSON config file. This file is of similar structure to that used by Apache Qpid Proton clients. The location of the file can be specified through the MESSAGING_CONNECT_FILE environment variable. If that is not specified it will look for a file called connect.json in the current directory, in /.config/messaging or /etc/messaging/.

The config file offers only limited configurability, specifically:

  • scheme
  • host
  • port
  • user - (note not username)
  • password
  • sasl - (a nested object with field enabled)
  • sasl_mechanisms
  • tls - (a nested object with fields key, cert, ca for paths to corresponding files)
  • verify

listen(options)

Starts a server socket listening for incoming connections on the port (and optionally interface) specified in the options.

The options argument is an object that may contain node library net.createServer and its server.listen or tls.createServer and its server.listen options, most AMQP Container fields listed for connect and any of the following fields:

  • transport - undefined, 'tcp' or 'tls', determines if net.createServer or tls.createServer options are accepted
  • host
  • port

create_container()

Returns a new container instance. The method takes an options object which can contain the following field:

  • id

If no id is specified a new uuid will be generated.

generate_uuid()

Simple utility for generating a stringified uuid, useful if you wish to specify distinct container ids for different connections.
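
A minimal sketch combining create_container and generate_uuid follows. The helper name make_named_container and the prefix convention are hypothetical; rhea is assumed to be the default container exported by require('rhea').

```javascript
// Hypothetical helper: returns a separate container whose id combines a
// readable prefix with a generated uuid.
function make_named_container(rhea, prefix) {
    return rhea.create_container({ id: prefix + '-' + rhea.generate_uuid() });
}
```

For example, make_named_container(require('rhea'), 'orders') would give a container whose id starts with 'orders-', which can make connections easier to recognise in broker logs.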

websocket_connect()

Returns a function that can be used to create another function suitable for use as the value of 'connection_details' in a connect call in order to connect over websockets. The function returned here takes a websocket url and optional arguments. The websocket_connect method itself takes the constructor of the WebSocket implementation to use. It has been tested with the implementation in Firefox and also that in the node module 'ws'.
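
The two-step call can be sketched as below. The helper name connect_over_websocket is hypothetical, and the sub-protocol list passed as the optional argument is an assumption that may need adjusting for a particular server.

```javascript
// Hypothetical helper: connects to an AMQP peer over a websocket.
function connect_over_websocket(container, WebSocketImpl, url) {
    // First call: bind the WebSocket constructor to use.
    var ws = container.websocket_connect(WebSocketImpl);
    // Second call: produce the function used as connection_details.
    // The sub-protocol list here is an assumption.
    return container.connect({
        connection_details: ws(url, ['binary', 'AMQPWSB10', 'amqp'])
    });
}
```

In node this might be invoked as connect_over_websocket(require('rhea'), require('ws'), 'ws://localhost:5673'), where the url is only a placeholder.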

websocket_accept()

Used to start handling an incoming websocket connection as an AMQP connection. See the websocket echo server example for how to use it.


Connection

methods:

open_receiver(address|options)

Establishes a link over which messages can be received and returns a Receiver representing that link. A receiving link is a subscription, i.e. it expresses a desire to receive messages.

The argument to this method can either be a simple string indicating the source of messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:

  • source - The source from which messages are received. This can be a simple string address/name or a nested object itself containing the fields:
    • address
    • dynamic
    • expiry_policy
    • durable
  • target - The target of a receiving link is the local identifier. It is often not needed, but can be set if it is.
  • name - The name of the link. This should be unique for the container. If not specified a unique name is generated.
  • credit_window - A 'prefetch' window controlling the flow of messages over this receiver. Defaults to 500 if not specified. A value of 0 can be used to turn off automatic flow control and manage it directly.
  • autoaccept - Whether received messages should be automatically accepted. Defaults to true. If set to false, the application should call accept, release or reject on the delivery field of the context passed to the message event.
  • autosettle - Whether received messages should be automatically settled once the remote settles them. Defaults to true.

And attach frame fields:

  • snd_settle_mode
  • rcv_settle_mode
  • unsettled
  • max_message_size
  • offered_capabilities
  • desired_capabilities
  • properties

Note: If the link doesn't specify a value for the credit_window and autoaccept options, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.
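
The following sketch shows one way the credit_window and autoaccept options might be used together to process one message at a time. The helper name open_manual_receiver and the on_message callback are hypothetical, and issuing the first credit from the receiver_open handler is a design choice of this sketch rather than a requirement stated above.

```javascript
// Hypothetical helper: a receiver that manages credit and acknowledgement itself.
function open_manual_receiver(connection, address, on_message) {
    var receiver = connection.open_receiver({
        source: address,
        credit_window: 0,   // turn off the automatic prefetch window
        autoaccept: false   // the application acknowledges each delivery
    });
    receiver.on('receiver_open', function () {
        receiver.add_credit(1);       // ask the peer for the first message
    });
    receiver.on('message', function (context) {
        on_message(context.message);
        context.delivery.accept();    // acknowledge explicitly
        receiver.add_credit(1);       // then ask for the next message
    });
    return receiver;
}
```

Granting a single credit at a time trades throughput for tight control over how many messages are in flight.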

open_sender(address|options)

Establishes a link over which messages can be sent and returns a Sender representing that link. A sending link is an analogous concept to a subscription for outgoing rather than incoming messages. I.e. it expresses a desire to send messages.

The argument to this method can either be a simple string indicating the target for messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:

  • target - The target to which messages are sent. This can be a simple string address/name or a nested object itself containing the fields:
    • address
    • dynamic
    • expiry_policy
    • durable
  • source - The source of a sending link is the local identifier. It is usually not needed, but can be set if it is.
  • name - The name of the link. This should be unique for the container. If not specified a unique name is generated.
  • autosettle - Whether sent messages should be automatically settled once the peer settles them. Defaults to true.

And attach frame fields as for open_receiver.

Note: If the link doesn't specify a value for the autosettle option, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.

send(message)

Sends the specified message over the default sender, which is a sending link whose target address is null. The use of this method depends on the peer supporting so-called 'anonymous relay' semantics, which most AMQP 1.0 brokers do. The message should have the 'to' field set to the intended destination.

close()

Closes a connection (may take an error object which is an object that consists of condition and description fields).

is_open()/is_closed()

Report whether the connection is currently open or closed.

create_session()

Creates a new session if you want to manage sessions by yourself.

events:

connection_open

Raised when the remote peer indicates the connection is open. This occurs also on reconnect.

connection_close

Raised when the remote peer indicates the connection is closed. This can happen either as a response to our close, or by itself. The connection and sessions will not be reconnected.

connection_error

Raised when the remote peer indicates the connection is closed and specifies an error. A connection_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as a property on the event context.

If neither the connection_error nor the connection_close event is handled by the application, an error event will be raised. This can be handled on the connection or the container. If this is also unhandled, the application process will exit.

protocol_error

Raised when a protocol error is received on the underlying socket. A disconnected event will follow with any reconnect as configured.

error

Raised when an error is received on the underlying socket. This catches any errors otherwise not handled.

disconnected

Raised when the underlying tcp connection is lost or a non-fatal error is received. The context has a reconnecting property which is true if the library is attempting to automatically reconnect and false if it has reached the reconnect limit. If reconnect has not been enabled or if the connection is a tcp server, then the reconnecting property is undefined. The context may also have an error property giving some information about the reason for the disconnect. If the disconnected event is not handled, a warning will be logged to the console.

You should update the application state to resend any unsettled messages again once the connection is recovered.
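
One possible way to keep track of unsettled messages and resend them is sketched below. The helper name track_unsettled is hypothetical. The sketch assumes that sender.send() returns the delivery object later passed as context.delivery to the settled event, and that the sendable event is raised again once the link has been re-established.

```javascript
// Hypothetical helper: remembers unsettled messages and resends them
// after the connection has been lost and recovered.
function track_unsettled(connection, sender) {
    var in_flight = new Map();  // delivery -> message awaiting settlement
    var to_resend = [];         // messages whose delivery was lost

    function send(message) {
        // Assumption: send() returns the delivery for this message.
        in_flight.set(sender.send(message), message);
    }
    sender.on('settled', function (context) {
        in_flight.delete(context.delivery);
    });
    connection.on('disconnected', function () {
        in_flight.forEach(function (message) { to_resend.push(message); });
        in_flight.clear();
    });
    sender.on('sendable', function () {
        while (to_resend.length > 0 && sender.sendable()) {
            send(to_resend.shift());
        }
    });
    return {
        send: send,
        pending: function () { return in_flight.size + to_resend.length; }
    };
}
```

A message resent this way may be a duplicate if the peer had already received it before the connection dropped, so receivers may need to tolerate duplicates.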

settled

Raised when remote settled the message.


Session

Session is an aggregation of Receiver and Sender links and provides the context and sequencing of messages for all the links it contains. A Connection creates a default session for you if you create receivers and senders on the Connection. You only need to use this object if you want to group your links into more than one session.

methods:

open_receiver(address|options)

This adds a receiver on the session. The open_receiver on the Connection object finds the session and calls this.

open_sender(address|options)

This adds a sender on the session. The open_sender on the Connection object finds the session and calls this.

close()

End a session (may take an error object which is an object that consists of condition and description fields).

is_open()/is_closed()

Report whether the session is currently open or closed.

events:

session_open

Raised when the remote peer indicates the session is open (i.e. begun in AMQP parlance).

session_close

Raised when the remote peer indicates the session is closed (i.e. ended in AMQP parlance). The session will be removed from the connection after the event.

session_error

Raised when the remote peer indicates the session has ended and specifies an error. A session_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as error property on the session object.

If neither the session_error nor the session_close event is handled by the application, an error event will be raised on the container. If this is also unhandled, the application process will exit.

settled

Raised when remote settled the message.


Receiver

methods:

close()

Closes a receiving link (i.e. cancels the subscription). (May take an error object which is an object that consists of condition and description fields).

detach()

Detaches a link without closing it. For durable subscriptions this means the subscription is inactive, but not cancelled.

add_credit(n)

By default, receivers have a prefetch window that is moved automatically by the library. However, if desired, the application can set the prefetch to zero and manage credit itself. Each invocation of the add_credit() method issues credit for a further 'n' messages to be sent by the peer over this receiving link. [Note: flow() is an alias for add_credit()]

credit()

Returns the amount of outstanding credit that has been issued.

events:

message event

Raised when a message is received. The context passed will have a message, containing the received content, and a delivery which can be used to acknowledge receipt of the message if autoaccept has been disabled.

receiver_open

Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).

receiver_drained

Raised when the remote peer indicates that it has drained all credit (and therefore there are no more messages at present that it can send).

receiver_flow

Raised when a flow is received for receiver.

receiver_error

Raised when the remote peer closes the receiver with an error. A receiver_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the receiver.

receiver_close

Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).

settled

Raised when remote settled the message.


Sender

methods:

send(msg)

Sends a message. The link need not be yet open nor is any credit needed, but there is a limit of 2048 deliveries in the Session queue before it raises an exception for buffer overflow.

Unsettled messages, whether transmitted or not, are lost on reconnect and there will be no accepted, released, rejected events. You may need to resend the messages on a disconnected event.

If the messages to be sent can be generated or fetched on demand, or there is a large number of messages, it is recommended that send is called only while the sender is sendable(). When the sender is no longer sendable, continue sending in the sendable event.
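
The recommendation above might be implemented along these lines. The helper name send_on_demand and the next_message callback (which returns undefined when nothing is left) are hypothetical.

```javascript
// Hypothetical helper: sends messages only while the sender has credit.
function send_on_demand(sender, next_message) {
    sender.on('sendable', function () {
        var message;
        // Check credit first so that no message is fetched unless it can
        // be sent; stop when credit runs out or nothing is left to send.
        while (sender.sendable() && (message = next_message()) !== undefined) {
            sender.send(message);
        }
    });
}
```

Because sending resumes from the sendable event, the session queue never holds more messages than the peer has granted credit for.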

close()

Closes a sending link (may take an error object which is an object that consists of condition and description fields).

detach()

Detaches a link without closing it.

sendable()

Returns true if the sender has available credits for sending a message. Otherwise it returns false.

set_drained(bool)

This must be called in response to sender_draining event to tell peer we have drained our messages or credit.

events:

sendable

Raised when the sender has received credit to be able to transmit messages to its peer. You will not receive a new event until the peer sends more credit, even if you have some credit left.

accepted

Raised when a sent message is accepted by the peer.

released

Raised when a sent message is released by the peer.

rejected

Raised when a sent message is rejected by the peer. context.delivery.remote_state.error may carry diagnostics to explain rejection, for example a condition property with value amqp:unauthorized-access.

modified

Raised when a sent message is modified by the peer. The context.delivery.remote_state may have delivery_failed and undeliverable_here boolean and message_annotations map properties to guide any message retransmission as specified in the AMQP 1.0 specification.
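
A small sketch of handling the four outcome events together is shown below. The helper name watch_outcomes and the report callback are hypothetical; the properties read from remote_state are the ones described above and may be absent.

```javascript
// Hypothetical helper: reports the outcome of each sent message.
function watch_outcomes(sender, report) {
    sender.on('accepted', function () {
        report('accepted', undefined);
    });
    sender.on('released', function () {
        report('released', undefined);
    });
    sender.on('rejected', function (context) {
        var state = context.delivery.remote_state;
        var error = state && state.error;
        // The condition, if present, explains why the peer rejected the message.
        report('rejected', error ? error.condition : undefined);
    });
    sender.on('modified', function (context) {
        var state = context.delivery.remote_state || {};
        // undeliverable_here indicates whether resending on this link is advised against.
        report('modified', state.undeliverable_here ? 'undeliverable_here' : undefined);
    });
}
```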

sender_open

Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).

sender_draining

Raised when the remote peer requests that the sender drain its credit; sending all available messages within the credit limit and calling set_drained(true). After this the sender has no credit left.

sender_flow

Raised when a flow is received for sender. sender_draining and sendable events may follow this event, so it only needs to be implemented if there are specific actions to be taken.

sender_error

Raised when the remote peer closes the sender with an error. A sender_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the sender.

sender_close

Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).

settled

Raised when remote settled the message.

Message

A message is an object that may contain the following fields:

  • durable
  • priority
  • ttl
  • first_acquirer
  • delivery_count
  • delivery_annotations, an object/map of non-standard delivery annotations sent to link recipient peer that should be negotiated at link attach
  • message_annotations, an object/map of non-standard delivery annotations propagated across all steps that should be negotiated at link attach
  • message_id
  • user_id
  • to
  • subject
  • reply_to
  • correlation_id
  • content_type
  • content_encoding
  • absolute_expiry_time
  • creation_time
  • group_id
  • group_sequence
  • reply_to_group_id
  • application_properties, an object/map which can take arbitrary, application defined named simple values
  • body, which can be of any AMQP type, or a data_section, data_sections, sequence_section or sequence_sections from rhea.message.
  • footer, an object/map for HMACs or signatures or similar

Messages are passed to the send() method of Connection or Sender, and are made available as message on the event context for the message event on a Receiver or its parent(s).
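
As an illustration, a message using several of these fields could be built as follows. The helper name make_request and all field values are hypothetical; only the field names come from the list above.

```javascript
// Hypothetical helper: builds a request message as a plain object.
function make_request(body, reply_to, correlation_id) {
    return {
        to: 'examples',                  // illustrative address
        reply_to: reply_to,
        correlation_id: correlation_id,
        durable: true,
        ttl: 60000,                      // milliseconds (illustrative value)
        subject: 'request',
        application_properties: { attempt: 1 },
        body: body
    };
}

var request = make_request({ key: 'value' }, 'replies', 'req-1');
```

The resulting object would be passed as-is to the send() method of a Sender or Connection.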

Delivery

The delivery object provides information on, and enables control over, the state of a message transfer.

The methods on a delivery object are:

  • accept, which will positively acknowledge the receipt of the message
  • release, which will inform the sender that the message can be redelivered (to this or to any other receiver). The release can be controlled through an object passed in with one or more of the following fields:
    • delivery_failed, if true the sender should increment the delivery_count on the next redelivery attempt, if false it should not
    • undeliverable_here, if true the sender should not try to redeliver the same message to this receiver
  • reject, which will inform the sender that the message is invalid in some way.
  • modified, which sets the modified outcome as defined in the AMQP 1.0 specification.

If autoaccept is disabled on a receiver, the application should ensure that it accepts (or releases or rejects) all messages received.
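
A sketch of choosing between these outcomes in a message handler, for a receiver opened with autoaccept set to false, is given below. The helper name settle_after, the transient flag on the thrown error and the condition string are all hypothetical, and passing an object with condition and description fields to reject is an assumption based on the error objects described for close().

```javascript
// Hypothetical helper: processes a message and settles its delivery.
function settle_after(context, handle) {
    var delivery = context.delivery;
    try {
        handle(context.message);
        delivery.accept();
    } catch (error) {
        if (error.transient) {
            // Allow redelivery and count this attempt as a failure.
            delivery.release({ delivery_failed: true });
        } else {
            // Assumption: reject accepts an error object of this shape.
            delivery.reject({
                condition: 'example:invalid-message',
                description: String(error.message)
            });
        }
    }
}
```

It would typically be called from a message handler, for example receiver.on('message', function (context) { settle_after(context, process_message); }), where process_message is the application's own function.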


Note: For detailed options and types, please refer to the type definitions in the typings directory.

rhea's People

Contributors

amarzavery, bartoval, benblack86, chradek, ernieallen, gaikwadpratik, gautric, gemmellr, georgejcleary, grs, harshanalluru, iamdanthedev, jiridanek, jmealo, k-wall, kohtala, kornys, kyledavelaar, libmw, marioarnt, mzack5020, ppatierno, ps2goat, ramya-rao-a, ramya0820, richardpark-msft, rkubis, shivangireja, ssorj, theonlystylar

rhea's Issues

default properties for senders and receivers...

Converting some code to utilize rhea.

With our other amqp transport we could specify properties on the amqp client constructor for default properties on the attach for sender and receiver links:

senderLink: {
  attach: {
    properties: {
      'client-version': '1.0'
    },
    maxMessageSize: 0,
  },
}

So that whenever we created an actual server link it would have at least the 'client-version' property. When actually creating a sender link later on, we could optionally send further properties so that the link would have the following on the attach:

properties: {
  'client-version': '1.0',
  'api-version': '2018',
  'channel-correlation': 'abc'
}

Is there a straightforward way of setting up the initial default property prior to actually creating the sender link?

Thanks,
Tony

channel always 0. Should it increment?

I create a container.
I then create a connection under that container.
After I have the connection object I then do a mySession = myConnection.create_session();
mySession.open();

I then do an mySession.open_sender({target: '$cbs'});

I get the following logs:

rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":["9ebe2971-4f1d-0c40-9a17-2fcb742eaf61",0,false,null,null,[],["$cbs"],null,null,0,null,null,null,["com.microsoft:client-version","azure-iot-device/1.4.1"]]}' +2s
rhea:raw [connection-1] SENT: <Buffer 00 00 00 99 02 00 00 00 00 53 12 d0 00 00 00 89 00 00 00 0e a1 24 39 65 62 65 32 39 37 31 2d 34 66 31 64 2d 30 63 34 30 2d 39 61 31 37 2d 32 66 63 62 ... > +2s
rhea:io [connection-1] read 162 bytes +2s
rhea:io [connection-1] got frame of size 162 +1ms
rhea:frames [connection-1] RECV: '{"size":162,"type":0,"channel":0,"performative":{"type":"attach#12","name":"9ebe2971-4f1d-0c40-9a17-2fcb742eaf61","role":true,"source":[null,null,null,null,null,null,null,null,null,null,null],"target":["$cbs",null,null,null,null,null,null],"max_message_size":1048576,"properties":{"com.microsoft:client-version":"azure-iot-device/1.4.1"}}}' +40ms

Shouldn't the creation of the session have caused a new channel to be allocated? Shouldn't the performatives associated with the sender link be using a new channel number?

I am trying to track a problem down. I am comparing the differences in traffic between rhea and amqp10.

When doing the same sequence in amqp10 these new links utilize channel 1.

encoding of message properties of type number

I'm sending messages that should have a property that is an http-ish status code (200, 404...)

here's my message object (that I pass "as is" to sender.send()):

{
  correlation_id: context.message.correlation_id,
  body: {
    'key': 'value'
  },
  application_properties: {
    'status': 200
  }
}

It looks like on the service side the property is either not received, or not received as an Int32. Am I doing anything wrong? Is the number type coerced to a smaller integer type? Can I force it to Int32?

[edit] typos

A-MQ 6.2 GA broker logs Java EOF on simple_sender.js connection attempt

Description:
A-MQ 6.2 GA broker logs Java EOF on simple_sender.js connection attempt

Actual results:
2015-10-21 16:19:09,407 | WARN | 0.0.1:36674@5672 | Transport | ivemq.broker.TransportConnection 245 | 171 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-620133 | Transport Connection to: tcp://127.0.0.1:36674 failed: java.io.EOFException

Expected results:
Successful connection made to the broker

Steps to reproduce:

  • start JBoss A-MQ 6.2 GA broker listening on amqp://127.0.0.1:5672
  • node rhea/examples/simple_sender.js
  • inspect broker log $inst_root/data/log/amq.log or log:display in Karaf console

uuid type support for correlation_id property

Hi, i'm trying to figure out a way to "force" the library to decode the correlation_id property of a message I'm receiving as a UUID. Right now i only get a Buffer (UInt8Array) - i'm trying to make my way through the type system but i get a bit lost in there.... any pointers or suggestions?

Thanks!

[edit] I'm able to "read" the uuid from the buffer by using this package: https://www.npmjs.com/package/uuid-buffer
but I also need to send it "re-encoded" as a uuid - and in that case it looks like I fall into this "todo" item

Feels like either one would need a way to "force" the type of the property OR you'd need to "detect" uuids from strings or buffers and encode them with the proper type (might be expensive).

thoughts?

Receiver should not lose messages in between the request to close the link has been fired and the link actually gets closed.

Scenario:

The receiver is busy receiving messages and some time later it decides that it wants to close itself.
With debug logs on, what i noticed was: Between closing the receiver and the close actually happening, close to ~1000 messages were received by rhea but were never sent to my app.

Here are the debug logs..

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closing receiver.
  rhea:frames [connection-1] PENDING: '{"channel":1,"type":0,"performative":[0,true]}' +12ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 16 02 00 00 01 00 53 16 d0 00 00 00 06 00 00 00 02 43 41> +12ms
  rhea:io [connection-1] read 16355 bytes +94ms
  rhea:io [connection-1] got frame of size 562 +0ms
  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":2022,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,195,236,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,63,153,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,54,49,51,57,56,52,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,229,157,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +82ms
  rhea:io [connection-1] got frame of size 562 +0ms
  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":2023,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,195,236,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,63,154,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,54,49,52,51,50,48,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,229,157,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +1ms
  rhea:io [connection-1] got frame of size 562 +1ms

. . . . and many more like that...

  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":3014,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,205,145,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,67,121,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,57,52,55,50,57,54,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,230,26,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +0ms
  rhea:io [connection-1] got frame of size 17 +0ms
>>  rhea:frames [connection-1] RECV: '{"size":17,"type":0,"channel":1,"performative":{"type":"detach#16","closed":true}}' +0ms
>>  rhea:events Link got event: receiver_close +3s
  rhea-promise Resolving the promise as the amqp receiver has been closed. +13s
  azure:event-hubs:receiver Deleted the receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7" from the client cache. +12s
  azure:event-hubs:receiver [connection-1] Receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7" has been closed. +0ms
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closing client.
  rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":[]}' +2ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 0c 02 00 00 00 00 53 17 45> +3s
  rhea:io [connection-1] read 15 bytes +96ms
  rhea:io [connection-1] got frame of size 15 +0ms
  rhea:frames [connection-1] RECV: '{"size":15,"type":0,"channel":0,"performative":{"type":"end#17"}}' +94ms
  rhea:events Session got event: session_close +96ms
  rhea-promise Resolving the promise as the amqp session has been closed. +96ms
  azure:event-hubs:cbs Successfully closed the cbs session. +14s
  rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":[]}' +2ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 0c 02 00 00 00 00 53 18 45> +96ms
  rhea:io [connection-1] read 15 bytes +96ms
  rhea:io [connection-1] got frame of size 15 +1ms
  rhea:frames [connection-1] RECV: '{"size":15,"type":0,"channel":0,"performative":{"type":"close#18"}}' +95ms
  rhea:events Connection got event: connection_close +96ms
  rhea-promise Resolving the promise as the connection has been successfully closed. +97ms
  azure:event-hubs:client Closed the amqp connection "connection-1" on the client. +0ms
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closed receiver and client...

For reference these are the options that I provided while creating the receiver:

rhea:events Link got event: receiver_open +639ms
  rhea-promise Resolving the promise with amqp receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7". +638ms
  azure:event-hubs:receiver Promise to create the receiver resolved. Created receiver with name:  5334b21b-b629-4e18-ae52-4c21c6de49e7 +638ms
  azure:event-hubs:receiver [connection-1] Receiver '5334b21b-b629-4e18-ae52-4c21c6de49e7' created with receiver options: { name: '5334b21b-b629-4e18-ae52-4c21c6de49e7',
  azure:event-hubs:receiver   autoaccept: true,
  azure:event-hubs:receiver   source: { address: 'xxxxx/ConsumerGroups/$default/Partitions/0' },
  azure:event-hubs:receiver   credit_window: 0,
  azure:event-hubs:receiver   desired_capabilities: 'com.microsoft:enable-receiver-runtime-metric',
  azure:event-hubs:receiver   target: undefined } +0ms

Error incoming on_transfer

I see this error in the EnMasse subscription service log. I'm not sure whether this is an error on my part, but at first sight it looks like an internal library issue.

/node_modules/rhea/lib/session.js:274
data = Buffer.concat([current.data, frame.payload], current.data.size() + f
^
TypeError: undefined is not a function
at Incoming.on_transfer (/node_modules/rhea/lib/session.js:274:78)
at Session.on_transfer (/node_modules/rhea/lib/session.js:586:19)
at Connection.(anonymous function) [as on_transfer] (/node_modules/rhea/lib/connection.js:549:30)
at c.dispatch (/node_modules/rhea/lib/types.js:793:33)
at Transport.read (/node_modules/rhea/lib/transport.js:88:36)
at SaslClient.read (/node_modules/rhea/lib/sasl.js:246:26)
at Connection.input (/node_modules/rhea/lib/connection.js:370:31)
at Socket.emit (events.js:107:17)
at readableAddChunk (_stream_readable.js:163:16)
at Socket.Readable.push (_stream_readable.js:126:10)
at TCP.onread (net.js:540:20)

Accepted event on a link object called how many times...

I'm trying to track down what appears to be a double invocation of the event handler for 'accepted' event on the link object.

I have the code: self._linkObject.on('accepted', self._acceptedHandler.bind(this))

A bit further on in the code I have:

let sendDeliveryObject = this._linkObject.send(message);

It would appear as though the event handler is invoked before the call to .send returns. It is then invoked AGAIN when the actual message is accepted by the remote peer.

Is this expected behavior?
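
One thing that may be worth ruling out is the handler being registered more than once, since every registration adds another invocation per event. The sketch below uses a plain Node.js EventEmitter as a stand-in for the link object (an assumption made so the example runs without a broker) and shows how listenerCount() exposes a duplicate registration. It does not show what rhea itself does during send().

```javascript
const { EventEmitter } = require('events');

// Stand-in for the link object; only the EventEmitter behaviour matters here.
const link = new EventEmitter();

let calls = 0;
function acceptedHandler() {
    calls += 1;
}

// Registering the handler twice (for example, once per reconnect)
// makes every 'accepted' event invoke it twice.
link.on('accepted', acceptedHandler);
link.on('accepted', acceptedHandler);

link.emit('accepted', {});

console.log(link.listenerCount('accepted')); // 2
console.log(calls);                          // 2
```

If listenerCount('accepted') reports 1 on the real link, duplicate registration can be excluded as the cause.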

Properties sometimes not present in sent message

Using Rhea in a browser, I've seen occasions when the supplied "properties" were not present in the sent message. In my case, there were "properties", "application_properties", and a null body. Intermittently, the sent message would only contain the application-properties and the body.

Question: Is it possible to flip flop between directly managing credits and letting rhea manage it for me?

The documentation says:

credit_window - A 'prefetch' window controlling the flow of messages over this receiver. Defaults to 500 if not specified. A value of 0 can be used to turn off automatic flow control and manage it directly.

Side note: Btw, reading the source code it looks like the default value is 1000 and not 500 (doc should be updated).

Further down for receiver the doc says:

add_credit(n)
By default, receivers have a prefetch window that is moved automatically by the library. However if desired the application can set the prefetch to zero and manage credit itself. Each invocation of add_credit() method issues credit for a further 'n' messages to be sent by the peer over this receiving link. [Note: flow() is an alias for add_credit()]

So it seems that,
While creating the receiver, the credit_window can be set to 0 and then we can say receiver.add_credit(n) to receive n messages. However,

  • is it possible for us to create a receiver with default credit_window of 1000 and some time later set the window to 0 to manage it on my own? Will this cause any messages to be dropped?
  • OR vice versa: create the receiver with a credit_window of 0 and then set the window to 1000. Will this tell rhea to manage credits automatically? Or do we have to make sure to replenish the credits once they are exhausted?
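
For the manual case described in the quoted documentation (credit_window of 0 plus add_credit(n)), a minimal sketch of replenishing credit in batches could look like this. createCreditManager and the stand-in receiver are hypothetical names introduced for illustration; the only thing assumed about the receiver is the add_credit(n) method quoted above. The sketch does not answer whether the window can be switched after the receiver is created.

```javascript
// Issues credit in fixed-size batches for a receiver opened with
// credit_window: 0. `receiver` only needs an add_credit(n) method.
function createCreditManager(receiver, batchSize) {
    let outstanding = 0;

    function topUp() {
        // Only issue more credit once the previous batch is used up.
        if (outstanding === 0) {
            receiver.add_credit(batchSize);
            outstanding = batchSize;
        }
    }

    return {
        start: topUp,
        // Call once per 'message' event.
        onMessage: function () {
            if (outstanding > 0) {
                outstanding -= 1;
            }
            topUp();
        },
        outstanding: function () {
            return outstanding;
        }
    };
}

// Stand-in receiver that records the credit it is asked to issue.
const issued = [];
const fakeReceiver = { add_credit: function (n) { issued.push(n); } };

const manager = createCreditManager(fakeReceiver, 2);
manager.start();      // issues the first batch of 2
manager.onMessage();  // 1 credit still outstanding
manager.onMessage();  // batch exhausted, issues 2 more
console.log(issued);  // [ 2, 2 ]
```

With a real receiver, manager.start() would presumably be called once the receiver is open and manager.onMessage() from the 'message' handler.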

Wrong "dynamic" encoding on source and target

It seems that the dynamic property on source and target is encoded incorrectly.
In Wireshark I see 0x00 for false and 0x01 for true, rather than the AMQP boolean type encodings 0x42 for false and 0x41 for true.
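
For reference, the AMQP 1.0 type system defines two ways to encode a boolean: the single-byte constructors 0x41 (true) and 0x42 (false), and the constructor 0x56 followed by one byte holding 0x01 or 0x00. A bare 0x00 or 0x01 with no preceding 0x56 would match neither form. The sketch below only illustrates the two encodings; encodeBoolean is a hypothetical helper, not part of rhea.

```javascript
// Encodes a boolean using one of the two AMQP 1.0 boolean encodings.
// compact === true  -> single byte: 0x41 (true) or 0x42 (false)
// compact === false -> 0x56 followed by 0x01 (true) or 0x00 (false)
function encodeBoolean(value, compact) {
    if (compact) {
        return Buffer.from([value ? 0x41 : 0x42]);
    }
    return Buffer.from([0x56, value ? 0x01 : 0x00]);
}

console.log(encodeBoolean(true, true));   // <Buffer 41>
console.log(encodeBoolean(false, true));  // <Buffer 42>
console.log(encodeBoolean(true, false));  // <Buffer 56 01>
console.log(encodeBoolean(false, false)); // <Buffer 56 00>
```

When reading the capture, it is worth checking whether the 0x00/0x01 byte is preceded by 0x56.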

detach without network activity possible...

In the amqp10 library, links had a method called forceDetach.

We found it useful to invoke on a link during error recovery. Essentially it moved the local state of the link to detached without actually performing a network operation to inform the remote. This is helpful when, having detected an actual networking issue, we no longer wish to perform network operations that we know will fail.

Is there any way that we can mimic this behavior with rhea? From a cursory look at detach, it appears that it will initiate a network send.

Thank you for your time,
Tony

Errors after connection on Firefox 48

I'm seeing errors that seem to only happen on Firefox 48 on Fedora 24.
Chrome on Fedora 24 works. Firefox version 42 on Fedora 23 works.

What I expect to happen when using the attached html page is that when the connection is opened, the red "connecting..." text is replaced by the word "Connected" and then when I click on the button that a schema is returned.

I'm seeing two different behaviors:
Sometimes I get the connection_opened event but then I immediately get:
Error: Attach already received
rhea.js:1095:15
Then when I click the button to request a schema, I get:
TypeError: receiver.remote.attach is undefined
ff48fc24.html:65:17

However, sometimes I'll get a different behavior. I'll immediately get:
Error: Invalid data offset, must be at least 2 was 0
rhea.js:737:15

and then after a few seconds I get:
Error: Close already received
rhea.js:469:15
[connection-1] disconnected
rhea.js:408:13
When that happens, I never get the connection_opened event and the "connecting..." text is never replaced.

Environment:
Fedora 24
Firefox 48
websockify for proxy (but also happens when I use nodejs for proxy)
I'm opening the reproducer file directly in a browser; no web server needed.
ff48fc24.zip

question: How do I emit to a topic using rhea?

The title pretty much says it all. I have been up and down the library but I can't seem to figure out how I do this with rhea. The examples all use 'examples' as the address, which makes it difficult to tell whether it refers to a queue or a topic.

Any help would be appreciated. 😄

Error: frame sequence error: expected 15, got 56

Getting these random frame sequence errors?

debug: Error: frame sequence error: expected 15, got 56
    at Incoming.on_transfer (D:\home\site\wwwroot\node_modules\rhea\lib\session.js:337:19)
    at Session.on_transfer (D:\home\site\wwwroot\node_modules\rhea\lib\session.js:661:19)
    at Connection.(anonymous function) [as on_transfer] (D:\home\site\wwwroot\node_modules\rhea\lib\connection.js:650:30)
    at c.dispatch (D:\home\site\wwwroot\node_modules\rhea\lib\types.js:904:33)
    at Transport.read (D:\home\site\wwwroot\node_modules\rhea\lib\transport.js:95:36)
    at Connection.input (D:\home\site\wwwroot\node_modules\rhea\lib\connection.js:428:35)
    at emitOne (events.js:116:13)
    at TLSSocket.emit (events.js:211:7)
    at addChunk (_stream_readable.js:263:12)
    at readableAddChunk (_stream_readable.js:250:11)
    at TLSSocket.Readable.push (_stream_readable.js:208:10)
    at TLSWrap.onread (net.js:594:20) 'Error: frame sequence error: expected 15, got 56\n    at Incoming.on_transfer (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\session.js:337:19)\n    at Session.on_transfer (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\session.js:661:19)\n    at Connection.(anonymous function) [as on_transfer] (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\connection.js:650:30)\n    at c.dispatch (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\types.js:904:33)\n    at Transport.read (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\transport.js:95:36)\n    at Connection.input (D:\\home\\site\\wwwroot\\node_modules\\rhea\\lib\\connection.js:428:35)\n    at emitOne (events.js:116:13)\n    at TLSSocket.emit (events.js:211:7)\n    at addChunk (_stream_readable.js:263:12)\n    at readableAddChunk (_stream_readable.js:250:11)\n    at TLSSocket.Readable.push (_stream_readable.js:208:10)\n    at TLSWrap.onread (net.js:594:20)'

Invalid handle null

stack trace

Exception has occurred: Error
Error: Invalid handle null
    at Session._get_link (d:\sdk\rhea\lib\session.js:641:15)
    at Session.on_flow (d:\sdk\rhea\lib\session.js:626:14)
    at Connection.(anonymous function) [as on_flow] (d:\sdk\rhea\lib\connection.js:646:30)
    at c.dispatch (d:\sdk\rhea\lib\types.js:904:33)
    at Transport.read (d:\sdk\rhea\lib\transport.js:95:36)
    at SaslClient.read (d:\sdk\rhea\lib\sasl.js:252:26)
    at Connection.input (d:\sdk\rhea\lib\connection.js:424:35)
    at emitOne (events.js:116:13)
    at TLSSocket.emit (events.js:211:7)
    at addChunk (_stream_readable.js:263:12)

The failure of this condition causes the exception.

frame.handle is null and the if condition checks if(frame.handle != undefined)

Here is what the frame object looks like in the debug window

Object {size: 35, type: 0, channel: 0, performative: c}
VM255:1
channel:0
performative:c {value: Array(11)}
available:null
delivery_count:null
drain:null
echo:true
handle:null
incoming_window:5000
link_credit:null
next_incoming_id:1
next_outgoing_id:1
outgoing_window:2048
properties:null
value:Array(11) [Typed, Typed, Typed, …]
length:11
__proto__:Array(0) [, …]
0:Typed {type: TypeDesc, value: 1}
1:Typed {type: TypeDesc, value: 5000}
2:Typed {type: TypeDesc, value: 1}
3:Typed {type: TypeDesc, value: 2048}
4:Typed {type: TypeDesc, value: null}
5:Typed {type: TypeDesc, value: null}
6:Typed {type: TypeDesc, value: null}
7:Typed {type: TypeDesc, value: null}
8:Typed {type: TypeDesc, value: null}
9:Typed {type: TypeDesc, value: true}
10:Typed {type: TypeDesc, value: null}
__proto__:Object {dispatch: , next_incoming_id: <accessor>, incoming_window: <accessor>, …}
size:35
type:0
__proto__:Object {constructor: , __defineGetter__: , __defineSetter__: , …}

and you can clearly see that handle is null

What could possibly be happening?
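
Two facts may help narrow this down. First, in AMQP 1.0 a flow frame without a handle is legal: it carries session-level flow state only. Second, in JavaScript the loose check `null != undefined` is false while the strict check `null !== undefined` is true, so the two operators treat a null handle differently. The sketch below illustrates only that difference; hasLinkHandle and the sample objects are hypothetical, and the exact operator used at the failing line in rhea is not shown in this report.

```javascript
// Returns true only when the performative names a link handle.
// The loose comparison against null rejects both null and undefined.
function hasLinkHandle(performative) {
    return performative.handle != null;
}

// Shaped like the frame in the debug window: a session-level flow.
const sessionFlow = { handle: null, incoming_window: 5000, echo: true };
// A link-level flow carries a numeric handle (0 is a valid handle).
const linkFlow = { handle: 0, link_credit: 10 };

console.log(sessionFlow.handle != undefined);  // false
console.log(sessionFlow.handle !== undefined); // true
console.log(hasLinkHandle(sessionFlow));       // false
console.log(hasLinkHandle(linkFlow));          // true
```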

Why are log messages outbound different than inbound log messages...

Here is a log message when sending a disposition to a peer:

rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":[true,0,0,true,[]]}' +22ms
rhea:raw [connection-1] SENT: <Buffer 00 00 00 1c 02 00 00 00 00 53 15 d0 00 00 00 0c 00 00 00 05 41 43 43 41 00 53 24 45> +110ms

The frame log really doesn't say that the performative is a disposition. You can look at the raw message that follows and see the 53 15 descriptor that indicates that the frame is a disposition. Additionally, it doesn't label the values for role, settled, or state.

However, here is a disposition incoming from a peer:
rhea:frames [connection-1] RECV: '{"size":23,"type":0,"channel":0,"performative":{"type":"disposition#15","role":true,"settled":true,"state":[]}}' +83ms

It does all those things.

Why the difference?
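
Until the outbound log carries the same detail, the raw bytes can be decoded by hand. The sketch below reads the 8-byte AMQP frame header (size, data offset, type, channel) and the performative descriptor that follows it, using the SENT buffer from the log above. describeFrame and the PERFORMATIVES table are hypothetical helpers written for this illustration, not rhea functions; the descriptor codes are those of the AMQP 1.0 specification.

```javascript
// AMQP 1.0 performative descriptor codes.
const PERFORMATIVES = {
    0x10: 'open', 0x11: 'begin', 0x12: 'attach', 0x13: 'flow',
    0x14: 'transfer', 0x15: 'disposition', 0x16: 'detach',
    0x17: 'end', 0x18: 'close'
};

function describeFrame(buffer) {
    const size = buffer.readUInt32BE(0);    // total frame size in bytes
    const doff = buffer.readUInt8(4);       // data offset in 4-byte words
    const type = buffer.readUInt8(5);       // 0 = AMQP frame
    const channel = buffer.readUInt16BE(6);
    const body = doff * 4;
    let performative = null;
    // A described type starts with 0x00; 0x53 marks a smallulong descriptor.
    if (buffer.length >= body + 3 && buffer[body] === 0x00 && buffer[body + 1] === 0x53) {
        performative = PERFORMATIVES[buffer[body + 2]] || null;
    }
    return { size: size, doff: doff, type: type, channel: channel, performative: performative };
}

// The SENT buffer from the log above, as a hex string.
const raw = Buffer.from('0000001c02000000005315d00000000c000000054143434100532445', 'hex');
const frame = describeFrame(raw);
console.log(frame.size);         // 28
console.log(frame.performative); // disposition
```

The remaining bytes are the performative's field list (0xd0 list32 of five items), which lines up with the five values shown in the PENDING log line.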

looks like attach_receiver/attach_sender does not pass the options ahead.

We are trying to wrap events inside promises. So far the experience has been good. However, at one place I was not able to pass the options for setting up the receiver.

Maybe I am doing something wrong here (perhaps I should be using some other method). Any help/guidance would be awesome.

Here is the script.

I want to use the createReceiver() like this, where I can pass in the options along with the endpoint.

const rxopt = { name: replyTo, target: { address: replyTo }};
await amqp.createReceiver(session, endpoint, rxopt);

Should this line have arguments instead of args?
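
Independently of the args/arguments question, a promise wrapper can forward the caller's options explicitly. The following is a minimal sketch of that idea, not the script referred to above: createReceiver is written from scratch here, it assumes session.attach_receiver(options) returns an event-emitting receiver that fires 'receiver_open' and 'receiver_error', and it uses a stand-in session so that it runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical wrapper: merges the caller's options with the source
// address and passes the result on to session.attach_receiver().
function createReceiver(session, address, options) {
    return new Promise(function (resolve, reject) {
        const merged = Object.assign({ source: { address: address } }, options);
        const receiver = session.attach_receiver(merged);
        receiver.once('receiver_open', function () { resolve(receiver); });
        receiver.once('receiver_error', function (context) { reject(context); });
    });
}

// Stand-in session that records the options it was given.
const seen = [];
const fakeSession = {
    attach_receiver: function (opts) {
        seen.push(opts);
        const receiver = new EventEmitter();
        setImmediate(function () { receiver.emit('receiver_open', {}); });
        return receiver;
    }
};

const rxopt = { name: 'reply-1', target: { address: 'reply-1' } };
createReceiver(fakeSession, 'endpoint', rxopt).then(function () {
    console.log(seen[0].name);           // reply-1
    console.log(seen[0].source.address); // endpoint
});
```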

circular buffer overflow

Hi, I used your library in a hackathon for using Hono.
I'm using this code: https://github.com/bsinno/bcx2017/tree/master/Hono_Example_Clients/Hono-Node.js
I noticed that I sometimes get these errors:

uncaught exception:  Error: circular buffer overflow: head=10 tail=10 size=2048 capacity=2048
    at CircularBuffer.push (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/session.js:46:15)
    at Incoming.on_transfer (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/session.js:303:29)
    at Session.on_transfer (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/session.js:618:19)
    at Connection.(anonymous function) [as on_transfer] (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/connection.js:642:30)
    at c.dispatch (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/types.js:902:33)
    at Transport.read (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/transport.js:95:36)
    at SaslClient.read (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/sasl.js:243:26)
    at Connection.input (/Users/timaschew/dev/iot-hackathon/hono-node-client/node_modules/rhea/lib/connection.js:420:35)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:191:7)

sasl mechanisms?

Hi

taking a look at the sasl-specific portion of the code i'm trying to understand what a client needing a custom sasl mechanism could do.

it looks like the sasl_mechanism interface is synchronous - meaning the init() and next() APIs have to return results and cannot use callbacks/promises, is that correct? in the case where the sasl mechanism relies on external async APIs (for example using a hardware TPM to sign a SASL challenge key with a secret stored in the hardware), how would that work?

also it looks like the PlainClient and AnonymousClient aren't exported? in the case where we want to build a client that supports multiple SASL mechanisms, do we have to rewrite them? or are the sasl_mechanisms collections (the default one and the user-provided one) merged?

Buffer out of range exception

Hi, I get an exception when I run the following code:

`const container = require('rhea');

container.on('connection_open', function (context) {
context.connection.open_receiver('TEST');

});

container.on('message', function (context) {
console.log(context.message.body);
});

container.connect({'port':5672});`

The exception message conveys that the buffer is out of range. The exception error is the following:
`buffer.js:821
throw new RangeError('Index out of range');
^

RangeError: Index out of range
at checkOffset (buffer.js:821:11)
at Buffer.readUInt32BE (buffer.js:895:5)
at Transport.read (/home/vlima/Documents/development/nodejs/node-stomp-ws/node_modules/rhea/lib/transport.js:74:33)
at Connection.input (/home/vlima/Documents/development/nodejs/node-stomp-ws/node_modules/rhea/lib/connection.js:370:31)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at TCP.onread (net.js:548:20)`

serializing a property as a symbol as opposed to a string what decides?

I was having a problem with a particular operation I was sending over rhea.

I was sending a message_annotation: It was created thus:

The value of method is: 'GET'

amqpMessage.message_annotations = {
  operation: method
};

This is serialized out by rhea as:
a1096f7065726174696f6ea103474554

The property using amqp10 serializes:

a3096f7065726174696f6ea103474554

Note that it appears that for amqp10 the operation property name is defined as a symbol whereas for rhea it is defined as a string.

Any thoughts on why this might have serialized differently?
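
The two dumps differ only in the first byte: 0xa1 is the AMQP str8-utf8 constructor and 0xa3 is sym8. The AMQP 1.0 specification restricts annotation map keys to symbols or ulongs, which would explain the amqp10 output. The small decoder below makes the difference visible; decodeShortValues is a hypothetical helper that understands only these two type codes.

```javascript
const TYPE_NAMES = { 0xa1: 'string', 0xa3: 'symbol' };

// Decodes consecutive str8-utf8 (0xa1) and sym8 (0xa3) values from a hex
// dump. Each value is: type code, one length byte, then the content.
function decodeShortValues(hex) {
    const buffer = Buffer.from(hex, 'hex');
    const values = [];
    let offset = 0;
    while (offset < buffer.length) {
        const type = TYPE_NAMES[buffer[offset]];
        if (!type) {
            throw new Error('unsupported type code 0x' + buffer[offset].toString(16));
        }
        const length = buffer[offset + 1];
        const start = offset + 2;
        values.push({ type: type, value: buffer.toString('utf8', start, start + length) });
        offset = start + length;
    }
    return values;
}

function summarize(hex) {
    return decodeShortValues(hex).map(function (v) {
        return v.type + ':' + v.value;
    }).join(' ');
}

console.log(summarize('a1096f7065726174696f6ea103474554')); // string:operation string:GET
console.log(summarize('a3096f7065726174696f6ea103474554')); // symbol:operation string:GET
```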

did not recognise message section with descriptor 118

When encoding an AmqpSequence with proton-j and decoding it in rhea, the following log message is displayed:

WARNING: did not recognise message section with descriptor 118

Attached is a junit test that runs a proton-j server passing the desired value to a client opening a receiver. Requires vertx-proton to build.

unittest.zip

Where should properties like message_id, correlation_id, etc. be set in an amqp message?

3.2.4    Properties
Immutable properties of the message.
<type name="properties" class="composite" source="list" provides="section">
<descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
<field name="message-id" type="*" requires="message-id"/>
<field name="user-id" type="binary"/>
<field name="to" type="*" requires="address"/>
<field name="subject" type="string"/>
<field name="reply-to" type="*" requires="address"/>
<field name="correlation-id" type="*" requires="message-id"/>
<field name="content-type" type="symbol"/>
<field name="content-encoding" type="symbol"/>
<field name="absolute-expiry-time" type="timestamp"/>
<field name="creation-time" type="timestamp"/>
<field name="group-id" type="string"/>
<field name="group-sequence" type="sequence-no"/>
<field name="reply-to-group-id" type="string"/>
</type>
The properties section is used for a defined set of standard properties of the message. The properties section is
part of the bare message; therefore, if retransmitted by an intermediary, it MUST remain unaltered.

As per the AMQP 1.0 specification, I thought that the predefined message properties should be sent inside a property named properties as follows in an amqp message object:

const request: any = {
        body: tokenObject.token,
        properties: { // Nested properties object
          message_id: uuid(),
          reply_to: this.replyTo,
          to: this.endpoint
        },
        application_properties: {
          operation: Constants.operationPutToken,
          name: audience,
          type: tokenObject.tokenType
        }
      };

Usually the service would send back a message which has correlation_id in it. However that did not happen.
Instead when I sent those predefined properties as top level properties as described below then the service did send me a message with correlation_id in it.

const request: any = {
        body: tokenObject.token,
        message_id: uuid(), // Top level property
        reply_to: this.replyTo, // Top level property
        to: this.endpoint, // Top level property
        application_properties: {
          operation: Constants.operationPutToken,
          name: audience,
          type: tokenObject.tokenType
        }
      };

Thus it seems to me like all the predefined properties need to be sent as top level properties.

I am not sure whether the service is doing something wrong here. However, I wanted to know whether rhea expects all the predefined properties as top-level properties inside the AMQP message.
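
If the top-level form is indeed what is expected, as the behavior observed above suggests, existing code that builds a nested properties object can be adapted with a small shim. flattenProperties is a hypothetical helper sketched for this purpose; it is not part of rhea, and the sample values are placeholders.

```javascript
// Copies the fields of message.properties to the top level of a new
// message object. Fields already present at the top level win.
function flattenProperties(message) {
    const flat = Object.assign({}, message.properties, message);
    delete flat.properties;
    return flat;
}

const nested = {
    body: 'token-value',
    properties: { message_id: 'id-1', reply_to: 'reply-queue', to: '$cbs' },
    application_properties: { operation: 'put-token' }
};

const flat = flattenProperties(nested);
console.log(flat.message_id);            // id-1
console.log(flat.reply_to);              // reply-queue
console.log('properties' in flat);       // false
console.log(flat.application_properties.operation); // put-token
```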

Missing options.js file

$ node simple_send.js --help
module.js:471
throw err;
^

Error: Cannot find module './options.js'
at Function.Module._resolveFilename (module.js:469:15)
at Function.Module._load (module.js:417:25)
at Module.require (module.js:497:17)
at require (internal/module.js:20:19)
at Object.<anonymous> (/home/dkornel/redhat/repos/rhea/examples/simple_send.js:22:20)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
at Function.Module._load (module.js:438:3)

link fields source as nested object test case fails

Description:
While running the TCs in Jenkins, the "link fields source as nested object" test case fails.

Steps to reproduce:

  1. mocha -g nested $rhea_inst_root/test/links.js

Actual results:
[connection-3] error: Error: connect ECONNRESET
[connection-3] disconnected
Scheduled reconnect in 100ms
․․

3 passing (37ms)
1 failing

  1. link fields source as nested object:
    Uncaught TypeError: value is out of bounds
    at TypeError ()
    at checkInt (buffer.js:705:11)
    at Buffer.writeUInt32BE (buffer.js:748:5)
    at Function.write_ulong as write
    at types.Writer.write_fixed_width (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:584:14)
    at types.Writer.write_value (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:626:14)
    at types.Writer.write (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:618:10)
    at types.Writer.write_constructor (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:608:14)
    at types.Writer.write (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:617:10)
    at types.Writer.write_compound (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:653:18)
    at types.Writer.write_value (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:630:14)
    at types.Writer.write (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:618:10)
    at types.Writer.write_compound (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:653:18)
    at types.Writer.write_value (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:630:14)
    at types.Writer.write (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:618:10)
    at types.Writer.write_compound (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:653:18)
    at types.Writer.write_value (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:630:14)
    at types.Writer.write (/home/jprajzne/nodejs/node_modules/rhea/lib/types.js:618:10)
    at Object.frames.write_frame (/home/jprajzne/nodejs/node_modules/rhea/lib/frames.js:105:16)
    at Transport.encode (/home/jprajzne/nodejs/node_modules/rhea/lib/transport.js:43:25)
    at Connection._write_frame (/home/jprajzne/nodejs/node_modules/rhea/lib/connection.js:438:25)
    at Session.output (/home/jprajzne/nodejs/node_modules/rhea/lib/session.js:373:21)
    at link._process (/home/jprajzne/nodejs/node_modules/rhea/lib/link.js:89:26)
    at Session._process (/home/jprajzne/nodejs/node_modules/rhea/lib/session.js:459:27)
    at Connection._process (/home/jprajzne/nodejs/node_modules/rhea/lib/connection.js:429:39)
    at process._tickCallback (node.js:442:13)

Expected results:
The TC passes.

Note:
This TC fails with and without JBoss A-MQ 6.2.0 GA running with AMQP transport.

Encoding a message_id as a uuid type.

Utilizing amqp10 we can impose a type on our uuid properties message_id and correlation_id.

We assign the property in the message object before sending it to amqp like so:

if (typeof uuidString === 'string' && uuidString.match(uuidRegEx)) {
  uuid = (amqp10).Type.uuid(uuidString);
}

When amqp10 serializes the data it will utilize the amqp uuid type.

Any thoughts on how this can be (or could be) accomplished with rhea?
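
For context, the AMQP 1.0 uuid type is the type code 0x98 followed by the 16 raw bytes of the UUID. The sketch below shows that conversion in isolation. uuidStringToBuffer and encodeUuid are hypothetical helpers, and whether rhea treats a 16-byte Buffer assigned to message_id as a uuid is not confirmed here.

```javascript
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Converts a canonical UUID string into its 16 raw bytes.
function uuidStringToBuffer(uuidString) {
    if (typeof uuidString !== 'string' || !UUID_PATTERN.test(uuidString)) {
        throw new TypeError('not a UUID string: ' + uuidString);
    }
    return Buffer.from(uuidString.replace(/-/g, ''), 'hex');
}

// Prefixes the 16 bytes with the AMQP uuid type code (0x98).
function encodeUuid(uuidString) {
    return Buffer.concat([Buffer.from([0x98]), uuidStringToBuffer(uuidString)]);
}

const encoded = encodeUuid('5334b21b-b629-4e18-ae52-4c21c6de49e7');
console.log(encoded.length);                        // 17
console.log(encoded[0].toString(16));               // 98
console.log(encoded.slice(1, 5).toString('hex'));   // 5334b21b
```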

Add guidance on debugging

Users will likely end up trying DEBUG=rhea and failing, because that's what the primary docs on the web suggest. I did this, and it produces no output. I recommend mentioning DEBUG=* and DEBUG=rhea:$aspect.

A-MQ 6.2.x authentication works while providing wrong credentials

Description:
The client connects to the server even when the credentials are incorrect.

Steps to reproduce:

  1. start the server with amqp connector and enabled authentication
  2. provide wrong credentials: node rhea/examples/sasl/simple_sasl_client.js

Expected results:
The authentication fails

Actual results:
The connection is established without any authentication failed message:
2015-11-02 16:29:07,808 | WARN | -1, empty queue] | Transport | ivemq.broker.TransportConnection 245 | 171 - org.apache.activemq.activemq-osgi - 5.11.0.redhat-620133 | Transport Connection to: tcp://127.0.0.1:43041 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://127.0.0.1:43041

Note:
Please notice that the client is not connecting to a destination nor sending messages.

Extend the test suite

I've run "istanbul cover rhea/lib/*js" to get the test coverage of rhea:
=============================== Coverage summary ===============================
Statements : 29.79% ( 558/1873 )
Branches : 3.67% ( 27/736 )
Functions : 3.72% ( 12/323 )
Lines : 30.59% ( 558/1824 )

I think it'd be great to increase the coverage.

Detect when reconnect_limit is reached

This is not a bug report. This is a request for support. I need to be able to detect when reconnect_limit has been reached so that I can send an email out to alert people that the link to the server has been broken. But I simply have not been able to figure out how to detect this. I've been looking in the context for the disconnected event, but nothing stands out to me. How would I detect this? Thanks.

direct_recv doesn't receive after simple_recv connects and disconnects

Description:
direct_recv doesn't receive after simple_recv connects and disconnects.

How reproducible:
Always

Steps to Reproduce:

  1. node $inst_root/rhea/examples/direct_recv.js -p 61616 in 1st terminal
  2. node $inst_root/rhea/examples/simple_recv.js -p 61616 in 2nd terminal
  3. kill the simple_recv in 2nd terminal
  4. node $inst_root/rhea/examples/simple_send.js -p 61616

Actual results:
The following error is thrown:
[connection-1] disconnected

[connection-2] error: Error: write after end
[connection-2] disconnected

Expected results:
The direct_recv receives incoming message(s)

Don't log to stdout from library code

My use of rhea depends on uniform application output to stdout, and things such as connection errors on stdout pose a problem.

[jross@localhost quiver (master)]$ fgrep -r "console.log" javascript/node_modules/rhea/lib
javascript/node_modules/rhea/lib/terminus.js: console.log('Unknown terminus: ' + field.descriptor);
javascript/node_modules/rhea/lib/ws.js: console.log('ERROR: Attempt to set unrecognised handler on websocket wrapper: ' + event);
javascript/node_modules/rhea/lib/connection.js: console.log('[' + this.options.id + '] error: ' + e);
javascript/node_modules/rhea/lib/connection.js: console.log('[' + this.options.id + '] disconnected');
javascript/node_modules/rhea/lib/session.js: console.log('ERROR: Next pending delivery not found: ' + this.next_pending_delivery);
javascript/node_modules/rhea/lib/rpc.js: console.log('no request pending for ' + id + ', ignoring response');
javascript/node_modules/rhea/lib/link.js: if (frame.performative.link_credit > 0) console.log('ERROR: received flow with drain set, but non zero credit');
javascript/node_modules/rhea/lib/message.js: console.log("WARNING: did not recognise message section with descriptor " + s.descriptor);
javascript/node_modules/rhea/lib/message.js: console.log("WARNING: expected described message section got " + JSON.stringify(s));
javascript/node_modules/rhea/lib/message.js: console.log('unrecognised outcome: ' + JSON.stringify(outcome));
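
A minimal sketch of what the request amounts to: diagnostics routed to stderr (or any injected stream) instead of stdout. `make_logger` and the `rhea:connection` prefix are hypothetical and not part of rhea; the library's existing `debug` dependency would be another way to achieve the same result.

```javascript
var util = require('util');

// Hypothetical helper, not part of rhea: returns a printf-style logger that
// writes to the given stream, defaulting to stderr so stdout stays clean.
function make_logger(prefix, stream) {
    var out = stream || process.stderr;
    return function () {
        out.write(prefix + ' ' + util.format.apply(util, arguments) + '\n');
    };
}

// Default behaviour: the message goes to stderr, not stdout.
var log = make_logger('rhea:connection');
log('[%s] disconnected', 'connection-1');

// Injecting a stream-like object makes the output capturable, e.g. in tests.
var captured = [];
var test_log = make_logger('rhea:connection', {
    write: function (text) { captured.push(text); }
});
test_log('[%s] error: %s', 'connection-2', 'write after end');
```

Replacing each `console.log(...)` call listed above with a logger of this kind would leave stdout entirely to the application.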

Is this library AMQP 1.0 compliant?

Hello,

I have been looking at both the node-amqp10 library and this one, and I would like to know:

  • Is rhea AMQP 1.0 compliant?
  • How does it differ from node-amqp10?
  • Can it be used with Azure Service Bus? Are there any examples?

ERROR TypeError: get_connect_fn(...) is not a function

Hi, I'm trying to use the rhea library with Angular 2 and webpack.
I get an exception when I run the following code:

import * as _ from 'rhea';
_.on('connection_open', function (context) {
    context.connection.open_receiver('examples');
    context.connection.open_sender('examples');
});
_.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
_.once('sendable', function (context) {
    context.sender.send({ body: payload });
});
_.connect({ 'host': 'localhost', 'port': 61616, 'user': 'admin', 'password': 'admin', 'reconnect': true });

vendor.bundle.js:83229 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
main.bundle.js:397 o payload é undefined
ng:///ClientsMessageModule/ProducerComponent.ngfactory.js:220 ERROR TypeError: get_connect_fn(...) is not a function
at Connection.webpackJsonp.../../../../rhea/lib/connection.js.Connection.connect (vendor.bundle.js:25781)
at Connection.webpackJsonp.../../../../rhea/lib/connection.js.Connection.connect (vendor.bundle.js:25763)
at Container.webpackJsonp.../../../../rhea/lib/container.js.Container.connect (vendor.bundle.js:26262)
at AmqpService.webpackJsonp.../../../../../src/app/clients-message/services/amqp.service.ts.AmqpService.put (main.bundle.js:504)
at ProducerService.webpackJsonp.../../../../../src/app/clients-message/producer/producer.service.ts.ProducerService.send (main.bundle.js:456)
at ProducerComponent.webpackJsonp.../../../../../src/app/clients-message/producer/producer.component.ts.ProducerComponent.OnClick (main.bundle.js:398)
at Object.eval [as handleEvent] (ng:///ClientsMessageModule/ProducerComponent.ngfactory.js:226)
at handleEvent (vendor.bundle.js:92351)
at callWithDebugContext (vendor.bundle.js:93810)
at Object.debugHandleEvent [as handleEvent] (vendor.bundle.js:93398)
View_ProducerComponent_0 @ ng:///ClientsMessageModule/ProducerComponent.ngfactory.js:220
proxyClass @ vendor.bundle.js:67538
webpackJsonp.../../../core/@angular/core.es5.js.DebugContext_.logError @ vendor.bundle.js:93750
webpackJsonp.../../../core/@angular/core.es5.js.ErrorHandler.handleError @ vendor.bundle.js:81384
(anonymous) @ vendor.bundle.js:89578
(anonymous) @ vendor.bundle.js:104101
webpackJsonp.../../../../zone.js/dist/zone.js.ZoneDelegate.invokeTask @ polyfills.bundle.js:2838
onInvokeTask @ vendor.bundle.js:84228
webpackJsonp.../../../../zone.js/dist/zone.js.ZoneDelegate.invokeTask @ polyfills.bundle.js:2837
webpackJsonp.../../../../zone.js/dist/zone.js.Zone.runTask @ polyfills.bundle.js:2605
ZoneTask.invoke @ polyfills.bundle.js:2900
ng:///ClientsMessageModule/ProducerComponent.ngfactory.js:220 ERROR CONTEXT DebugContext_
View_ProducerComponent_0 @ ng:///ClientsMessageModule/ProducerComponent.ngfactory.js:220
proxyClass @ vendor.bundle.js:67538
webpackJsonp.../../../core/@angular/core.es5.js.DebugContext_.logError @ vendor.bundle.js:93750
webpackJsonp.../../../core/@angular/core.es5.js.ErrorHandler.handleError @ vendor.bundle.js:81389
(anonymous) @ vendor.bundle.js:89578
(anonymous) @ vendor.bundle.js:104101
webpackJsonp.../../../../zone.js/dist/zone.js.ZoneDelegate.invokeTask @ polyfills.bundle.js:2838
onInvokeTask @ vendor.bundle.js:84228
webpackJsonp.../../../../zone.js/dist/zone.js.ZoneDelegate.invokeTask @ polyfills.bundle.js:2837
webpackJsonp.../../../../zone.js/dist/zone.js.Zone.runTask @ polyfills.bundle.js:2605
ZoneTask.invoke @ polyfills.bundle.js:2900
