Git Product home page Git Product logo

servicebus's Introduction

Build Status

servicebus

Simple service bus for sending events between processes using amqp. Allows for send/receive and publish/subscribe pattern messaging over RabbitMQ.

Configuration

Sending and Receiving

Servicebus allows simple sending and recieving of messages in a 1:1 sender:listener configuration. The following two processes will send an event message called 'my.event' every second from process A to process B via RabbitMQ and print out the sent event:

Process A:

var bus = require('servicebus').bus();
bus.listen('my.event', function (event) {
  console.log(event);
});

Process B:

var bus = require('servicebus').bus();

setInterval(function () {
  bus.send('my.event', { my: 'event' });
}, 1000);

Round-Robin Load Distribution

Simply running multiple versions of Process A, above, will cause servicebus to distribute sent messages evenly accross the list of listeners, in a round-robin pattern.

Message Acknowledgement

(Note: message acking requires use of the https://github.com/mateodelnorte/servicebus-retry middleware)

Servicebus integrates with RabbitMQ's message acknowledement functionality, which causes messages to queue instead of sending until the listening processes marks any previously received message as acknowledged or rejected. Messages can be acknowledged or rejected with the following syntax. To use ack and reject, it must be specified when defining the listening function:

bus.listen('my.event', { ack: true }, function (event) {
  event.handle.acknowledge(); // acknowledge a message
  event.handle.ack(); // short hand is also available
  event.handle.reject(); // reject a message
});

Message acknowledgement is suited for use in load distribution scenarios.

Authentication (RabbitMQ Bus)

Fully qualified url

You may authenticate by providing url as an option when initializing the bus, or setting RABBITMQ_URL as an environment variable. RabbitMQ uses basic auth url format for authentication.

var bus = servicebus.bus({
  url: "amqp://user:pass@localhost:5672,
})

config options

Alternatively, you may provide a user, password, host (optional, default = 'localhost'), and port (optional, default = 5672), and servicebus will construct the url before passing it to RabbitMQ.

var bus = servicebus.bus({
  user: 'rabbitUser',
  password: 'test1234',
  host: '1.1.1.1'
  port: '5555'
})

NOTE: If url and user/password are provided, the url will be used.

Publish / Subscribe

Servicebus can also send messages from 1:N processes in a fan-out architecture. In this pattern, one sender publishes a message and any number of subscribers can receive. The pattern for usage looks very similar to send/listen:

Process A (can be run any number of times, all will receive the event):

var bus = require('servicebus').bus();
bus.subscribe('my.event', function (event) {
  console.log(event);
});

Process B:

var bus = require('servicebus').bus();

setInterval(function () {
  bus.publish('my.event', { my: 'event' });
}, 1000);

Topic Routing

To use topic routing to accept multiple events in a single handler, use publish and subscribe and the following syntax:

bus.publish('event.one', { event: 'one' });
bus.publish('event.two', { event: 'two' });

and for the listener...

bus.subscribe('event.*', function (msg) ...

Middleware

Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage:

  if ( ! process.env.RABBITMQ_URL)
    throw new Error('Tests require a RABBITMQ_URL environment variable to be set, pointing to the RabbiqMQ instance you wish to use.');

  var busUrl = process.env.RABBITMQ_URL

  var bus = require('../').bus({ url: busUrl });

  bus.use(bus.package());
  bus.use(bus.correlate());
  bus.use(bus.logger());

  module.exports.bus = bus;

Middleware may define one or two functions to modify incoming or outgoing messages:

...

  function logIncoming (queueName, message, options, next) {
    log('received ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  function logOutgoing (queueName, message, options, next) {    
    log('sending ' + util.inspect(message));
    next(null, queueName, message, options);
  }

  return {
    handleIncoming: logIncoming,
    handleOutgoing: logOutgoing
  };

handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. (in from protocol->servicebus->middleware 1->middleware 2->servicebus->user code)

Included Middleware

Correlate

Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services.

Logger

Logger ensures that incoming and outgoing messages are logged to stdout via the debug module. (Use this in non-high throughput scenarios, otherwise you'll have some very quickly growing logs)

Package

Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent:

  // bus.publish('my:event', { my: 'event' });
  {
    my: 'event'
  };

becomes

  {
    data: {
      my: 'event'
    }
    , datetime: 'Wed, 04 Sep 2013 19:31:11 GMT'
    , type: 'my:event'
  };

Retry

https://github.com/mateodelnorte/servicebus-retry

Retry provides ability to specify a max number of times an erroring message will be retried before being placed on an error queue. The retry middleware requires the correlate middleware.

Contributing

servicebus uses semantic-release for deploys.

Commits must follow Conventional Changelog to accurately calculate new versions.

servicebus's People

Contributors

alexbeletsky avatar allenhartwig avatar bloo avatar brainflake avatar chrisabrams avatar golovan avatar gonzohunter avatar greenkeeper[bot] avatar madnight avatar mateodelnorte avatar mdimitrov avatar mettjus avatar patrickleet avatar renovate-bot avatar repkins avatar ryanvm avatar seanxlliu avatar sinetbot avatar stefankutko avatar thibaultlaurens avatar timisbusy avatar tkers avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

servicebus's Issues

document overriding options.correlator

allows for overriding default behavior of subscriptions where, when a queueName is not manually provided, servicebus will auto-create a unique queue name for a fanout subscription. by overriding the correlator, one can easily provide their own with simpler conventions that, for instance, don't auto-generate unique queue names:

servicebus.bus({
  correlator: {
    queueName: function (opts, cb) {
      return cb(null, `my-process-name-${opts.queueName}`);
    }
  }
});

In time, a simpler correlator will become the default and a number of options will be included with servicebus.

front end in mind

I'm on to the front end part of my poc for moving our arch to a service bus. You touched on UI interaction mongo oplog and the such, but I really don't understand how I'm going to handle our typical use cases like; form submissions that require a response before moving on to the next question (what if a service is down and the retry just sits on the queue, etc..), authentication where we don't let a portal user to send commands to the bus (via web app) until they receive a token, and all the other common scenarios with front end interaction.

Do you have any examples and or insight on this? I'd love to chat with you about it more. ty (at the moment I'm exploring rethinkdb for the denormalizer)

Pub/sub not working with multiple processes.

I maybe missing something here I can't figure out myself.

Basically, I'm running multiple processes of User clients each subscribing to a specific event users.*

function User () {
    if (!(this instanceof User)) {
        return new User();
    }
    this.requests = [];

    // Subscribers
    bus.subscribe('users.*', {ack : true}, this.handleRequest.bind(this));
}

User.prototype.createUser = function (request, response, next) {
    var cid = uuid.v4();
    this.requests[cid] = {'response' : response, 'cb' : next};

    bus.send('users.create', request.body, {correlationId : cid})
};

And multiple processes of user service workers.

bus.listen('users.create', {ack: true}, function (msg) {
    var data = msg.data;

    console.log(cli.green('[users]'), msg.type, data);

    // do some work
    setTimeout(function () {
        bus.publish('users.created', data, {correlationId : msg.correlationId});
        msg.handle.ack();
    }, 500);
});

Everytime a request is sent from User.createUser it's being distributed (in round-robin) to service workers, no problem. When a service worker publishes users.created event only one User client receives the event even though each User client process subscribes to the same event.

Note:
All of them share the same bus.js.

TypeError: Object #<RabbitMQBus> has no method '_publish'

On message publish I receive the follwing error

TypeError: Object # has no method '_publish'
at republish (/Users/.../node_modules/servicebus/bus/rabbitmq/bus.js:137:12)
at process._tickCallback (node.js:415:13)

my code:
bus = require('servicebus').bus()

bus.use(bus.package())

bus.use(bus.correlate())

bus.use(bus.log())

bus.use(bus.retry())

bus.subscribe('my.event.abc', (event) ->
console.log(event)
)
setTimeout( ->
bus.publish('my.event.abc', { my: 'event' })
, 100)

use of enableConfirms option

Hi

I am trying to use enableConfirms options with callback on send and I am getting and error that options is not set so I can't use the callback

//...
var bus = require('servicebus').bus({ url: busUrl, enableConfirms: true });

bus.send('test', { xxx: 'yyy'}, function (err) {
  console.log(err);
});

output
[Error: callbacks only supported when created with bus({ enableConfirms:true })]

am I missing something ?

How to handle malformed JSON on message payload?

I was running some tests sending the messages directly from RabbitMQ Manager. Then I sent a malformed JSON and my application stopped with the following exception:

events.js:160
      throw er; // Unhandled 'error' event
      ^

 in JSON at position 31 token
    at Object.parse (native)
    at Object.deserialize (/opt/mensmarket/mm-catalog-service/node_modules/servicebus/bus/formatters/json.js:4:20)
    at listenChannel.consume.noAck (/opt/mensmarket/mm-catalog-service/node_modules/servicebus/bus/rabbitmq/queue.js:93:41)
    at Channel.BaseChannel.dispatchMessage (/opt/mensmarket/mm-catalog-service/node_modules/amqplib/lib/channel.js:466:12)
    at Channel.BaseChannel.handleDelivery (/opt/mensmarket/mm-catalog-service/node_modules/amqplib/lib/channel.js:475:15)
    at emitOne (events.js:96:13)
    at Channel.emit (events.js:188:7)
    at /opt/mensmarket/mm-catalog-service/node_modules/amqplib/lib/channel.js:263:10
    at Channel.content [as handleMessage] (/opt/mensmarket/mm-catalog-service/node_modules/amqplib/lib/channel.js:316:9)
    at Channel.C.acceptMessageFrame (/opt/mensmarket/mm-catalog-service/node_modules/amqplib/lib/channel.js:231:31)

I thought that this kind of issue should be solved by the servicebus-retry package. So after N attempts to process the message, I thought that the message could be put on the '.error' queue. But I was wrong and the application just stopped.

Am I doing something wrong on how to handle this situation?

Subscribe broken for clustered applications

When running a clustered app (under PM2 for example) the clustered processes all end up subscribing to the same queue. Therefore only one subscribing processes will receive the message.

This issue appears to have been caused by the introduction of the '.queue' file for loading the queue list. Since all process load the same .queue file, they all simply subscribe to the same queue.

Error: COMMAND_INVALID - second 'channel.open' seen

What does that mean? I'm trying to quickly test listening to different events and randomly this occurs.

If I start and stop this quickly I randomly ( 1 out of 10 times ) get that error:

var bus  = require('servicebus').bus(),
util = require('util')

bus.listen('foo', function(obj) {
  return util.log("obj");
});

bus.send('foo', {
  bar: 'zoo'
});

bus.listen('fooer', function(obj) {
  return util.log("obj 2");
});

bus.send('fooer', {
  bar: 'zoo'
});

Concerns about growth of denormalizer

I'm a little concerned about the denormalizer growing out of control and about the amount of domain logic the could potentially end up there while aggregating. As we decompose our some of our monoliths we will end up with a large amount of domain specific services that will eventually end up being aggregated in some way or another into the denormalizer service/db.

I think you mentioned that you've dealt with a large number of services and a single denormalizer before. Does it become hard to conceptualize over time, and how have you composed this service to help relieve that stress?

Isn't a "saga" a term used in CQRS as a way to have many denormalizers?

set autoDelete to ‘false‘ doesn't work

autoDelete: options.autoDelete || ! (options.ack || options.acknowledge),

In the following code, if autoDelete is false, the option is simply overwritten by options.ack.
autoDelete: options.autoDelete || ! (options.ack || options.acknowledge)

expected behavior:

if autoDelete is set to false, ack won't affect autoDelete.
options.autoDelete === undefined ? ! (options.ack || options.acknowledge) : options.autoDelete

pub/sub doesnt work

Dear Matt,

i am using servicebus pub-sub without success.
publishing one event to two subscribers sends the event two the first, or to the second subscriber. Never to both subscribers.

I did quite a lot of research to handle the problem but cant find a solution.
i use the pub - sub example from npm but with two subs -- it doesnt work.

Trying to use amqp with Solace hardware router (on new branch)

I'm trying to get amqp working with a Solace hardware router (http://solacesystems.com/products/unified-api-and-open-source/), which implements amqp. It appears the Solace appliance is returning a frame size larger than the frame buffer in the amqp package allows. It may also be that the amqp package implements amqp protocol version 0-9-1 but the appliance does not support this version.

The following logs are from service bus and the amqp package with full logging on:

$ NODE_DEBUG_AMQP=1 node test.js                                                                                                                     
initializing amqp methods...                                                                                                                         

connectionStart                                                                                                                                      

connectionStartOk                                                                                                                                    

connectionSecure                                                                                                                                     

connectionSecureOk                                                                                                                                   

connectionTune                                                                                                                                       

connectionTuneOk                                                                                                                                     

connectionOpen                                                                                                                                       

connectionOpenOk                                                                                                                                     

connectionClose                                                                                                                                      

connectionCloseOk                                                                                                                                    

channelOpen                                                                                                                                          

channelOpenOk                                                                                                                                        

channelFlow                                                                                                                                          

channelFlowOk                                                                                                                                        

channelClose                                                                                                                                         

channelCloseOk                                                                                                                                       

exchangeDeclare                                                                                                                                      

exchangeDeclareOk                                                                                                                                    

exchangeDelete                                                                                                                                       

exchangeDeleteOk                                                                                                                                     

queueDeclare                                                                                                                                         

queueDeclareOk                                                                                                                                       

queueBind                                                                                                                                            

queueBindOk                                                                                                                                          

queueUnbind                                                                                                                                          

queueUnbindOk                                                                                                                                        

queuePurge                                                                                                                                           

queuePurgeOk                                                                                                                                         

queueDelete                                                                                                                                          

queueDeleteOk                                                                                                                                        

basicQos                                                                                                                                             

basicQosOk                                                                                                                                           

basicConsume                                                                                                                                         

basicConsumeOk                                                                                                                                       

basicCancel                                                                                                                                          

basicCancelOk                                                                                                                                        

basicPublish                                                                                                                                         

basicReturn                                                                                                                                          

basicDeliver                                                                                                                                         

basicGet                                                                                                                                             

basicGetOk                                                                                                                                           

basicGetEmpty                                                                                                                                        

basicAck                                                                                                                                             

basicReject                                                                                                                                          

basicRecoverAsync                                                                                                                                    

basicRecover                                                                                                                                         

basicRecoverOk                                                                                                                                       

txSelect                                                                                                                                             

txSelectOk                                                                                                                                           

txCommit                                                                                                                                             

txCommitOk                                                                                                                                           

txRollback                                                                                                                                           

txRollbackOk                                                                                                                                         

  servicebus connecting to rabbitmq on smf://nj1dsolvip01:55555 +0ms                                                                                 
  servicebus listen on queue test +7ms                                                                                                               
connect: {}                                                                                                                                          

AMQPParser: '0-9-1' 'client'                                                                                                                         

connected...                                                                                                                                         

  servicebus timout triggered +3s                                                                                                                    
  servicebus sending to queue test event { cid: '5e389304-7af7-424a-99cc-27d15b4ff42c',                                                              
  data: { test: 'test 1123123123123123' },                                                                                                           
  datetime: 'Tue, 02 Jul 2013 21:54:37 GMT',                                                                                                         
  type: 'test' } +4ms                                                                                                                                
1 < channelOpen {"reserved1":""}                                                                                                                     

domain: shortstr param:                                                                                                                              

sending frame: ☺ ☺   ♣ ¶                                                                                                                             
 �                                                                                                                                                   

data: { '0': <Buffer 03 8d 70 01 00 00 00 2e 00 00 00 2e 08 22 00 00 01 90 55 6e 73 75 70 70 6f 72 74 65 64 20 50 72 6f 74 6f 63 6f 6c 20 56 65 72 73
69 6f 6e> }                                                                                                                                          

execute: ♥�p☺   .   "  ☺�Unsupported Protocol Version                                                                                                

<Buffer 03 8d 70 01 00 00 00 2e 00 00 00 2e 08 22 00 00 01 90 55 6e 73 75 70 70 6f 72 74 65 64 20 50 72 6f 74 6f 63 6f 6c 20 56 65 72 73 69 6f 6e>   
  servicebus { '0': 'Oversized frame 16777216' } +10ms                                                                                               
  servicebus Error connecting to rabbitmq at smf://nj1dsolvip01:55555 error: Oversized frame 16777216 +1ms                                           


c:\Users\mwalters\development\scratch\servicebus\bus\rabbitmq\bus.js:28    
    throw err;                                                             
          ^                                                                
Oversized frame 16777216                                                   

Refactor retry() middleware to accept message store provider

Refactor the retry() middleware to have a message store provider pattern. The pattern will allow you to have a MemoryStore which will track retry counts in memory, or for instance a RedisStore which will track message cids and their associated retry counts in redis.

This will make the api difference between "local" and "distributed" more obvious, simply replacing them with the use of the store. A distributed model will no longer depend on the need to modify message properties (RabbitMQ doesn't allow us to do this on a reject(requeue=true), which is currently forcing us to resend messages on the back of a queue when rejecting. We would prefer to simply reject(requeue=true).

move middleware to their own projects

I'd be happy to start moving the remaining built-in middleware functions to their own projects and add links from readme since some of these are not mentioned. Let me know what you think.

correlate
json
logger
messageDomain
package

Publish/subscribe with many subscribers fails?

From documentation I understood that if we have a publish/subscribe bus, then for one/same event published all subscribers should receive it. If that's hold true, then the simple example presented in publish/subscribe section does not work properly.

Potentially unhandled rejection [1] TypeError: Cannot set property 'queueName' of undefined

In a service using servicebus (tried 1.0.21, 1.0.19 and 1.0.18) I'm receiving this console output and I'm not sure why:

Potentially unhandled rejection [1] TypeError: Cannot set property 'queueName' of undefined
    at RabbitMQBus.setOptions (/app/node_modules/servicebus/bus/rabbitmq/bus.js:161:23)
    at RabbitMQBus.subscribe (/app/node_modules/servicebus/bus/rabbitmq/bus.js:221:8)
    at emitNone (events.js:91:20)
    at RabbitMQBus.emit (events.js:185:7)
    at done (/app/node_modules/servicebus/bus/rabbitmq/bus.js:56:14)
    at /app/node_modules/servicebus/bus/rabbitmq/bus.js:77:7
    at tryCatchReject (/app/node_modules/when/lib/makePromise.js:840:30)
    at runContinuation1 (/app/node_modules/when/lib/makePromise.js:799:4)
    at Fulfilled.when (/app/node_modules/when/lib/makePromise.js:590:4)
    at Pending.run (/app/node_modules/when/lib/makePromise.js:481:13)
    at Scheduler._drain (/app/node_modules/when/lib/Scheduler.js:62:19)
    at Scheduler.drain (/app/node_modules/when/lib/Scheduler.js:27:9)
    at _combinedTickCallback (internal/process/next_tick.js:67:7)
    at process._tickDomainCallback (internal/process/next_tick.js:122:9)

Leveraging RabbitMQ Delayed Message Plugin

I can't seem to figure out how to pass it the proper options to get the Delayed Message Plugin to work properly when using servicebus.

My current attempt is as follows:

Consumer:

  const options = {
      ack: true,
      queueOptions: {
        exchangeOptions: {
          type: 'x-delayed-message',
          'x-delayed-type': 'direct'
        }
      }
    };
    bus.listen(eventName, options, handlerFunc);

Producer:

const options = {
    ack: true,
    headers: {
      'x-delay': 1000
    }
  }
bus.send(eventName, message, options);

The message sends and is received by the listener immediately (~100ms).

I'm wondering if I am not providing the information correctly on the headers on the send or with exchangeOptions on the listen. Or perhaps servicebus just doesn't support propagating these settings into the RabbitMQ queue/message properly.

Any help is appreciated.

Some more Documentation please

Would like to know the answers/documentation for the following questions:

  1. Some of the examples don't mention a RabbitMQ Url. Can servicebus work without RabbitMQ, or is that just for example purposes.
  2. RabbitMQ supports temporary or persistent queues. How can we use persistent queues with service bus.

If Yes:
3. In the case of a simple pub/sub model, is there a way to persist all published messages on RabbitMQ. And later be able to 'replay' all previously published messages before catching up to live?

Thanks.

Question about using servicebus in current architecture.

  • frontend angular app
  • proxy (frontend proxies through to backend services)
  • backend service 1 (REST)
  • backend service 2 (REST)
  • backend service 3 (REST)
  • etc..

We currently rely on each backend service being aware of other backend services. So when a scenario like new user registration happens we:

  • frontend makes a request to register a new user
  • backend service 1 receives the request and makes several other calls to other backend services in order to aggregate a response directly back to the frontend.

I'm considering using a message broker to handle our backend services communication but I'm unable to wrap my head around how I would do this. If you took the registration scenario above and completely decoupled the backend services via servicebus, how would I go about creating that same aggregate response for the frontend?

Support queues other than RabbitMQ

Hi,

Would you consider adding different queues than RabbitMQ ? Is there a way to easily replace RabbitMQ with Kafka, ZeroMQ, etc... ? I am having an issue where I'd like to use servicebus, bus our technology stack may not use RabbitMQ in a long run

Invalid example in readme

Given example causes an error

bus.listen('my.event', { ack: true }, function (event) {
  event.handle.acknowledge(); // acknowledge a message
  event.handle.ack(); // short hand is also available
  event.handle.reject(); // reject a message
});

because there is no event.handle

Pub/Sub acts like Round Robin

I've tried setting up a pub/sub setup with retry and what I'm seeing is that when a message is published to the bus, only one of my subscribers will get the message. On a second publish, the second subscriber gets the message and the first does not. A third message and now the first subscriber gets the message and the second doesn't. This continues indefinitely.

Here is my relative code:

 bus.subscribe(listener.event, (data, event) => {
            console.log(`subscriber is calling listener`);
            listener.handler(data, event)
                .then(() => {
                    //console.log('acking message after ');
                    console.log('!NOT! acking message after ');
                    //data.handle.ack();

                })
                .catch(err => {
                    data.handle.reject(err);
                });
        });

where my first subscriber/listener looks like this:

const Q = require('q');

module.exports = function () {
    const listener = this;

    listener.event = 'order.created';
    listener.handler = function (order, message) {
        console.log('Got an order! outputting purely as test:');
        console.log(order);
        return Q.promise((res,rej)=>{
            res();
        });
    };
    return listener;

};

my second one looks similar.

and my bus is called as a factory:

'use strict';

module.exports = function (options) {

    const bus = require('servicebus').bus(options || {});
    const retry = require('servicebus-retry');
    const config = require(require('path').resolve('./config/config'));

    bus.use(retry({
        store: new retry.RedisStore({host: config.redis.host, port: config.redis.port})
    }));

    console.log('initializing bus');


    return bus;
};

Using the pub/sub I was specifically looking for a durable (non-auto-delete queues, so use ack:true, just not here as debuggging) way of fanning out messages. I believe the docs suggest both my listeners should get each a message for 'order.created'.

Have I setup something wrong?

Pub/Sub example in docs does not work as indicated.

Hello,

Using 2.0.8

There's few things:

  1. Multiple bus.subscribe('sameName'); Only creates 1 queue sameName.[SOME RANDOM STRING]. No matter how many processes you run.
  2. Subscribe should create a fanout exchange by default (I think this is already recorded as an issue).
  3. Furthermore combining 1 and 2 above, the consumers get messages in round robin fashion. So assuming you publish 2 messages and you have 2 consumers, then each consumers gets 1 message. They should have gotten 2 messages each and a total of 4 messages. I.e: A fanout exchange writing to 2 queues.
  4. Based on what is explained above, the test case for pub/sub is not correct. You create 4 consumers and publish 4 messages the total of messages received should be 16 total (4 each) and not 1 each of total 4.
  5. The following app.js: http://pastebin.com/1Ab4JCVL for what ever reason, when you use the RabbitMQ management interface to send a test message, causes the application to exit. Seems to happen when any external application publishes a message. It also did it with a sample Java client publishing to Rabbit.

Example using `send` callback.

Hi Matt. Do you have any documentation on how to use the RabbitMQBus.send with the callback? Looking at the code, it seems coupled to the confirmChannel option and I'm not quite getting how that all works.

The use-case I'm looking at is a serial event chain where want to make sure that after processing one event, I've successfully sent it to the next queue before ack-ing.

Thanks in advance.

TypeError: conn.createChannel is not a function

Hello!

I encountered a weird issue regarding while testing connections to RabbitMQ. For example when I trying to connect stopped instrance of RabbitMQ service, I expecting to catched ECONNREFUSED, emited with on 'error' event. That's ok, but there for some reason it also throws unhandled exception TypeError: conn.createChannel is not a function, crashing application. I've made research for that problem and I've discovered that error comes from ".done" handler on making connection through amqplib module. That handler called with passed undefined conn argument, which should not meant to be executed at all, if it is rejected;

I know how to fix, based of my deeper research, including amqplib module and its depedency module when so im ready to make pull request. Because for me its needed to get fixed ASAP.

Adding correlationId

Is there any way I could set a correlationId? In amqplib I can pass correlationId in options and get it as msg.properties.correlationId in .consume.

ServiceBus typedef for Typescript

Hi Matt. I've published a typedef for the ServiceBus here it's still pretty rough and I need to add more DocBlocks, but is this something you are interested in linking to?

A success/error callback on send & publish

There is no promise/callback method to be able to move on or halt the program based on the output of a bus.send().

For example, if a queue is set to non-transient, an exception is thrown like this:

channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'queue-name in vhost '/': received 'true' but current is 'false'".

Ideally, the library would would have an api that looks like:

bus.send(..., function (error, response) {}
or promises:
bus.send().then() { // handle success here }.error() { // handle error here}

Work queues support?

Guys, I'm currently trying to investigate if servicebus is good for work-queue pattern implementation.

Does it fit, can you provide a small examples of producer => many consumers example? Could be the work queue just a send/listen, then there are many listeners for the same event?

Being Able to send back data in response.

Hello,

Thank you for your hardwork and happy new year.

I read this issue #48.
I wanted to know if a feature that allow the Process that receive the message to send back a response could match the vision of this project.

It's not only to know if sending the message is a success or a failure but also receiving data in return.

How to combine fanout with worker queues

I've been struggling to implement this using servicebus.

Example Scenario:

User service: 1 process
Order Service: 3 processes
Analytics Service: 5 processes

user updates firstname.

I want to publish 'event.user.updated', { user } to both order service and analytics service.

However, only one processes should pick it up and process it PER SERVICE.
e.g.
1 process from the 5 analytics processes picks the message up and does something.
1 process from the 3 order processes picks the message up and does something.

Everything I've tried with servicebus leads to roundrobin between the processes of all the services. How can I achieve this?

Node.js process dies if RabbitMQ node dies.

Hi,

I have the following.

const servicebus = require('servicebus');
const retry = require('servicebus-retry');

bus = servicebus.bus({
    url: 'amqp://localhost:6672'
});
bus.use(retry({
    store: new retry.MemoryStore()
}));

// This will set autoDelete to false, but we must manually ack messages.
var options = {
    ack: true
}

bus.subscribe('events.*', options, (msg) => {
        msg.handle.ack(() => {
            console.log('Acked message: ' + JSON.stringify(msg));
        });
});

console.log("events-management started.");

The moment the Rabbit node where servicebus is connected to dies (dokcer kill that 1 node), the Node.js process dies also with no exception caught.

We only see: Process finished with exit code 0

heartbeat to reconnect to server on disconnect

i am using servicebus:2.0.10 in macOS Sierra
After servicebus has established connection and is subscribed to events, i restart the rabbitmq server.

servicebus does not detects and hence it stays disconnected

Allow to read existing queues.

Unless I missed something, the queues are completely managed by servicebus.

bus.subscribe('foo.bar', options, (msg) => {
});

This will create a queue as foo.bar.8aadae7d-bd7e-4dc7-ba85-f57a102b06fe.

Is it possible to tell it to use an existing queue?

For instance we want to pre create the queue let it fill up and when the application finished loading it then starts to read the queue.

Request Reply

Does Service Bus support REQUEST REPLY kind of interaction between 2 microservices ?

connection error when vhost is specified as part of the url

The amqp uri specification shows how to specify a vhost within the url. Example:

amqp://guest:[email protected]/myvhost

But, if we use the following code to connect with servicebus...

var bus = require('servicebus').bus({ 
    url: 'amqp://guest:[email protected]/myvhost',
});

... we'll get an ECONNRESET error.

Browsing the code I found that the underlying amqplib supports this kind of URIs, but servicebus is expecting to receive the vhost in a different property in the options object. If the user doesn't specify a vhost, it will append a trailing slash automatically, which ends up generating a wrong URI.

I think that servicebus shouldn't modify the URI and just pass it through to the amqplib.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.