stompit's Issues

Issue related to Master Slave approach of connecting through Stomp

I am using an ActiveMQ broker in a master/slave setup and node-stomp to connect through failover. When the client connects to the slave broker it cannot publish or listen: the slave process is running but serves no requests until it becomes master. The failover is not automatically switching over to the master and establishing the connection; it just keeps throwing errors. How can we resolve this issue?
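
A minimal sketch of the failover setup being described, using stompit's ConnectFailover with both brokers listed; the host names and option values here are placeholders, not the reporter's configuration:

    var stompit = require('stompit');

    // Hypothetical master and slave addresses; ConnectFailover rotates through
    // the list and keeps retrying until one of the brokers accepts a connection.
    var servers = [
      { host: 'broker-master', port: 61613, connectHeaders: { host: '/' } },
      { host: 'broker-slave',  port: 61613, connectHeaders: { host: '/' } }
    ];

    var manager = new stompit.ConnectFailover(servers, {
      maxReconnects: 100,
      randomize: false
    });

    manager.connect(function(error, client, reconnect) {
      if (error) {
        console.log('Gave up reconnecting: ' + error.message);
        return;
      }
      client.on('error', function(err) {
        console.log('Client error, reconnecting: ' + err.message);
        reconnect();
      });
      // publish or subscribe with client here
    });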

Tag a new version of node-stomp

Could you tag a new version of node-stomp to include the security fix for submodules and the other enhancements made since version 0.21.0?

Thank you in advance !

Message Throttling?

I've been looking through the code and have not found this ability, nor an obvious place where it could live. We're in dire need of being able to throttle the number of messages accepted by the consumer through the subscription (or to state a maximum number of messages to receive, rather than flooding the consumer).

When testing, a consumer with longer processing times may receive more messages than it can process, leading to a large backlog of unacknowledged messages (when the consumer is in charge of acking its messages once processing completes). In our case this leads to a leak of a few thousand messages in a short time period, which ultimately causes the consumer to crash.

Is there a way to throttle the messages or where would one put this type of throttling into this module?
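
One possible angle, sketched below under stated assumptions: with ActiveMQ, the broker-specific activemq.prefetchSize subscribe header limits how many unacknowledged messages the broker dispatches to a consumer, and client-individual ack mode holds back the next delivery until the current one is acked. This is not a feature of this module itself, and the header name applies to ActiveMQ only:

    var stompit = require('stompit');

    stompit.connect({ host: 'localhost', port: 61613 }, function(error, client) {
      if (error) return console.log('connect error: ' + error.message);

      var subscribeHeaders = {
        destination: '/queue/work',           // placeholder queue name
        ack: 'client-individual',
        'activemq.prefetchSize': '1'          // broker dispatches at most one unacked message
      };

      client.subscribe(subscribeHeaders, function(error, message) {
        if (error) return console.log('subscribe error: ' + error.message);

        message.readString('utf-8', function(error, body) {
          if (error) return console.log('read error: ' + error.message);

          handleSlowly(body, function() {
            message.ack();                    // acking releases the next delivery
          });
        });
      });
    });

    // Stand-in for a slow handler; real code would do the actual processing here.
    function handleSlowly(body, done) {
      setTimeout(done, 1000);
    }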

How to handle/consume binary data

Hi,

I have a requirement where my node consumer will be consuming binary data. I get the 'message' object below on receiving binary data:
{"_readableState":{"objectMode":false,"highWaterMark":0,"buffer":[],"length":0,"pipes":null,"pipesCount":0,"flowing":null,"ended":false,"endEmitted":false,"reading":false,"sync":true,"needReadable":false,"emittedReadable":false,"readableListening":false,"defaultEncoding":"utf8","ranOut":false,"awaitDrain":0,"readingMore":false,"decoder":null,"encoding":null},"readable":true,"domain":null,"_events":{},"_eventsCount":1,"command":"MESSAGE","headers":{"content-length":10,"expires":"1463061403952","destination":"/topic/test.topic.0","subscription":"1","priority":"4","message-id":"ID:shakti01.hyd.deshaw.com-51570-1462974867310-1:1:1:1:28","timestamp":"1462975003952"},"ackType":"ACK"}

I am not sure how to consume binary data with this API. Could you please help?

Ack for published message?

Is there any way to get an acknowledgement for a published message?

STOMP supports acknowledgement via receipt.
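
A hedged sketch of asking the broker for a RECEIPT frame on a SEND; client is assumed to be an already-connected stompit client, and whether the installed stompit release exposes an onReceipt option on client.send should be verified against its API documentation:

    // Assumption: client.send(headers, options) accepts an onReceipt callback
    // invoked when the broker's RECEIPT frame for this SEND arrives.
    var frame = client.send(
      { destination: '/queue/test' },
      { onReceipt: function() { console.log('broker confirmed the SEND'); } }
    );
    frame.write('hello');
    frame.end();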

ChannelPool doesn't allow setting recoverAfterApplicationError

I'd like to set the recoverAfterApplicationError flag to true for my ChannelPool.

I'm subscribed to AMQ and when there's a reconnection while about to ACK, I get Unexpected ACK received for message-id [ID:53a45ce8d9b8-35305-1507627714128-1:8:1:1:8].
This triggers an applicationError which, according to the code (https://github.com/gdaws/node-stomp/blob/master/lib/Channel.js#L70), just stops processing (abort) and doesn't re-establish the connection.

By setting this flag to true, the connection is reset and the subscription keeps working as expected.

Right now, I'm using this workaround when getting the channel:

return new P((res, rej) => {
      this._channelPool.channel((error, ch) => error ? rej(error) : res(ch));
    })
      .then((channel) => {
        // Needed for the channel to recover when AMQ error happens (typically wrong ACK receipt on reconnection)
        // Assigned like this because ChannelPool doesn't allow sending this parameter in the config (Channel does)
        channel._recoverAfterApplicationError = true;
        return channel;
      });

But it doesn't seem right to set internal _-prefixed variables like that.

  • Can the ChannelPool class expose this option (I'd rather go for this) or Channel class provide a proper setter for this value?
  • Is there any other way to achieve what I want?

Thank you!

How to enlarge maximum frame size

I have a problem - Error: The maximum frame size was exceeded.
I tried setting different headers, e.g.
'maxWebSocketFrameSize': 16 * 1024 * 1024
or
'wireFormat.maxFrameSize': 16 * 1024 * 1024
But I get the same error.
I also tried changing the config in hosts:
'host': '/?wireFormat.maxFrameSize=100000000',
'host': '/?wireFormat.maxDataLength=100000000',
'host': '/?websocket.maxTextMessageSize=999999999'

But without success. Could you help me get rid of this error?

Recursive process.nextTick detected. This will break in the next version of node.

I am using node v0.10.36 and got the warning below when I tried to use the client.ack(message) method:

node.js:402
throw new Error(msg);
^
Error: (node) warning: Recursive process.nextTick detected. This will break in the next version of node. Please use setImmediate for recursive deferral.
at maxTickWarn (node.js:402:15)
at process._nextTick [as _currentTickHandler]
at process.nextTick (node.js:358:15)
at onwrite (_stream_writable.js:266:15)
at WritableState.onwrite (_stream_writable.js:97:5)
at Socket._write (net.js:654:5)
at doWrite (_stream_writable.js:226:10)
at writeOrBuffer (_stream_writable.js:216:5)
at Socket.Writable.write (_stream_writable.js:183:11)
at Socket.write (net.js:616:40)
at service (/u/choudhab/node_modules/stompit/lib/OutgoingFrameStream.js:245:28)
at dequeue (/u/choudhab/node_modules/stompit/lib/OutgoingFrameStream.js:263:3)

Wiki documentation - use channel or client?

Hi,

I was looking for STOMP JavaScript clients for my Node application. This library offers great features like fault tolerance using a failover URI, etc., but I have had a lot of trouble understanding the API without sufficient documentation.
Looking at the examples, I still don't understand whether I should use the Channel or the Client API. When should one be preferred over the other? What is the general recommendation?

Are there any other events besides - 'connecting' and 'error'?

Thanks,
Abhi

How to set message properties, example AMQ_SCHEDULED_DELAY

Hi, how do I set the AMQ_SCHEDULED_DELAY property of the message? (This property delays the sending of the message; see http://activemq.apache.org/delay-and-schedule-message-delivery.html.)

I am using the following code to send the message

var channel = new stompit.ChannelFactory(connectionManager);
channel.send(queue, messageText, function(error) {
    if (error) {
        var error = new Error('Cannot sendMessage: ' + messageText + ' : ' + error);
        callback(error);
    }
    else {
        console.log('Successfully sent message on queue ' + queue + ':' + messageText);
        callback();
    }
});
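
A hedged sketch of one way this might be done, continuing the snippet above and assuming Channel#send also accepts a headers object (destination plus broker-specific headers) in place of the plain destination string; per the linked ActiveMQ page, the broker also needs schedulerSupport enabled:

    var headers = {
      destination: queue,                 // same queue variable as above
      AMQ_SCHEDULED_DELAY: '60000'        // delay in milliseconds, per the ActiveMQ docs
    };

    channel.send(headers, messageText, function(error) {
      if (error) {
        return callback(new Error('Cannot sendMessage: ' + messageText + ' : ' + error));
      }
      console.log('Scheduled message on queue ' + queue);
      callback();
    });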

Receive binary content

Hi, I need to receive a message with a binary payload from a STOMP server, but I only found the readString and readEmptyBody functions on IncomingFrame.
Does this lib support this kind of message? If yes, is it possible to convert the stream to a Node Buffer?
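
The incoming message is handed to the subscribe callback as a Node readable stream, so one approach (a sketch, assuming client is an already-connected stompit client) is to collect the raw chunks and concatenate them into a Buffer instead of calling readString:

    var subscribeHeaders = { destination: '/queue/binary', ack: 'client-individual' };

    client.subscribe(subscribeHeaders, function(error, message) {
      if (error) return console.log('subscribe error: ' + error.message);

      var chunks = [];
      message.on('readable', function() {
        var chunk;
        while ((chunk = message.read()) !== null) {
          chunks.push(chunk);
        }
      });
      message.on('end', function() {
        var payload = Buffer.concat(chunks);   // the binary body as a single Buffer
        console.log('received ' + payload.length + ' bytes');
        message.ack();                         // only needed for client/client-individual ack
      });
    });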

Out of memory error

Hi,
I see the out-of-memory error below. The stack trace shows the stompit module. Do you know what the cause could be? I have a lot of updates coming from the ActiveMQ server.

$> node index.js

Connecting to mqbrokerprod:61613
Connected to mqbrokerprod:61613, with options: {"host":"mqbrokerprod","port":61613,"connectHeaders":{},"ssl":false}

<--- Last few GCs --->

 1774616 ms: Scavenge 1397.6 (1456.6) -> 1397.6 (1456.6) MB, 4.7 / 0 ms (+ 2.0 ms in 1 steps since last GC) [allocation failure] [incremental marking delaying mark-sweep].
 1775820 ms: Mark-sweep 1397.6 (1456.6) -> 1397.5 (1456.6) MB, 1203.8 / 0 ms (+ 3.0 ms in 2 steps since start of marking, biggest step 2.0 ms) [last resort gc].
 1777215 ms: Mark-sweep 1397.5 (1456.6) -> 1397.3 (1456.6) MB, 1395.3 / 0 ms [last resort gc].


<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x6ddfd937399 <JS Object>
    1: _parseHeaderLine(aka /* anonymous */) [/u/choudhab/activemq/djs/stomp-client/node_modules/stompit/lib/IncomingFrameStream.js:~228] [pc=0x3bb47cf118e3] (this=0x2a35b111e079 <an IncomingFrameStream with map 0x21e609c90ad9>)
    2: _continueTransform [/u/choudhab/activemq/djs/stomp-client/node_modules/stompit/lib/IncomingFrameStream.js:111] [pc=0x3bb47ce98c55] (this=0x2a35b111e079 <an Incomi...

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Abort (core dumped)

Durability

Hi,

This is probably my issue and if so I apologise in advance.

I'm trying to connect to ActiveMQ and subscribe to a topic with durability based on this code block.

const stompit = require('stompit');

stompit.connect({ 'client-id': 'SampleClient', host: 'localhost', port: 61613 }, (err, client) => {
    client.subscribe({ 'activemq.subscriptionName': 'SampleSubscription', destination: '/topic/SampleTopic' }, (err, msg) => {
        msg.readString('UTF-8', (err, body) => {
            console.log(body);

            client.disconnect();
          });
      });
  });

It happily reads the first message sent and disconnects.

If I ...

  • send a second message
  • connect the client again
  • send a third message

...it doesn't receive the second message.

Any thoughts?

Thanks
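
For comparison, a sketch of the durable-subscription shape that the ActiveMQ STOMP docs describe, with the client-id placed inside connectHeaders; this placement is an assumption about the cause here, not a confirmed fix for this report:

    var stompit = require('stompit');

    stompit.connect({
      host: 'localhost',
      port: 61613,
      connectHeaders: {
        'client-id': 'SampleClient'            // durable subscriptions need a stable client id
      }
    }, function(err, client) {
      if (err) return console.log('connect error: ' + err.message);

      client.subscribe({
        destination: '/topic/SampleTopic',
        'activemq.subscriptionName': 'SampleSubscription',
        ack: 'auto'
      }, function(err, msg) {
        if (err) return console.log('subscribe error: ' + err.message);
        msg.readString('UTF-8', function(err, body) {
          console.log(body);
        });
      });
    });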

Process one message at a time

Hi, I have been using this library for the past year and am very happy with it. The only thing I am not able to figure out is how to constrain my subscriber to process one message at a time.

In the current code (below), the subscriber starts processing all messages at the same time.

var subscribe = function (queueName) {
    var subscribeHeaders = { 'destination': '/queue/' + queueName, 'ack': 'auto' };
    var connectionManager = new stompit.ConnectFailover(server);
    var channel = new stompit.ChannelFactory(connectionManager);
    logger.log('info', 'subscriber listening on queue ' + queueName);
    channel.subscribe(subscribeHeaders, function (error, message) {
        if (error) {
            logger.log('fireError', 'subscribe error ' + error.message);
            return;
        }
        message.readString('utf-8', function (error, body) {
            if (error) {
                logger.log('fireError', 'read message error ' + error.message);
                return;
            }
            logger.log('info', 'received message: ' + body);
            message.ack();
            processMessage(JSON.parse(body));
        });
    });
};
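
A hedged variation of the snippet above: switch the ack mode from auto to client-individual and only ack once processMessage finishes, combined with a prefetch of 1 (an ActiveMQ-specific subscribe header), so the broker effectively hands over one message at a time. processMessage is assumed here to accept a completion callback:

    var subscribeHeaders = {
        'destination': '/queue/' + queueName,
        'ack': 'client-individual',
        'activemq.prefetchSize': '1'
    };

    channel.subscribe(subscribeHeaders, function (error, message) {
        if (error) {
            logger.log('fireError', 'subscribe error ' + error.message);
            return;
        }
        message.readString('utf-8', function (error, body) {
            if (error) {
                logger.log('fireError', 'read message error ' + error.message);
                return;
            }
            // Assumption: processMessage signals completion through a callback.
            processMessage(JSON.parse(body), function () {
                message.ack();   // only now will the broker dispatch the next message
            });
        });
    });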

Cannot connect to ActiveMQ

Hi,
I'm trying to connect to ActiveMQ 5.8.0 (running on port 8161) using your lib like this:

stompit.connect({
    host: 'localhost',
    port: 61613,
    connectHeaders: { host: '/', login: 'admin', passcode: 'admin' }
}, function(error, client) {
    if (error) {
        // error handling
    }
    // do something()
});

I also tried using port 8161.

But I keep getting "connect ECONNREFUSED 127.0.0.1:61613"...
What could be wrong?

Thanks.

Does it support delivery mode persistent?

I am facing an issue while consuming from a topic through the stompit client against an ActiveMQ broker. I would like to set the delivery mode on messages I publish to the topic to persistent. Is there any way I can do that?
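
A sketch of what setting the delivery mode per message could look like over STOMP; with ActiveMQ the persistent header on the SEND frame is the usual knob (the header name comes from the ActiveMQ STOMP docs, not from this module), and client is assumed to be an already-connected stompit client:

    var sendHeaders = {
      destination: '/topic/my.topic',   // placeholder destination
      persistent: 'true'                // ask the broker to store the message persistently
    };

    var frame = client.send(sendHeaders);
    frame.write('payload');
    frame.end();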

Best way to set a continuous consumer

Hi guys, I've been struggling a little bit to understand the best strategy for a task I got involved with at work. Currently I need to set up a continuous consumer for a dedicated queue in one of our Node.js applications. At the moment I handle it by creating a connect-failover object, generating a channel from it, and then subscribing to the dedicated queue.

It works properly for a few hours, then suddenly I lose the connection with no errors and no reconnection attempt. My understanding is that the connect failover will perform a reconnect and, if the server is not available, will look up the list of servers that I provide when building it.

Is this the proper way to achieve what I'm working on, or is there any other recommended option? Currently I have been restarting the server every 2 or 3 hours.

Here is a little example of what I'm doing; please correct me if I'm wrong, I'm new to STOMP servers:

MessageBussService.js
getConnectFailOverObject() {
    return new stompit.ConnectFailover(['server1','server2','server3'], {
      maxReconnects: 50,
      randomize: true,
      useExponentialBackOff: true
    });
  }

subscribe(connectFailOver, responseQueueName) {
    let channel = new stompit.Channel(connectFailOver, { alwaysConnected: true });

    channel.subscribe({
      destination: responseQueueName
    }, (err, message) => {
      if (err) return vApp.log.error(err);

      return app.eventEmitter.emit('MQ_EVENT_IN', message, message.headers);
    });
  }

ServerInit.js

MessageBusService.subscribe(MessageBusService.getConnectFailOverObject(), '/queue/DEDICATED_QUEUE');
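
One thing that might help diagnose the silent drop: attach listeners to the ConnectFailover instance so reconnect attempts (or the lack of them) become visible in the logs. A hedged sketch; the event names are assumptions to confirm against the ConnectFailover documentation:

    const stompit = require('stompit');

    const manager = new stompit.ConnectFailover(['server1', 'server2', 'server3'], {
      maxReconnects: 50,
      randomize: true,
      useExponentialBackOff: true
    });

    // Assumed event names; verify against the library's documentation.
    manager.on('connecting', () => console.log('MQ: attempting connection'));
    manager.on('error', (error) => console.log('MQ connect error: ' + error.message));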

client.nack(message) invalid

var subscribeHeaders = {
    'destination': destination,
    'ack': 'client-individual'
    //'ack': 'client'
};
var overEvent = function(err, message) {
    if (err) {
        client.nack(message);
    } else {
        client.nack(message);
    }
};
client.subscribe(subscribeHeaders, function(error, message) {
    if (error) {
        logger.error('subscribe error ' + error.message, error);
        return subscribeFailCallback(error, message);
    }
    message.readString('utf-8', function(error, body) {
        if (error) {
            logger.error('read message error ' + error.message, error);
            return overEvent(error, message);
        }
        onMessageCallback(body, function(err) {
            overEvent(err, message);
        });
    });
});

I cannot nack the message; it ends up acked!

'connection timed out' error with activeMQ broker and heartbeat

I frequently get these, usually just after receiving a message but sometimes at random. I have it set up to automatically reconnect, but I might miss messages during the few hundred milliseconds the reconnection takes.

I think it has to do with the heartbeat. I got the errors with the heartbeat at '5000,5000' and '10000,10000'. Once I turned the heartbeat off the errors went away, though I would prefer to keep the heartbeat.

I checked with the broker (activemq) admin and there is nothing in the broker log. I also set up a ping to the broker but it kept receiving packets with average latency during the 'connection timed out' errors.

Are you aware of any issues like this?

Binary payload

  var frame = client.send(sendHeaders);
  frame.write('hello');
  frame.end();

What would be the correct way to send a Buffer instead of 'hello'?
It looks like frame.write(Buffer('test')) gets converted to a string and then sent,
even when setting headers like:

    var sendHeaders = {
        'destination': '/queue/test',
        'content-type': 'binary'
    };
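
A hedged sketch of sending a Buffer with an explicit content-length header so the body is framed by length rather than by a terminating NUL; the header handling is standard STOMP, and whether this module re-encodes the Buffer on write is the open question in this issue:

    var payload = Buffer.from('hello');     // arbitrary binary payload

    var sendHeaders = {
      destination: '/queue/test',
      'content-type': 'application/octet-stream',
      'content-length': payload.length      // lets the receiving parser read exactly this many bytes
    };

    var frame = client.send(sendHeaders);
    frame.write(payload);
    frame.end();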

connection timeout

Hi, I'm not sure this is an issue so please take it as a simple question. I'm using your awesome library for a nodejs server that keeps listening on an AMQ queue. The client must keep listening for new messages so I'm not doing any client.disconnect() after message.ack().
Everything works fine, but now and then I receive a 'connection timed out' error I don't know how to recover from. When that error happens, the server stops receiving messages.

I was thinking to reissue the stompit.connect command when that happens but I wonder if there's any better way to handle such errors.

Thanks.
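
One commonly suggested pattern, sketched here without any guarantee that it covers every timeout case, is to let a ConnectFailover plus Channel own the connection so the subscription is re-established after transport errors instead of reissuing stompit.connect by hand:

    var stompit = require('stompit');

    var manager = new stompit.ConnectFailover([
      { host: 'amq-host', port: 61613 }     // placeholder broker address
    ]);

    var channel = new stompit.Channel(manager, { alwaysConnected: true });

    channel.subscribe({ destination: '/queue/my.queue', ack: 'client-individual' },
      function(error, message) {
        if (error) return console.log('subscribe error: ' + error.message);
        message.readString('utf-8', function(error, body) {
          if (error) return console.log('read error: ' + error.message);
          console.log('received: ' + body);
          message.ack();
        });
      });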

client handles errors differently when given heartbeat 0,0 vs 5000,5000

It seems the stompit client does not handle errors the same way when configured to heart-beat at 0,0 vs 5000,5000. With numbers larger than 0, the client correctly emits an error which can be caught and used to reconnect. It seems with 0,0 the error will automatically go to uncaughtException.

The code I'm using looks similar to the following:

    const manager = new stompit.ConnectFailover(servers, reconnectOptions);

    manager.connect((err, client, reconnect) => {

      if (err) {
        // terminal error, given up reconnecting
        this.emit('error', err);
      }
      
      console.log('connected to bus');

      client.on('error', (err) => {

        console.log('stompit client error (reconnecting now): ', err);

        this.reconnecting = true;
        
        reconnect();

      });

      this.client = client;

      if (this.reconnecting) {
        console.log('reconnected to bus');

        // replay any sends that did not yet reach the broker
        console.log(`re-sending ${this.pendingSends.size} _sends`);
        this.pendingSends.forEach((pendingSend) => {
          setTimeout(pendingSend, 500);
        })
        console.log(`reapplying ${this.subscriptions.length} subscriptions`);
        // resubscribe to any disconnected subscriptions now that we've reconnected
        this.subscriptions.forEach((subscription) => {
          subscription();
        });
      } else {
        this.emit('connected');
      }
      
      cb();

    });

In both scenarios, I see console.log('stompit client error (reconnecting now): ', err); logged. But with 0,0 I immediately see a log statement from my process.on('uncaughtException') handler, and the additional reconnect logic never gets a chance to run. With 5000,5000 the reconnect logic works as expected.

Throughput drop with JMS MapMessage to Text transformation in STOMP consumer

Hi,
I am trying to measure throughput for my use-case and noticed this strange thing:

With text messages from a Java producer, the Node.js consumer is able to consume messages at 10K msgs/sec without any pending acks.
However, the same Node.js consumer with the exact same code (except for the transformation header "transformation": jms-map-json during subscription) is able to consume only 500 msgs/sec. I see a lot of pending acks on the broker.

What could be the reason that the library is able to send acks faster for text messages than for map messages, causing the throughput drop?

I know for sure from the jconsole stats that the broker has messages pending in the queue and is not sending them because of the unacknowledged messages.

Do you have any idea why this difference is there?

Thanks,
Abhishek

Memory leak

I'm testing your STOMP client, and it looks like there is a memory leak. I use the following code:


var util = require('util');
var stompit = require('stompit');
var memwatch = require('memwatch');


var connectParams = {
    ssl: true,
    host: '....',
    port: 61613,
    connectHeaders: {
        login: '...',
        passcode: '...'
    }
};

stompit.connect(connectParams, function(err, client) {
    if (err) {
        console.log('Unable to connect: ' + err.message);
        return;
    }
    console.log('connected');

    var subscribeParams = {
        'destination': '/topic/www.de',
        'ack': 'auto'
    };

    client.subscribe(subscribeParams, function(err, message) {
        message.readString('utf8', function(err, string) {
        });
    });
});


var hd = new memwatch.HeapDiff();

setInterval(function(){
    memwatch.gc();
    var diff = hd.end();
    console.log(util.inspect(diff, true, 5))
    hd = new memwatch.HeapDiff();
}, 10000)

I read from a stream and memory constantly grows, from 12 to 140 MB after a couple of hours.

How to get timeout error after client.ack()

I have this block of code. Sometimes the connection throws a timeout error after client.ack():

    client.subscribe(enviroment.posistion_queue, (error, message) => {
      if (error) {
        logger.info('subscribe error =>', error);
        return;
      }
  
      message.readString('utf-8', (error, body) => {
        if (error) {
          logger.info('read message error =>', error);
          return;
        }
  
        let positionParser = new PositionParser();
        let parsedMessage = positionParser.parseMessageFromXml(body);
        sidekiqAdapter.schedulePositionWorker(parsedMessage);
        client.destroy();
        if(enviroment.need_to_process) {
          setTimeout(function () {
            client.ack(message);
          }, 5000);
        }
      });
    });

How can I catch this error and try to reconnect to the server? For example:

client.on('error', () => {
  // do something 
})

How do I detect whether a client is disconnected?

I'm facing a random disconnection issue despite using heartbeats (all of a sudden no message is being consumed, and in the ActiveMQ dashboard there is no consumer attached to the queue). The heartbeat is set to 10000,10000 in my application. I tried connecting from my local system to an ActiveMQ broker on a server, disconnected my internet, and the disconnect event was triggered as expected. I'm using Client, and I've handled the reconnect mechanism in my application itself.

let clientEvents = {
    CONNECT: "connect",
    DISCONNECT: "end",
    DISCONNECTING: "finish",
    ERROR: "error",
    CLOSE: "close"
};

let actions = {
    connect: () => {
        logger.debug("[AMQ-H] Event - Client Connected");
        isConnected = true;
    },
    disconnect: () => {
        logger.error("[AMQ-H] Event - Client Disconnected");
        isConnected = false;
    },
    error: (error) => {
        logger.warn("[AMQ-H] Event - Client Error Occurred - message - ", error.message);
        logger.info("[AMQ-H] Event Is Transport Error", error.isTransportError());
    }
};
    stompClient = stompit.connect({
        host: config.server.host,
        port: config.server.port,
        connectHeaders: {
            'heart-beat': "10000,10000"
        }
    }, (error, stompClient) => {
        if (error) {
            logger.debug("[AMQ-H] Error Occurred while Connecting", error);
            return;
        }
        client = stompClient;
    });
    stompClient.on(clientEvents.CONNECT, actions.connect);
    stompClient.on(clientEvents.DISCONNECT, actions.disconnect);
    stompClient.on(clientEvents.DISCONNECTING, actions.disconnect);
    stompClient.on(clientEvents.CLOSE, actions.disconnect);
    stompClient.on(clientEvents.ERROR, actions.error);

After a little debugging, I was able to replicate one of the disconnection scenarios: if I explicitly kill the broker process on the remote host (pkill -p {broker pId}), an error event is emitted with the message "connection timed out", but no disconnect event. How can I ensure that the client-server connection is always alive?

Recursive process.nextTick detected warning

Hey,
Node spits this out when my app is under a bit of load:
(node) warning: Recursive process.nextTick detected. This will break in the next version of node. Please use setImmediate for recursive deferral.

I'm running v0.10.38

Error: The maximum data length was exceeded

I need to send more than 100 MB of data.
FRAME_SIZE in ActiveMQ was increased to 304857600 bytes, but I get an error while trying to send the data:
...node_modules/stompit/lib/IncomingFrameStream.js:118
throw error;
^

Error: The maximum data length was exceeded

Support for the reply-to header

I'm trying to use this library to do RPC style calls over STOMP to RabbitMQ.

However, it appears that this library doesn't properly support the automatic subscriptions created by a SEND frame containing the reply-to header. I believe ActiveMQ also provides similar functionality in their STOMP adapter.

For a detailed overview of how this works in RabbitMQ see https://www.rabbitmq.com/stomp.html, under the heading "Temp Queue Destinations".

Here is a brief description of the problem with an example:

I want to use STOMP to send a message from a client to a server, and receive a response from that server. The steps are:

  1. RPC server subscribes to receive messages from the client with this header:
{
  destination: '/queue/test'
}
  2. RPC client sends a message using the following header. The broker creates an exclusive, temporary queue with an opaque name, and automatically subscribes the client to this queue.
{
  destination: '/queue/test',
  reply-to: '/temp-queue/test'
}
  3. The RPC server receives a message with the following headers:
{ subscription: '1',
  destination: '/queue/test',
  'message-id': 'T_1@@session-rZDaF8mwGFwIy8YjMERxuw@@1',
  redelivered: 'false',
  ack: 'T_1@@session-rZDaF8mwGFwIy8YjMERxuw@@1',
  receipt: '1',
  'reply-to': '/reply-queue/amq.gen-JKMuz5nr1sGWN0h_BzxGKw', // this is the temporary exclusive queue
  'content-length': 9 }
  4. RPC server does the RPC call and sends the reply to the client using the temp queue provided by the broker in the reply-to header in step 3:
{
  destination: '/reply-queue/amq.gen-JKMuz5nr1sGWN0h_BzxGKw'
}
  5. The client receives the reply, via the automatic subscription created by the broker in step 2, with the following headers:
{ subscription: '/temp-queue/test',
  destination: '/queue/amq.gen-UzS0B3cF65bo_a-ARv7uZg',
  'message-id': 'T_/temp-queue/test@@session-eBbX3tY5PqdlT4raf_3hvg@@2',
  redelivered: 'false',
  receipt: '2',
  'content-length': 14 }

At this point the node Client library is destroyed and raises the following error:

invalid subscription id /temp-queue/test

Because there is no registered subscription for that id in the client (it was registered automatically by the broker). This happens here:

https://github.com/gdaws/node-stomp/blob/master/lib/Client.js#L403-L410

I've tried manually creating a subscription with /temp-queue/test as the subscription id, but this is explicitly disallowed by the broker and results in a broker error that also destroys the client.

Would it be possible to change this library to support this use case? I'm not 100% sure what the API would look like.

One possibility would be to provide an onReply function in the options when calling Client#send. This could automatically register a subscription for the reply with the client, where subId === reply-to header. The handler can then be called once with the reply message, and the subscription removed automatically by the Client. This would be similar to the way message receipts work.

var options = {
 replyTo: '/temp-queue/test', // automatically adds reply-to to header, like receipts?
 onReply: function(message) {
    // handle the reply here
 }
}
client.send(headers, [options])

Open to ideas, and can probably help with a PR.

unexpected end of stream

I am frequently getting an "unexpected end of stream" error. What might be the cause of this and how can I better debug it? I see it comes from IncomingFrameStream.js

Where is this project going?

Hi,

I played around with the module a bit and nothing really seems to work. None of the examples work out of the box. To subscribe to a destination using the ChannelFactory a call to channel() is required. Consuming messages results in a maximum call stack size exceeded message and the following error:

(node) warning: Recursive process.nextTick detected. This will break in the next version of node. Please use setImmediate for recursive deferral.

The base code and idea look good, but please bring this project to production ;)

Defer reading message

Hi,

Looking at the code, it seems that I must call message.readString synchronously when I get the callback with a message. Instead, I'd like to store the frame and asynchronously read it when I have cycles (e.g. setTimeout(readMsg, 100)). Is it possible to defer reading the message, or is my reading correct that it must either be consumed immediately or is lost?

Thanks!
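
For illustration, the deferral pattern being asked about might look like the sketch below (client is assumed to be a connected stompit client); whether the frame's buffered data remains readable after the callback returns is exactly the open question here, so treat this as a restatement of the intent rather than an answer:

    var subscribeHeaders = { destination: '/queue/test', ack: 'client-individual' };

    client.subscribe(subscribeHeaders, function(error, message) {
      if (error) return console.log('subscribe error: ' + error.message);

      // Hold the frame and read it later, once the process has spare cycles.
      setTimeout(function readLater() {
        message.readString('utf-8', function(error, body) {
          if (error) return console.log('deferred read error: ' + error.message);
          console.log('read after deferral: ' + body);
          message.ack();
        });
      }, 100);
    });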

Provide a way to specify connect headers like heart-beat and client-id along with failoverUri string

I am using ActiveMQ and want to have a durable subscription for a topic. I need to specify client-id in connect headers along with activemq.subscriptionName in subscribe headers.

But with the failover URI string used as in the code below, I am not sure how to do that. I could use a servers list, but using a failover URI string is cleaner and easier. I would also like to modify the heart-beat header before connecting.

Could you please help me here?

var stompit = require('stompit')
var reconnectOptions = {
    'maxReconnects': 100,
    'randomize'    : false
};

var connManager = new stompit.ConnectFailover("failover:(stomp://host1:61613,stomp://host2:61613)", reconnectOptions);
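
For comparison, the servers-list form mentioned above, where the connect headers are plain object properties; this is a sketch of the alternative being weighed, with the host names and header values kept as placeholders:

    var servers = [
      {
        host: 'host1',
        port: 61613,
        connectHeaders: {
          'client-id': 'my-durable-client',
          'heart-beat': '5000,5000'
        }
      },
      {
        host: 'host2',
        port: 61613,
        connectHeaders: {
          'client-id': 'my-durable-client',
          'heart-beat': '5000,5000'
        }
      }
    ];

    var connManager = new stompit.ConnectFailover(servers, reconnectOptions);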

Callbacks for subscribe/unsubscribe methods

It could be useful to have callbacks for subscribe/unsubscribe methods. In this case a client could manage subscriptions easily.

I could try to make a PR with the changes if the maintainer is interested in it.

Leaving the msg on the queue and accessing it later ?

Hi,

I wonder if it's possible to leave the message on the queue. I have a working example using the sample code. I changed client.ack to client.nack, but my client.subscribe callback isn't recalled.

Maybe I am missing something?

As part of my flow, I wanted to reject the message and leave it in the queue so I can process it later, either manually or automatically.

Any ideas how I would achieve this ?

I am using the following

    var subscribeHeaders = {
        'destination': destination,
        'ack': 'client-individual'
    };

Any help really appreciated.

Thanks

EventEmitter memory leak with ack message

When I set ack mode to client-individual with a callback that calls message.ack, I get the following error:

(node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Client.addListener (events.js:239:17)
    at Client.sendFrame (/home/jerome/public/node-stomp/lib/Client.js:78:12)
    at Subscription._sendAck (/home/jerome/public/node-stomp/lib/client/Subscription.js:106:18)
    at IncomingFrame.ack (/home/jerome/public/node-stomp/lib/client/Subscription.js:135:12)
    at IncomingFrame.<anonymous> (/home/jerome/public/node-stomp/test/Client.js:529:33)
    at emitNone (events.js:72:20)
    at IncomingFrame.emit (events.js:166:7)
    at endReadableNT (_stream_readable.js:905:12)
    at doNTCallback2 (node.js:441:9)
    at process._tickCallback (node.js:355:17)

I'm using Node v4.2.4 on linux.

I can create a pull request with a test case to reproduce this issue, though I'm not sure how to fix it.

Jerome

client.ack is not a function

Hi there,

I'm getting the following error when running the example code for node-stomp https://github.com/gdaws/node-stomp/blob/master/README.md

TypeError: client.ack is not a function
    at /home/david/tmp/node_stomp/app.js:50:14
    at IncomingFrame.<anonymous> (/home/david/tmp/node_stomp/node_modules/stompit/lib/IncomingFrameStream.js:488:5)
    at emitNone (events.js:72:20)
    at IncomingFrame.emit (events.js:166:7)
    at endReadableNT (_stream_readable.js:913:12)
    at nextTickCallbackWith2Args (node.js:442:9)
    at process._tickCallback (node.js:356:17)

nodejs version 4.4.3
stompit version 0.22.0

Looking at the API it seems the ack() is something which should be called on the message rather than the client, and changing from client.ack(message) to message.ack(); resolved the issue. Might be a case of documentation/code desync.

Receive binary content with "0" in bytes

I'm trying to send binary data with 0 in the bytes with code:

var data = new Buffer('lalala');
data[1]= 0;
data[0]= 1;
var frame = client.send(sendHeaders);
frame.write(data);
frame.end();

and receive the data with

client.subscribe(subscribeHeaders, function (error, message) {
    if (error) {
        console.log('subscribe error ' + error.message);
        return;
    }
    message.on('readable', function (d) {
        console.log('data:', message.read());
    })
    ...

Then it gives me the error "Error: Unknown STOMP action: lala".
Is that the right way to send and receive binary data?

Examples of use as a stream?

Hi gdaws,
Can you offer an example of using Stompit as a stream, if it is possible? I would like to understand if it is feasible to do something like in the example below:

var writeStream = fs.createWriteStream('./path/to/file.ext');
stompit.connect(CONNECTION_PARAMETERS, function (err, client) {
    client.subscribe(SUBSCRIPTION_PARAMETERS).pipe(writeStream);
});

Thanks,

G.
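
Since each delivered message is handed to the subscribe callback as a readable stream, something close to the idea above may be achievable per message rather than on the subscription itself; a sketch under that assumption, reusing the placeholder parameter names from the example:

    var fs = require('fs');
    var stompit = require('stompit');

    stompit.connect(CONNECTION_PARAMETERS, function (err, client) {
      if (err) return console.log('connect error: ' + err.message);

      client.subscribe(SUBSCRIPTION_PARAMETERS, function (err, message) {
        if (err) return console.log('subscribe error: ' + err.message);

        // Each message frame is a Readable stream, so it can be piped directly.
        var writeStream = fs.createWriteStream('./path/to/file.ext');
        message.pipe(writeStream);
      });
    });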

can't consume a file message

I am trying to consume a message containing a file. When I consume the message and save the resulting file, it is different from the initial file that I uploaded. What is my problem?

Here is the add message function:
async addFileMessage(filePath, queueName, id, type) {
    stompit.connect(this.connectParams, function (error, client) {

        if (error) {
            console.log('Unable to connect: ' + error.message);
            return;
        }

        let fileStat = fs.statSync(filePath);
        let contentLength = fileStat.size;
        console.log(contentLength);

        let sendParams = {
            'destination': '/queue/' + queueName,
            'content-type': 'application/octet-stream',
            'content-length': contentLength
        };

        if (id) {
            sendParams['correlation-id'] = id;
        }
        if (type) {
            sendParams['type'] = type;
        }

        let frame = client.send(sendParams);
        let file = fs.createReadStream(filePath);
        file.pipe(frame);

        frame.on('finish', function () {

            client.disconnect(function (error) {
                if (error) {
                    console.log('Error while disconnecting: ' + error.message);
                    return;
                }
                console.log('Sent file');
            });

        });

    }.bind(this));

}

And here is the consume function:
subscribe(queueName, callback) {
    stompit.connect(this.connectParams, function (error, client) {

        if (error) {
            console.log('Unable to connect: ' + error.message);
            callback(error, null);
        }

        let subscribeParams = {
            'destination': '/queue/' + queueName,
            'ack': 'client-individual'
        };


        client.subscribe(subscribeParams, function (error, message) {

            let correlationId = message['headers']['correlation-id'];
            let type = message['headers']['type'];

            let body = "";

            let read = function () {
                let chunk;
                while (null !== (chunk = message.read())) {
                    body += chunk;
                }
            };

            message.on('readable', read);

            message.on('end', function () {
                let obj = {'message': body, 'id': correlationId, 'type': type};
                callback(null, obj);
                client.ack(message);
            });

        });
    });
}

Cannot exit gracefully while reconnecting

See https://gist.github.com/scop/e877cc7138884508adca

The problem is that while stompit is reconnecting, it seems that there's no way to exit gracefully by handling SIGINT/SIGTERM. To reproduce, run the script with nothing listening on localhost:61613 and try to exit it with Ctrl-C:

Connecting to localhost:61613
Connection error to localhost:61613: connect ECONNREFUSED
Connecting to localhost:61613
Connection error to localhost:61613: connect ECONNREFUSED
[...]
^CShutting down
Channel closed
Connecting to localhost:61613
Connection error to localhost:61613: connect ECONNREFUSED
[...]

It looks like there's nothing in stompit that could be called from the shutdown function to make the reconnect loop exit and let the program terminate. After the connection and login have succeeded, the code in the gist seems to work fine: the shutdown function gets called and the process exits on Ctrl-C.

I cannot connect to ActiveMQ

I'm using ActiveMQ and, because of some problems, I have been thinking of replacing stomp-js by BenjaminWS with your module, since it looks more advanced in programming terms.

Well, my problem is that I can't even run the pubsub.js example. I can connect to my ActiveMQ server using stomp-js, but with your script it just doesn't work.

I always get this error when executing the code:

Error: Transport error: connect ECONNREFUSED

Unable to set heart-beat header using failoverUri string

Hi,

I am trying to set the heart-beat header for the node-stomp client using a failover URI string, but it is not picking up the correct header value.

I am using the URL below (case#1):

var connectHeaderString = "connect[connectHeaders][client-id]="+ clientId + "&connect[connectHeaders][heart-beat]=44444,10000";
var failoverUri = "failover:(stomp://argon45.nyc:61613,stomp://argon45.nyc:61613)?"
                  + connectHeaderString;

In this case the heartbeat value isn't considered a string.

It also doesn't work if I try this (case#2):

var connectHeaderString = "connect[connectHeaders][client-id]="+ clientId + "&connect[connectHeaders][heart-beat]='44444,10000'";

In this case the regex doesn't match the heart-beat header, as the header value contains the extra quotes:
heart-beat is string type - "'10000,10000'"

I checked by adding log statements around the heart-beat code in Client.js:

var heartbeat = this.getHeartbeat();
  console.log("heart-beat before setting" + JSON.stringify(heartbeat));

  //console.log('hearders: \n' + JSON.stringify(headers));
  if (typeof headers['heart-beat'] === "string") {
    console.log('heart-beat is string type - ' + JSON.stringify(headers['heart-beat']));
    var match = headers['heart-beat'].match(/^(\d+) *, *(\d+)$/);
    if (match) {
      console.log('heart-beat matches the regex')
      heartbeat = [parseInt(match[1], 10), parseInt(match[1], 10)];
      this.setHeartbeat(heartbeat);
      console.log("heart-beat after setting" + JSON.stringify(heartbeat));
    }
  }

  headers['heart-beat'] = heartbeat[0] + "," + heartbeat[1];
  console.log('heart-beat set to - ' + JSON.stringify(headers['heart-beat']));

The logs I get on the console are below (for case#1):

Connecting to argon45.nyc:61613
heart-beat before setting[0,0]
heart-beat set to - "0,0"
Connected to argon45.nyc:61613, with options: {"host":"argon45.nyc","port":61613,"connectHeaders":{"client-id":"stompit-test-consumer","heart-beat":44444},"ssl":false}

Logs for case#2:

Connecting to argon45.nyc:61613
heart-beat before setting[0,0]
heart-beat is string type - "'44444,10000'"
heart-beat set to - "0,0"
Connected to argon45.nyc:61613, with options: {"host":"argon45.nyc","port":61613,"connectHeaders":{"client-id":"stompit-test-consumer","heart-beat":"'44444,10000'"},"ssl":false}

Q: any plans for persistent reconnect functionality?

It looks like failover.js is concerned only with establishing the initial connection to the server. Do you have any plans to implement a persistent client that would automatically reconnect in case of connection loss and resubscribe to all previously subscribed destinations, or do you think this is out of scope for this particular library? I am considering switching to your library from https://github.com/benjaminws/stomp-js, but since you're actively updating the code it would help me avoid duplicating effort if I had an idea whether this is functionality you may be planning to add in the near future.
