bbc / sqs-consumer Goto Github PK
View Code? Open in Web Editor NEWBuild Amazon Simple Queue Service (SQS) based applications without the boilerplate
Home Page: https://bbc.github.io/sqs-consumer/
License: Other
Build Amazon Simple Queue Service (SQS) based applications without the boilerplate
Home Page: https://bbc.github.io/sqs-consumer/
License: Other
Both should be set to ['All'] for simplicity.
I am curious what's the best way to run multiple consumers in the same process and prioritize various ones differently?
I'm working on a project where it's possible that messages being sent are larger than the sqs limits of 256kb.
I have found this solution https://github.com/awslabs/amazon-sqs-java-extended-client-lib for java but was wondering if there was any similar package for Node.js.
Thank you
From the documentation of sqs-consumer, I understood that a message from the queue would be deleted only if "done()" is called in handleMessage. I have a usecase that I want the message to be left in the queue after being processed. But when the run the following code, node exits after processing a single message and only starts polling only if I call done()
Is this expected?
var Consumer = require('sqs-consumer');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('../config/aws-config.json');
var app = Consumer.create({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/myQ',
waitTimeSeconds: 5,
handleMessage: function (message, done) {
console.log(message);
//done();
},
});
app.on('error', function(err) {
console.log(err.message);
});
app.start();
By default messages are processed one at a time โ a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed below.
Can batchSize runs the different messages simultaneously at the same time and thus fetch remaining messages if any from the AWS SQS and then execute them as well or we have to apply parent and child process for parallel execution of multiple messages from the aws SQS
Hey!
Is there any way to make deleting a message configurable? The scenario is that I want to delete a message when I know for sure I have processed it.
Cheers.
There is a bug with running multiple consumers on one queue within the same process. Below a test I am running which gives very inconsistent results. In particular in regards to the effects of the done() call. Namely in case of multiple consumers a done() call like below looks to not have any effect, and to not delete the message. That means the queue items are processed ad infinitum, every time after the visibilityWindow expires. It looks like a major inhibition to run this library at proper scale. Just working with the batchSize=10 is too low. It means you can only run 10 messages in parallel per instance even if the logic is mostly async (It should be close to 500+/instance). The issue seems to be in the async.series() call in the libraries _processMessage function. It has something to do with the consumers being initialized and started in a normal (sync) for-loop. Its worth to see if it can get fixed by trying to start multiple consumers within an async loop
`
var SQSWorkers = function(config){
//initialization related stuff
...
this.numberWorkers = 40;
this.consumerWorkers = [];
this._setup(config);
}
SQSWorkers.prototype._setup = function(config){
var self = this;
for(var i=0; i<this.numberWorkers; i++){
this.consumerWorkers[i] = Consumer.create({
queueUrl: config.producerUrl,
messageAttributeNames: ["retried"],
handleMessage: function (message, done) {
var notificationStart = Date.now();
var messageBody = JSON.parse(message.Body);
var retried = message.MessageAttributes && message.MessageAttributes.retried ? Number(message.MessageAttributes.retried.StringValue) : 0;
var worker = this;
logger.info("ConvertStoreConsumer start handling event from SQS worker:%s start:%s msgId:%s msg: %j", worker.workerName, notificationStart, message.MessageId, message, {});
//processing the message happens on onEvent()
return self.onEvent(messageBody, function(err){
//on error message will be enqueued again for later retry
//TODO add some logs on error and elaborate more
if(err){
logger.error("ConvertStoreConsumer error on handling event from SQS. worker:%s error:%j msgId:%s retrying...", worker.workerName, err, message.MessageId, {});
return self._handleRetry(message.MessageId, message.Body, retried, function(){
return done();
});
}
//otherwise remove item from queue with done()
logger.info("ConvertStoreConsumer finished handling event from SQS worker:%s docid:%s tenant:%s v:%s dur:%s", worker.workerName, messageBody.id, messageBody.tenant, messageBody.v, Date.now() - notificationStart, {});
done();
});
}
});
this.consumerWorkers[i].workerName = "consumer-worker-" + i;
this.consumerWorkers[i].on('message_processed', function(message){
//otherwise remove item from queue with done()
logger.info("ConvertStoreConsumer message processing done - worker:%s msgId:%s", this.workerName, message.MessageId, Date.now(), {});
});
}
}
SQSWorkers.prototype.start = function(param){
//start all workers
if(this.consumerWorkers && this.consumerWorkers.length){
this.consumerWorkers.forEach(function(worker, pos){
worker.start();
});
}else{
// just log an error here and don't throw an error to not break on local
logger.error("ConvertStoreConsumer not initialized for SQS. ConvertStoreConsumer cannot start.", {});
}
}
SQSWorkers.prototype.onEvent = function(message, callback){
//this is the application logic, basically what is supposed to happen with a message
//Its mostly REST calls to other services
....
if(err){
return callback(err);
}
callback(null)
}
`
We use this library all over the place at work and every handleMessage we write has to Promise.try(() =>
because sqs-consumer doesnt catch errors causing the consumer to fall over on the first thrown exception.
I think thats a surprising behavior. I suggest adding a try/catch that calls done(error) if a error bubbles up that far.
Messages after being left on queue with done(err) still have to wait for their visibility timeout to expires. I want to add feature for terminating message's visibility timeout. Are you guys accepting pull request?
Today the region default value is hard coded:
this.sqs = options.sqs || new AWS.SQS({
region: options.region || 'eu-west-1'
});
https://github.com/bbc/sqs-consumer/blob/master/index.js#L66
I suggest to read it from default AWS_REGION environment variable instead.
Like this:
this.sqs = options.sqs || new AWS.SQS({
region: options.region || process.env.AWS_REGION
});
WDYT?
I can open pull request if you agree.
Currently the functionality is tied to the 10 batch queue offered by SQS, but what if I want to have one more layer of queue buffering keeping in mind that 10 is a quite small batch. Let's say i want to process queues in batches of 500
const queue = Consumer.create({
queueUrl: '...',
handleMessage: (message, done) => {
someBuffer.push(message)
if (someBuffer.length === 500) {
// process all messages
process(someBuffer, done)
}
}
})
process(buffer, done) {
buffer.forEach(message => {
// do something
done(message.ReceiptHandle)
})
}
Currently this is not possible because the done()
callback is called against the message hitting the handleMessage
function, they can't be "stored for later processing".
/xxxx/node_modules/aws-sdk/lib/request.js:30
throw err;
^
Error: Callback was already called.
at /xxxx/node_modules/async/lib/async.js:30:31
at /xxxx/node_modules/sqs-consumer/index.js:127:5
at /xxxx/node_modules/async/lib/async.js:251:17
at /xxxx/node_modules/async/lib/async.js:154:25
at /xxxx/node_modules/async/lib/async.js:248:21
at /xxxx/node_modules/async/lib/async.js:612:34
at Response.<anonymous> (/xxxx/node_modules/sqs-consumer/index.js:141:5)
at Request.<anonymous> (/xxxx/node_modules/aws-sdk/lib/request.js:353:18)
at Request.callListeners (/xxxx/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
at Request.emit (/xxxx/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
Hi, does anyone have the same exception ?
May be good to include some flowtypes
import type { SQS } from 'aws-sdk'
import type { EventEmitter } from 'events'
declare module 'sqs-consumer' {
declare class SQSConsumer mixins events.EventEmitter {
start(): void,
stop(): void,
on(event: 'error', cb: (e: Error) => void): this,
on(event: 'processing_error', cb: (e: Error) => void): this,
on(event: 'message_recieved', cb: (message: string) => void): this,
on(event: 'stopped', cb: () => void): this,
on(event: 'empty', cb: () => void): this
}
declare type SQSConsumerConfig = {|
queueUrl: string,
batchSize?: number,
sqs?: SQS,
region?: string,
messageAttributeNames?: Array<string>,
attributeNames?: Array<string>,
terminateVisibilityTimeout?: boolean,
visibilityTimeout?: number,
waitTimeSeconds?: number,
authenticationErrorTimeout?: number,
handleMessage(message: string, done: () => void): mixed
|}
declare function create(config: SQSConsumerConfig): SQSConsumer
}
Hi,
I get an exception thrown regularly, and can't find a way to fix the issue.
I'm hoping you have an idea about the cause?
I use FIFO queues, with the following settings:
I then have my code to send/receive the messages:
var queue = {
send: function(payload, options) {
options = _.extend({
group: '',
queue: 'scores'
}, options);
var sqsParams = {
MessageBody: JSON.stringify(payload),
QueueUrl: 'https://sqs.us-east-2.amazonaws.com/'+ftl.options.aws.client.id+'/'+ftl.options.env+'-'+options.queue+'.fifo'
};
sqsParams.MessageDeduplicationId = options.id;
sqsParams.MessageGroupId = options.group;
sqs.sendMessage(sqsParams, function(err, data) {});
},
getHandler: function(name, callback) {
return Consumer.create({
region: 'us-east-2', // OHIO, to get FIFO queues
queueUrl: 'https://sqs.us-east-2.amazonaws.com/'+ftl.options.aws.client.id+'/'+ftl.options.env+'-'+name+'.fifo',
batchSize: 1,
visibilityTimeout: 20,
handleMessage: callback,
sqs: new AWS.SQS()
});
}
};
To send data I use:
queue.send({
uuid: 'xxxxx',
session: 'yyyyy',
race: 'zzzzz'
}, {
group: 'yyyyy',
id: 'some-long-unique-id',
queue: 'race-start'
});
To listen I use:
var scoreQueue = queue.getHandler('race-start', function(message, done) {
ftl.data.sessionv2.classic.processScore(JSON.parse(message.Body), done);
});
It works for a while, then randomly triggers this:
SQSError: SQS delete message failed: Value AQEB3XwPqs1p3WOXYP12JIR33hxePQ/Amls5OXgY2yaj2CKkFM3CuEPXgAPw4KbUWAffCgsHezZJqOEIOyDOlX4JTg+ndrTW4xu4RDEC9PJy8KQvuHU7ALbtIOb9N4U2yIN9TJwWj+GkK9vNKUeC9ZJslgeG5QUS/Xj4x2tWICsbdldlexuuP+V06qixVL/n7Q/8Ws6/IEWD7awwPy2zUehe0eZ16FfpCESoPICehhy+cEn/pm3+a7X2aCV9vqhNpEoGvHnEGyVAdzlAhR82cm2ejQO+SJYtHdpWN4tgI8j5TU4= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired.
at Response.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\sqs-consumer\index.js:197:24)
at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:364:18)
at Request.callListeners (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:105:20)
at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:77:10)
at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:682:14)
at Request.transition (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:22:10)
at AcceptorStateMachine.runTo (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:14:12)
at C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:26:10
at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:38:9)
at Request.<anonymous> (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:684:12)
at Request.callListeners (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:115:18)
at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\sequential_executor.js:77:10)
at Request.emit (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:682:14)
at Request.transition (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\request.js:22:10)
at AcceptorStateMachine.runTo (C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:14:12)
at C:\xampp\htdocs\node-fw\node_modules\aws-sdk\lib\state_machine.js:26:10
If you have any tip, it'd be greatly appreciated!
Looks like when calling done(err) on a processing error the message becomes visible immediately again whatever the configured visibilityTimeout is. That means the message becomes available again immediately and will get consumed again by other consumers (potentially from other instances) so unless the application handles an error differently, calling done(err) will cause immediate retrying.
I think it should be documented more clearly what the effects of done(err) are and how it affects retries etc.
Is there a way to tell if an error was raised by client code (one of the message processing handlers) or by the library (sqs-consumer) internally without manually checking for all types of events sqs-consumer could possibly raise?
SQSError objects apparently are intended to inherit from Error (since they use the Error prototype), but I was surprised to find that they have no stack trace.
To properly inherit from Error, this code:
function SQSError(message) {
this.name = 'SQSError';
this.message = (message || '');
}
SQSError.prototype = Error.prototype;
should be:
function SQSError(message) {
Error.captureStackTrace(this, this.constructor);
this.name = this.constructor.name;
this.message = (message || '');
}
util.inherits(SQSError, Error);
Is there any method to check queue is empty or not?
Hi, if you enter 10 messages in sqs. One every millisecond. The order that the sqs-consumer consumes the messages and in the order in which they were inserted?
Hi,
We're using sqs-consumer and it's working well for our nodejs microservice, but I feel that it's too abstracted from the AWS implementation, meaning that if we want to use other parameters in the receive function, we can't. It should be closer to how AWS allows us to do things.
As such, I've forked your code and added a new signature to the consumer function:
function Consumer(params, handler, sqs)
The first two parameters are required, while the last one is optional. The params is an object to be used in the receive function, and the handler is simply a function to process the message, while the sqs argument is to set a custom SQS object if needed but defaults to new AWS.SQS()
if none specified.
By using this implementation, the library gets to concentrate on what it does best, which is the polling mechanism, and let's the AWS sdk does what it does best.
I wanted to know if this is something that you would consider pulling back in. If not, I would just create my own project since I don't want to spent the time rewriting the unit tests.
I need to change some settings before processing received messages from sqs.
What do you think about a interception function before starting processing?
It will be provided as options.beforeHandleMessages @ constructor:
this.beforeHandleMessages = options.beforeHandleMessages || function(cb) { cb(); }
and it would be called as follow @ _handleSqsResponse method:
this.beforeHandleMessages(function() {
async.each(response.Messages, this._processMessageBound, function () {
// start polling again once all of the messages have been processed
consumer._poll();
});
})
Do you have a better idea?
once you give me ok, I'll unit test it ๐
hi! i'm use sqs-consumer.
it is great npm for AWS SQS. thank you.
my handlemessage fucntion write promise chain.
findOrder()
.then(pushToRider)
.catch((err) => {
logger.error('catch error', err)
done(err) // or i'm try return done(err)
})
i want left message from queue when occured error with promise chain.
and error message is move dead letter queue by redrive sqs policy.
but occured promise error when occured promise catch error.
10|bsqs | (node:66103) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 2): Error: Callback was already called.
why this error occured???
thank you.
It seems like the consumer only retrieves n
messages and doesn't do another SQS request before the n
messages have been handled.
Would it be possible to add an option to retrieve 2n
messages? This would allow the consumer to have n
messages being handled at all time, and not [1-n]
. I realize that this would add another abstraction layer to the consumer, but it would potentially save a lot of execution time.
I want to include additional logic to _processMessage method but I don't want to touch the core code. Is it possible to do that? I was playing around with Consumer.prototype._processMessageBound and Consumer._processMessage but I always get an error "TypeError: Cannot read property 'emit' of undefined". See code below. I actually want to pass additional data in the options, and access it in the handleMessage
.
Consumer.prototype._processMessage = (message, cb) => {
var consumer = this;
console.log('SOME CODES HERE');
console.log(this.additionalData.version);
this.emit('message_received', message);
async.series([
function handleMessage(done) {
consumer.handleMessage(message, done);
},
function deleteMessage(done) {
consumer._deleteMessage(message, done);
}
], function (err) {
if (err) {
if (err.name === SQSError.name) {
consumer.emit('error', err, message);
} else {
consumer.emit('processing_error', err, message);
}
} else {
consumer.emit('message_processed', message);
}
cb();
});
};
Consumer._processMessageBound = Consumer.prototype._processMessage.bind(Consumer);
Consumer.create({
queueUrl: `https://sqs.ap-southeast-1.amazonaws.com/${AWS_SQS_QUEUE}`,
handleMessage: handleMessage,
sqs: new AWS.SQS(),
batchSize: 10,
additionalData: {version:2}
})
I'm getting [Error: socket hang up] code: 'ECONNRESET', body: {}
after I consumed all the message, how do I prevent this error?
I am getting the error below when I call .start();
/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:31
throw err;
^
Error
at Consumer._handleSqsResponse (/home/me/Documents/Repositories/my-sqs-listener/node_modules/sqs-consumer/index.js:124:24)
at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:364:18)
at Request.callListeners (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/sequential_executor.js:105:20)
at Request.emit (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/sequential_executor.js:77:10)
at Request.emit (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:682:14)
at Request.transition (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/home/me/Documents/Repositories/my-sqs-listener/node_modules/aws-sdk/lib/request.js:684:12)
Here is my code:
import debug from 'debug';
import Consumer from 'sqs-consumer';
import AWS from 'aws-sdk';
export class Queue {
constructor(eventCallback) {
this.debug = debug('gsl:lib/queue');
this.debug('Loading...');
AWS.config.update({
region: 'eu-west-1',
accessKeyId: 'KEY',
secretAccessKey: 'ACCESS'
});
return Consumer.create({
queueUrl: 'https://...my-sqs-service.fifo',
handleMessage: eventCallback,
sqs: new AWS.SQS()
});
}
}
import debug from 'debug';
import { Queue } from './lib/';
class App {
constructor() {
this.debug = debug('gsl:app');
this.debug('Starting...');
this.debug('Loading queue...');
this.queue = new Queue(this.onMessage);
this.queue.start();
}
onMessage(message, callback) {
this.debug('Got new message!');
this.debug(message);
callback(null);
}
}
global.app = new App();
I just spent some time finding out the hard way (after restarting a virtual machine in the cloud) that your library does not inspect it's environment, nor does it regulate its own sanity...pffff
please kill the loop if the environment is not sane, as in no aws creds found etc...
On day 14, between 07:00 a.m UTC and 10:00 a.m UTC, there was a problem consuming in a sqs queue.
I wonder if this kind of problem could happen.
Code:
var consumer = require('sqs-consumer');
function createTrip() {
var app = consumer.create({
queueUrl: config.createTrip,
handleMessage: function(message, done) {
done();
}
});
app.on('error', function(error) {
console.log(error);
});
app.start();
}
The code above is what you use to consume as messages from the queue. If you need more data, please ask!
Anyone facing high memory usage when deploying to production?
I'm using pm2 to keep process alive, but memory usage is almost 500MB. D:
I'm using the batchSize option (set to 10), consume times vary for diffierent messages (for example, a user with 5 followers VS one with 10k followers), and now sqs-consumer will wait for the slowest one to complete before next poll
Currently I do:
done()
which deletes a message when successful.done(err)
which does nothing with a message, when it is not successful...That causes the message to be re-polled and processed, until SQS moves the message to a dead letter queue...
This is a great setup... But some of my messages, I know why and when they should be re-processed, I know they need to be delayed (on a schedule)...
It seems like, as an alternative to done()
we could have a delaySeconds(60)
which deletes the message and re-sends it with a delaySeconds=60
I know I could do this myself, with sqs-producer
- but it seems a common and simple enough thing that building it in made sense.
thoughts?
(also - thanks - this is an excellent set of libraries for getting started with SQS fast)
I am running a worker, which works well generally.
However, if there is a Timeout error, for example download file online, Timeout might happen in http request. and the worker is going to wait for every.
Would appreciate some help in figuring out this issue. I'm running 7 workers in parallel using pm2
. After a few minutes, only one worker continues to process messages. A few minutes after that, no workers are receiving messages. Message Received
and Message Processed
are printed by all workers, then just a few workers, then one, and then none.
Here are my settings and relevant code.
Default Visibility Timeout: 90 seconds
Message Retention Period: 4 days
Maximum Message Size: 256kb
Delivery Delay: 0s
Receive Message Wait Time: 10s
import 'babel-polyfill'
/** Libraries */
import SQSConsumer from 'sqs-consumer'
import serializeError from 'serialize-error'
/** Internal */
import IndexingService from './services/indexing.service'
import logger from './utilities/logger'
import ravenClient from './utilities/raven'
const indexingService = new IndexingService()
const startProcess = async () => {
const app = SQSConsumer.create({
queueUrl: 'https://sqs.us-east-1.amazonaws.com/your-queue-here',
waitTimeSeconds: 5,
visibilityTimeout: 90,
handleMessage: async (message, done) => {
try {
const parsedBody = JSON.parse(message.Body)
await indexingService.startProcess(parsedBody)
done()
}
catch (e) {
logger.error({
'Error': serializeError(e)
})
if(process.env.NODE_ENV == 'production') {
//ravenClient.captureException(e)
}
done(e)
}
}
})
app.on('error', (e) => {
logger.error({
'Error': serializeError(e)
})
console.log(e);
});
app.on('processing_error', (e, message) => {
logger.error({
'Error': serializeError(e)
})
console.log(message);
});
app.on('stopped', () => {
console.log('Worker Stopped.');
});
app.on('message_received', () => {
console.log('Message Received.');
});
app.on('message_processed', () => {
console.log('Message Processed.');
});
app.start()
}
startProcess()
I have a FIFO queue and I'm running into seemingly random occurrences of the following error after calling done();
:
SQS delete message failed: Value AQEBWH9ttaf5P9olvqrYyaNik4axKT43ASxVhN+Hty/wT21Ip4cZ2QbaCFy7cKwghHrqxTszz00vtOncnn6NQAYezPNqrFQvnByQB1QhT5MCF7Y6rlkrLgVEVY19Jz0r5sfI8DPNvYo6EuBR2qJzEwUwWfVv9fCH552Ud67tjNmm89Hzm43zo/mZ+QY84XZMHLGfe7NrCVbdbWvSRoiCeSH7Mi1pOCiBCPiAJbBV7fsDkKCRDzjVE7W5LlIfjB1eQ8Rk7CB8WNsqGVXxctiDClKbnE8Gol+G3+d22YTUIe+a8= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired.'
I have only 1 consumer running. Here are the queue configuration details:
Default Visibility Timeout: 30 seconds
Maximum Message Size: 256 KB
Delivery Delay: 0 seconds
Queue Type: FIFO
Content-Based Deduplication: Enabled
I can provide more info if necessary.
Thanks!
I have created a cluster process in and assigned one process to poll SQS. After processing message the worker process sends a message to Master process.send("Message"). But after that i am getting the following error.
Error: Callback was already called.
at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:903:32
at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3858:13
at handleMessage (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\sqs-consumer\index.js:157:9)
at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3853:24
at replenish (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:946:17)
at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:950:9
at eachOfLimit (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:975:24)
at C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:980:16
at _parallel (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:3852:5)
at Object.series (C:\Users\Naveen Kerati\Desktop\Node_Apps\node_modules\async\dist\async.js:4708:5)
I would like to get messages that are more than 10 minutes, and leave as messages that are less this time. Is there any attribute to send for this to happen?
I'm running this as a long running process and using some external process to manage it (upstart/supervisor/etc). I don't want to forcibly kill the process as it could be in the middle of processing an event. I was hoping to pass a callback to consumer.stop()
which would be called when the consumer stop and all existing messages are processed. The idea is to do something like this,
consumer.start();
process.on("SIGTERM", function() {
consumer.stop(function(){
process.exit(0);
});
});
I am using fake_sqs to emulate SQS for development. Unfortunately, fake_sqs does not acknowledge the ReceiveMessageWaitTimeSeconds
attribute, thus causing sqs-consumer
to rapidly make requests. I know this isn't a sqs-consumer
bug but:
I'm curious if there's a way to slow down the requests - specifically for development.
If not, does anyone know of any other (preferably docker-wrapped) fake SQS implementations that would support long polling?
At the moment the event emitter fires a single 'error' type for message errors and queue conection errors. Would be great if those errors are separated in 'error' and 'message_error'. For example if you want to leave the message in the queue because you can't process it at the moment a message_error would be fired.
Also would be nice if a re-poll option is added when polling the queue fires an error.
I can do a pr for that If you are agree.
Thanks
As per documentation empty
event is used Fired when the queue is empty (All messages have been consumed).
but code does not fire it when there are no messages in the queue
if (response && response.Messages && response.Messages.length > 0) {
async.each(response.Messages, this._processMessageBound, function () {
// start polling again once all of the messages have been processed
consumer._poll();
});
} else if (response && !response.Messages) {
this.emit('empty');
this._poll();
} else if (err && isAuthenticationError(err)) {
// there was an authentication error, so wait a bit before repolling
debug('There was an authentication error. Pausing before retrying.');
setTimeout(this._poll.bind(this), this.authenticationErrorTimeout);
} else {
// there were no messages, so start polling again
this._poll();
}
};
I have an sqs-consumer inside a restify.createJsonclient, which calls a restify server,
How do I make it such that sqs-consumer will wait for server response before consuming another sqs message?
Code is same as #30
Have you considered supporting the DeleteMessageBatch request for SQS?
FYI - Using this module to replace our own implementation so its nice to remove clunky code from our app as much as possible. Thanks for this.
Hi,
Curious as to why the handleMessage function is mandatory when you have an event emitter for message_received implemented?
If handleMessage is mandatory does it not deem the event emitter redundant as you have to implement both. Any insights much appreciated as it is a great looking bit of code to learn from.
Thanks
Hi,
I'm doing some test locally with my queue. I'm sending 100 messages in a loop in my queue and using sqs-consumer
to consume those messages but it always stop around 50-60 messages consumed. I'm using a batch size of 5 and everything works perfectly for the first 50-60 messages but he rest of the messages stay in Message in flight
in AWS SQS and I'm not able to process them.
Thank you very much for any help!
Here is my code
const queueSendMessage = Consumer.create({
messageAttributeNames:ย ["All"],
batchSize: 5,
queueUrl: process.env.AWS_SQS_QUEUE_URL,
handleMessage: (message, done) => {
done()
},
sqs: new AWS.SQS({
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
region: 'us-east-1'
})
});
queueSendMessage.on('error', (err) => {
console.log("ERROR SQS:", err.message);
});
queueSendMessage.start();
I just want to listen to the messages (since there are other handlers for the events) and I don't need the messages deleted.
isAuthenticationError might return true because of 'CredentialsError'.
Does it even make sense to do - setTimeout(this._poll.bind(this), this.authenticationErrorTimeout);
If the Credentials are wrong it will always fail to connect to the queue. There is no point of trying to connect after every 10 seconds. So, in case of wrong credentials the service should exit instead of keep trying to connect to the queue.
Hi,
When using Consumer.create() with batchSize > 1, should the callback be executed once when all the messages have been handled, or once per message in the batch?
I have some messages stuck in flight since I activated the batchSize > 1, and I can't figure out how to avoid that.
Thanks.
const Consumer = require('sqs-consumer');
const app = Consumer.create({
queueUrl: config.aws.delta.queueUrl,
handleMessage: (message, done) => {
logger.debug(message);
processor.process(message).then((result) => {
logger.trace(`Message processed.`);
done();
}).catch((err) => {
logger.error("Failed to handle message", message);
// uncomment next line to get it to work without bailing
//done();
});
}
});
...
app.start();
If I uncomment done()
then it will work. But I often want the message to stay on the queue -- maybe I couldn't connect to the database momentarily and that's why it failed. As the code is, if I hit that .catch
and don't call done()
the process mysteriously exits, even if I have an uncaughtException/unhandledRejection listener on the process.
"sqs-consumer": "^3.6.1",
Node 6
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.