Git Product home page Git Product logo

nestjs-sqs's Introduction

nestjs-sqs

Test npm version

Tested with: AWS SQS and ElasticMQ.

Nestjs-sqs is a project to make SQS easier to use and control some required flows with NestJS. This module provides decorator-based message handling suited for simple use.

This library internally uses bbc/sqs-producer and bbc/sqs-consumer, and implements some more useful features on top of the basic functionality given by them.

Installation

npm i --save @ssut/nestjs-sqs @aws-sdk/client-sqs

Quick Start

Register module

Just register this module:

@Module({
  imports: [
    SqsModule.register({
      consumers: [],
      producers: [],
    }),
  ],
})
class AppModule {}

Quite often you might want to asynchronously pass module options instead of passing them beforehand. In such case, use registerAsync() method like many other Nest.js libraries.

  • Use factory
SqsModule.registerAsync({
  useFactory: () => {
    return {
      consumers: [],
      producers: [],
    };
  },
});
  • Use class
SqsModule.registerAsync({
  useClass: SqsConfigService,
});
  • Use existing
SqsModule.registerAsync({
  imports: [ConfigModule],
  useExisting: ConfigService,
});

Decorate methods

You need to decorate methods in your NestJS providers in order to have them be automatically attached as event handlers for incoming SQS messages:

import { Message } from '@aws-sdk/client-sqs';

@Injectable()
export class AppMessageHandler {
  @SqsMessageHandler(/** name: */ 'queueName', /** batch: */ false)
  public async handleMessage(message: Message) {}

  @SqsConsumerEventHandler(/** name: */ 'queueName', /** eventName: */ 'processing_error')
  public onProcessingError(error: Error, message: Message) {
    // report errors here
  }
}

Produce messages

export class AppService {
  public constructor(
    private readonly sqsService: SqsService,
  ) { }

  public async dispatchSomething() {
    await this.sqsService.send(/** name: */ 'queueName', {
      id: 'id',
      body: { ... },
      groupId: 'groupId',
      deduplicationId: 'deduplicationId',
      messageAttributes: { ... },
      delaySeconds: 0,
    });
  }
}

Configuration

See here, and note that we have same configuration as bbc/sqs-consumer's. In most time you just need to specify both name and queueUrl at the minimum requirements.

License

This project is licensed under the terms of the MIT license.

nestjs-sqs's People

Contributors

calexandrepcjr avatar cassiolacerda avatar crazyglue avatar derkobe avatar eduardoazevedo3 avatar egimenos avatar evanwhitten avatar eyadkobatte avatar hdiaz-nectia avatar iziodev avatar nbdn avatar ssut avatar verdini avatar workmad3 avatar zyles avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

nestjs-sqs's Issues

Problem pulling queue name from .env

@SqsMessageHandler(`${process.env.QUEUE_NAME}`, false)
public async handleMessage(message: AWS.SQS.Message) {
    console.log(message);
}

@SqsConsumerEventHandler(
    `${process.env.QUEUE_NAME}`,
    'processing_error',
)
public onProcessingError(error: Error, message: AWS.SQS.Message) {
    // report errors here
    console.error(error);
    console.error(message);
}

When pulling the queue name through .env on the console, the following error occurs:
No metadata found for: QueueName

@SqsMessageHandler() is not being added to Nest's execution context

When I want to use a NestJs interceptor (using standard NestJs interceptors) to wrap the whole message handler method into an transaction the interceptor never gets executed because the @SqsMessageHandler()-annotation is not registered with NestJs.

@Injectable()
export class CustomerImportMessageHandler {
  private logger = new Logger(CustomerImportMessageHandler.name);
  
  @SqsMessageHandler(Queue.CustomerImport)
  @UseInterceptors(SentryTransactionInterceptor)
  public async handleMessage(message: SQS.Message) {
          // do whatever you have to do.
  }
}

@Injectable()
export class SentryTransactionInterceptor implements NestInterceptor {

  // Never gets executed.

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const transaction = Sentry.startTransaction({
      name: context.getClass().name,
    });
    return next.handle().pipe(tap(() => transaction.finish()));
  }
}

Error when registering more than one consumer

Hello,

I have an app running correctly consuming from only one queue. Now I'm trying to add an additional queue to the consumers array, but getting this error:

messaging-service_app  | /app/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:56
messaging-service_app  |             (_a = this.options.consumers) === null || _a === void 0 ? void 0 : _a.forEach((options) => {
messaging-service_app  |                                                                                   ^
messaging-service_app  | TypeError: Cannot read properties of undefined (reading 'meta')
messaging-service_app  |     at /app/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:65:49
messaging-service_app  |     at Array.forEach (<anonymous>)
messaging-service_app  |     at SqsService.<anonymous> (/app/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:56:83)
messaging-service_app  |     at Generator.next (<anonymous>)
messaging-service_app  |     at fulfilled (/app/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:17:58)
messaging-service_app  |     at processTicksAndRejections (node:internal/process/task_queues:95:5)
messaging-service_app  |   prisma:client:libraryEngine:exitHooks  exit event received: exit +0ms

consumer.module.ts

@Module({
  imports: [
    SqsModule.registerAsync({
      useFactory: () => {
        return {
          consumers: consumerConfig,
          producers: producerConfig,
        };
      },
    }),
...

The consumerConfig object is build like that:

export const consumerConfig: SqsConsumerOptions[] = [
  {
    name: 'email_delivery_request',
    queueUrl: ' http://localhost:4566/000000000000/email_delivery_request',
    region,
  },
  {
    name: 'email_status_change',
    queueUrl: 'http://localhost:4566/000000000000/email_status_change',
    region,
  },
];

Any idea what might be happening? This exact setup is working fine if the consumerConfig array has only one element.

Thanks in advance.

Dependency injection doesn't work as expected

It seems that the method decorated with @SqsMessageHandler() gets called just as plain function, ignoring the class it is defined in and its initialisation steps. My setup is following:

scheduled-jobs.handler.ts

@Injectable()
export class ScheduledJobsHandler {
    private readonly logger = new Logger(ScheduledJobsHandler.name);

    constructor(
        private createInvoicesJob: CreateInvoicesJob,
        private chargeInvoicesJob: ChargeInvoicesJob,
        private squareSyncJob: SquareSyncJob,
    ) {}

    @SqsMessageHandler(SCHEDULED_JOBS_QUEUE_NAME, false)
    async handleMessage(message: AWS.SQS.Message) {
        try {
            this.logger.log(
                `SQS event: ${message.Body},
                 QUEUE NAME: ${SCHEDULED_JOBS_QUEUE_NAME}`,
            );
            const body: any = JSON.parse(message.Body);
            switch (body.jobType) {
                case constants.SCHEDULED_JOBS.CREATE_INVOICES:
                    this.logger.log('Running CREATE INVOICE JOB');
                    await this.createInvoicesJob.createInvoicesForDay(body);
                    this.logger.log('Finished CREATE INVOICE JOB');
                    break;
                case ....
            }
        } catch (e) {
            this.logger.error('An error occurred while processing the message.', e);
        }
    }
}

app.module.ts

@Module({
    imports: [
        ConfigModule.forRoot({ isGlobal: true }),
        ...
        SqsModule.registerAsync({
            inject: [ConfigService],
            useFactory: (config: ConfigService) => ({
                consumers: [
                    {
                        name: config.get('SQS_SCHEDULED_JOBS_QUEUE_NAME'),
                        queueUrl: config.get('SQS_SCHEDULED_JOBS_QUEUE_URL'),
                        region: config.get('AWS_REGION'),
                    },
                ],
            }),
        }),
        MailModule,
    ],
    providers: [ScheduledJobsHandler],
})
export class AppModule {}

When the message is received in the debugger I can see that this.logger and this.createInvoicesJob and even constants are all undefined. Seems like the file itself is not executed, only the decorated function. Is this expected behaviour?

meta undefined

Hi,

I'm trying to setup this package in my app module class like that:

const sqs = new AWS.SQS({
  apiVersion: '2012-11-05',
  credentials: new AWS.Credentials('xxxxx', 'xxxxx'),
  region: 'none',
});

@Module({
  imports: [
  SqsModule.registerAsync({
    useFactory: () => {
      return {
        consumers: [
          {
            name: 'my-queue',
            queueUrl: `${SQS_ENDPOINT}/queue/`, // SQS_ENDPOINT: 'http://localhost:9324'
            sqs, // instance of new AWS.SQS
            waitTimeSeconds: 1,
            batchSize: 3,
            terminateVisibilityTimeout: true,
            messageAttributeNames: ['All'],
          }
        ],
        producers: [],
      };
    },
  }),

but, when run my app thow next error:

[NestWinston] Warn 2020-12-9 17:09:04 [SqsService] No metadata found for: my-queue - {}
(node:22174) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'meta' of undefined

This is the line in sqs.service.js

const isBatchHandler = metadata.meta.batch === true;

I only need to consume messages from SQS and I'm using a elasticmq docker container to test:

docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq

where I set the queueUrl ?

Hi, I don't know where to put the values ​​of queueUrl, .
await this.sqsService.send(/** name: */ 'queueName'

The name parameter has the name of the queue but the url where I should put it?

Error handlers are not called

First, thank you for the awesome lib, it saves me a lot of time handling SQS in Nest.

There is one problem though, that the error handlers are not called at all.
Do you have any intuition on how that might happen?

@Injectable()
export class WorkspaceDeletionService {
  constructor(
    @Inject(WINSTON_MODULE_NEST_PROVIDER) private readonly logger: Logger,
    private readonly rollbar: RollbarLogger,
  ) {}

  @SqsMessageHandler(SqsQueueName.WORKSPACE_DELETION_QUEUE)
  public async handleMessage(message: AWS.SQS.Message): Promise<void> {
    try {
      const messageBody: WorkspaceDeletionMessage = JSON.parse(message.Body);
      const workspaceId = messageBody.workspaceId;
      if (workspaceId) await this.deleteWorkspace(workspaceId);
    } catch (error) {
      // Because the error will not be passed to the handler methods, I must resort to using try/catch to manually handle the 
     // application error here
      const errorMessage = `Handle workspace deletion message error: ${error}`;
      this.logger.debug(errorMessage);
      this.rollbar.error(errorMessage);
      throw error;
    }
  }

  @SqsConsumerEventHandler(SqsQueueName.WORKSPACE_DELETION_QUEUE, "error")
  public onError(error: Error, message: AWS.SQS.Message): void {
    this.logger.error(error.toString(), JSON.stringify(message));
  }

  @SqsConsumerEventHandler(SqsQueueName.WORKSPACE_DELETION_QUEUE, "processing_error")
  public onProcessingError(error: Error, message: AWS.SQS.Message): void {
   // This method is not called whatsoever
    this.logger.error(error.toString(), JSON.stringify(message));
  }

  @SqsConsumerEventHandler(SqsQueueName.WORKSPACE_DELETION_QUEUE, "timeout_error")
  public onTimeoutError(error: Error, message: AWS.SQS.Message): void {
   // This method is not called whatsoever
    this.logger.error(error.toString(), JSON.stringify(message));
  }
}

How to deleting message from queue manually?

Can you please explain how we can delete messages manually? I went through the complete library code, but I didn't find any implementation for deleting the message from the SQS queue.

[Question] What happens when a processing_error occurs?

We had an issue with one queue that had multiple messages which had marked as processing_error. Some how most of the message have been replayed since the issue was solved, but we lost some messages.

Now I wonder what exactly happens if a processing error occurs? Will it become a new message in the queue? Will it just be in-flight? When will it be retried? How often will it retry? Can I configure the delay between attempts or other parameters?

Implementation for deleting message from queue

As I went through complete library code, I didn't find any implementation for deleting the message from SQS queue.
Shouldn't this be implemented or else SQS queue will eventually become full?

Consumer stopped polling for messages no error logs

I'm using @ssut/[email protected] with a FIFO queue. After running fine for few days the consumer stopped polling for messages without any error logs. The containing service has been up and running without any issues. None of the error handlers triggered.
Screenshot 2022-05-31 at 1 39 03 PM

  @SqsConsumerEventHandler(
    /** name: */ sqsQueueName,
    /** eventName: */ 'error',
  )
  public onError(error: Error, message: AWS.SQS.Message) {
    this.logger.log(message, 'Error message');
    this.logger.error(error, 'Error Exception');
  }

  @SqsConsumerEventHandler(
    /** name: */ sqsQueueName,
    /** eventName: */ 'processing_error',
  )
  public onProcessingError(error: Error, message: AWS.SQS.Message) {
    this.logger.log(message, 'Processing Error message');
    this.logger.error(error, 'Processing Error Exception');
  }

  @SqsConsumerEventHandler(
    /** name: */ sqsQueueName,
    /** eventName: */ 'timeout_error',
  )
  public onTimeoutError(error: Error, message: AWS.SQS.Message) {
    this.logger.log(message, 'Timeout Error message');
    this.logger.error(error, 'Timeout Error Exception');
  }

Type Error Cannot read properties of undefined (reading 'meta')

the undefined issue in this library
node : v16.19.0
Version: "@ssut/nestjs-sqs": "^2.0.0",
OS: Windows 11
aws-sdk version: "2.1388.0"

    "errorMessage": "Cannot read properties of undefined (reading 'meta')",
    "errorType": "TypeError",
    "stackTrace": [
        "TypeError: Cannot read properties of undefined (reading 'meta')",
        "    at node_modules\\@ssut\\nestjs-sqs\\dist\\sqs.service.js:64:49",
        "    at Array.forEach (<anonymous>)",
        "    at SqsService.<anonymous> (\\node_modules\\@ssut\\nestjs-sqs\\dist\\sqs.service.js:55:83)",
        "    at Generator.next (<anonymous>)",
        "    at fulfilled (\\node_modules\\@ssut\\nestjs-sqs\\dist\\sqs.service.js:17:58)",
        "    at processTicksAndRejections (node:internal/process/task_queues:96:5)"
    ]
}

When I try to publish I getInvalidClientTokenId

Hi, I'm running nestjs application inside of EKS pod, when I try to publish to a queue I get the error:

InvalidClientTokenId: The security token included in the request is invalid.

I publish using the SQS service:

import { SqsService } from '@ssut/nestjs-sqs';
import { Message } from '@ssut/nestjs-sqs/dist/sqs.types';

@Injectable()
export class TestService {
  constructor(readonly sqsService: SqsService) {}

  public publishToTestQueue(messages: Message<Test>) {
    return this.sqsService.send('test-q', messages);
  }
}

I initialise the service:

@Module({
  imports: [
    SqsModule.registerAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        consumers: [],
        producers: [
          {
            name: 'test-q',
            queueUrl: configService.get('TEST_Q'),
            region: configService.get('AWS_REGION'),
          },
        ],
      }),
      inject: [ConfigService],
    }),
  ],
})

I initiate the AWS SDK without security keys, I have a role attached to the pod and I want the app to utilize the role:

AWS.config.update({ region: this.configService.get('AWS_REGION') });

What am I missing here?

Cannot pull QUEUE NAME from ENV in my consumer service file.

`import { Injectable } from '@nestjs/common';
import { SqsMessageHandler } from '@ssut/nestjs-sqs';
import * as AWS from 'aws-sdk';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class AwsQueueConsumerService {
constructor(
private configService: ConfigService
) {}
@SqsMessageHandler(NEED_QUEUE_URL_FROM_ENV_HERE, false)
async handleMessage(message: AWS.SQS.Message) {
//Handle message here
}
}`

Can Someone please help me to resolve this issue? It's working fine when we hardcode the queue URL there. But its not possible to hardcode in a different environment.

Cannot find module '@smithy/shared-ini-file-loader'

"@aws-sdk/client-sqs": "3.427.0"
 "@ssut/nestjs-sqs": "2.2.0"

When moving from @aws-sdk/client-sqs version 3.414.0 to 3.427.0 I am getting an error like the following:

Cannot find module '@smithy/shared-ini-file-loader' from ...../.yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/adaptors'

    Require stack:
      .yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/adaptors/getEndpointUrlConfig.js
      .yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/adaptors/getEndpointFromConfig.js
      .yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/adaptors/getEndpointFromInstructions.js
      .yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/adaptors/index.js
      .yarn/cache/@smithy-middleware-endpoint-npm-2.1.0-<hash>.zip/node_modules/@smithy/middleware-endpoint/dist-cjs/index.js
      .yarn/cache/@aws-sdk-client-sqs-npm-3.427.0-<hash>.zip/node_modules/@aws-sdk/client-sqs/dist-cjs/SQSClient.js
      .yarn/cache/@aws-sdk-client-sqs-npm-3.427.0-<hash>.zip/node_modules/@aws-sdk/client-sqs/dist-cjs/index.js
      .yarn/__virtual__/sqs-consumer-virtual/0/cache/sqs-consumer-npm-7.2.2-<hash>.zip/node_modules/sqs-consumer/dist/consumer.js
      .yarn/__virtual__/sqs-consumer-virtual/0/cache/sqs-consumer-npm-7.2.2-<hash>.zip/node_modules/sqs-consumer/dist/index.js
      .yarn/__virtual__/@ssut-nestjs-sqs-virtual/0/cache/@ssut-nestjs-sqs-npm-2.2.0-<hash>.zip/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js
      .yarn/__virtual__/@ssut-nestjs-sqs-virtual/0/cache/@ssut-nestjs-sqs-npm-2.2.0-<hash>.zip/node_modules/@ssut/nestjs-sqs/dist/sqs.module.js
      .yarn/__virtual__/@ssut-nestjs-sqs-virtual/0/cache/@ssut-nestjs-sqs-npm-2.2.0-<hash>.zip/node_modules/@ssut/nestjs-sqs/dist/index.js
      src/some.controller.ts
      test/some-test.e2e-spec.ts

      2 | import { Injectable, Logger } from '@nestjs/common';
      3 | import { ConfigService } from '@nestjs/config';
    > 4 | import { SqsMessageHandler } from '@ssut/nestjs-sqs';

I resolved this by adding the @smithy/shared-ini-file-loader as a direct dependency.

Note, I am using Yarn 2 with PnP and Zero Installs so this may be related.

Support NestJS 10 Peer Dependencies

Hello!

There are some dependencies in the project that aren't updated yet for NestJS 10 support, in particular @golevelup/nestjs-discovery which could be updated to v4.0.0.

npm WARN Conflicting peer dependency: @nestjs/[email protected]
npm WARN node_modules/@nestjs/core
npm WARN   peer @nestjs/core@"^9.x" from @golevelup/[email protected]
npm WARN   node_modules/@ssut/nestjs-sqs/node_modules/@golevelup/nestjs-discovery
npm WARN     @golevelup/nestjs-discovery@"^3.0.0" from @ssut/[email protected]
npm WARN     node_modules/@ssut/nestjs-sqs

Stopping consumers

Hello,

I'm trying to figure out how to stop consumers manually. I want to handle the onApplicationShutdown NestJS lifecycle hook and, when it is called, stop the SQS consumer and check that there are no jobs running.

This is the SqsModuleOptionsFactory

@Injectable()
export default class SqsOptionsFactory implements SqsModuleOptionsFactory {
 createOptions(): SqsOptions | Promise<SqsOptions> {
    const consumers = []

    consumers.push({
      name: "the-queue-name",
      queueUrl: "the-queue-url",
      region: "the-queue-region",
    })

    return { consumers, producers: [] }
  }
}

Looking at bbc/sqs-consumer I can see the consumer.stop method, how can I retrieve the consumer to call the stop method?

DeadLock issue: Missing Timeout

if you're code doesn't resolve a promise correct (especially an issue when working with third-party libraries) -- there is no timeout to self correct dead lock of the processing loop

MessageAttributes of AWS.SQS.Message undefined

@SqsMessageHandler(/** name: / 'OPEN_TICKETS', /* batch: */ false)
public async openTickets(message: AWS.SQS.Message) {
const body = JSON.parse(message.Body);
console.log(body);
console.log(message.MessageAttributes);
}

Result :
{ test: 'test', test2: 2 }
undefined

Hi,

The message doesn't contains attributes. When I use the polling method from aws console, I see message attributes.

image

image

Thanks in advance,
Jonas

Possible code problem detected by linter

Hello, good morning.

I forked your repository for using at my company and I was requested to make some adaptations on it before using, and while on it the linter complained about one problem that actually seems important on the following lines of code:

https://github.com/ssut/nestjs-sqs/blob/master/lib/sqs.service.ts#L39-L53

     if (!metadata) {
        this.logger.warn(`No metadata found for: ${name}`);
      }

      const isBatchHandler = metadata.meta.batch === true;
      const consumer = Consumer.create({
        ...consumerOptions,
        ...(isBatchHandler
          ? {
              handleMessageBatch: metadata.discoveredMethod.handler.bind(
                metadata.discoveredMethod.parentClass.instance,
              ),
            }
          : { handleMessage: metadata.discoveredMethod.handler.bind(metadata.discoveredMethod.parentClass.instance) }),
      });

Here the code checks whether the metadata is present or not, but if it is undefined it will just continue running. And I don't think this is likely to work because we even make function calls on the subattributes like in:

metadata.discoveredMethod.handler.bind(
  metadata.discoveredMethod.parentClass.instance,
)

so even if I replaced all . with .? it would still break on that line.

I was wondering whether we might need to skip the iteration if we detect that the metadata is nil instead of just logging a warning.

Connection to SQS does not recover?

After an app loses connection with SQS, it's unable to recover it once the Internet connection is back.

I've noticed this when I left my app running on my laptop overnight (with wifi off) and opened it in the morning to find this:

Error   01/03/2021, 22:00:18 /Users/anton/projects/****/tail/node_modules/sqs-consumer/dist/consumer.js:45
    const sqsError = new errors_1.SQSError(message);
                     ^ - {"stack":[null]}
Error   01/03/2021, 22:00:18 SQSError: SQS receive message failed: Inaccessible host: `sqs.ap-southeast-2.amazonaws.com'. This service may not be available in the `ap-southeast-2' region.
    at toSQSError (/Users/anton/projects/****/tail/node_modules/sqs-consumer/dist/consumer.js:45:22)
    at Consumer.receiveMessage (/Users/anton/projects/****/tail/node_modules/sqs-consumer/dist/consumer.js:155:19)
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (node:internal/process/task_queues:94:5) - {"stack":[null]}

This is fine but when the wifi is back on, and I push events to the queue, the app doesn't seem to be consuming anything.

Example code documentation

It would be great if a working example for both consumers and producers were included either in the readme or in the project's repo.

Being new to NestJS, it is not intuitive to understand what to do from the current state of the documentation.

Thanks!

Microservice example

Hi!

Can someone provide an example of use in controller -> microservice?
Thanks!

bug: messages end up in dead queue

Hi there. thanks for your efforts.
I'm working on your code to create proper CustomTransport out of it
Then I realize my tests fail sometimes, randomly.
So I decided to run your tests to see if it fails with the same reason.
Technically your tests are fine but messages are pushed to dead queue the same way
so to reproduce it just run in code it two terminals in parallel

java -Dconfig.file=.github/build/elasticmq.conf -jar elasticmq-server-1.2.0.jar
npm run test:e2e

and the log would be

➜  nestjs-sqs git:(master) ✗ npm run test:e2e

> @ssut/[email protected] test:e2e
> npx jest --config=jest-e2e.config.js --detectOpenHandles --runInBand --forceExit

 PASS  e2e/module.e2e-spec.ts
  SqsModule
    registerAsync
      ✓ should register module async (23 ms)
    full flow
      ✓ should register message handler (1 ms)
      ✓ should register message producer (1 ms)
      ✓ should call message handler when a new message has come (19 ms)
      ✓ should call message handler multiple times when multiple messages have come (152 ms)
      ✓ should call the registered error handler when an error occurs (13 ms)
      ✓ should consume a dead letter from DLQ (1010 ms)

Test Suites: 1 passed, 1 total
Tests:       7 passed, 7 total
Snapshots:   0 total
Time:        2.545 s, estimated 3 s
Ran all test suites.
➜  nestjs-sqs git:(master) ✗ java -Dconfig.file=.github/build/elasticmq.conf -jar elasticmq-server-1.2.0.jar
17:15:08.487 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.2.0) ...
17:15:08.766 [elasticmq-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
17:15:09.306 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
17:15:09.366 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
17:15:09.397 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue QueueData(test-dead.fifo,MillisVisibilityTimeout(30000),PT0S,PT0S,2021-09-05T17:15:09.378+08:00,2021-09-05T17:15:09.378+08:00,None,true,false,None,None,Map())
17:15:09.414 [elasticmq-akka.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue QueueData(test.fifo,MillisVisibilityTimeout(10000),PT0S,PT0S,2021-09-05T17:15:09.414+08:00,2021-09-05T17:15:09.414+08:00,Some(DeadLettersQueueData(test-dead.fifo,1)),true,false,None,None,Map())
17:15:09.415 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.2.0) started in 1164 ms ===
17:17:09.781 [elasticmq-akka.actor.default-dispatcher-18] INFO  org.elasticmq.actor.queue.QueueActor - test.fifo: Clearing queue
17:17:09.781 [elasticmq-akka.actor.default-dispatcher-19] INFO  org.elasticmq.actor.queue.QueueActor - test-dead.fifo: Clearing queue
17:17:10.089 [elasticmq-akka.actor.default-dispatcher-21] INFO  akka.actor.DeadLetterActorRef - Message [scala.runtime.BoxedUnit] from Actor[akka://elasticmq/user/$a/$a#-1093556993] to Actor[akka://elasticmq/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://elasticmq/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

this definitely should not happen because sqs-consumer have to remove messages.
not sure but it looks like it does not sometimes

@ssut/nestjs-sqs Compatibility Concern with NestJs V8, V9 & V10

SQS Consumer Compatibility Concern

I am trying to implement SQS consumer in a NestJs project using the @ssut/nestjs-sqs library (version 1.3.0 and 2.1.0). However, I've encountered a major concern with compatibility issues with different Nest.js versions.

Here are my observations:

Nest.js Version @ssut/nestjs-sqs Version Working
V8.0.0 V1.3.0 No
V8.0.0 V2.1.0 No
V9.0.0 V1.3.0 Yes
V9.0.0 V2.1.0 No
V10.0.0 V1.3.0 No
V10.0.0 V2.1.0 No

I am facing issues with @ssut/nestjs-sqs version 1.3.0 and 2.1.0 in Nest.js versions 8.0.0, 9.0.0, and 10.0.0.
I am getting different -different error for different versions for the same code

Note :- I have used the code in each version to test the compatibility

Nest.js Version @ssut/nestjs-sqs Version
V8.0.0 V1.3.0
/Users/avinash.anshu/Project/Practice/sqs-v8/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:56
            (_a = this.options.consumers) === null || _a === void 0 ? void 0 : _a.forEach((options) => {
                                                                                  ^
TypeError: Cannot read properties of undefined (reading 'meta')
    at /Users/avinash.anshu/Project/Practice/sqs-v8/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:65:49
    at Array.forEach (<anonymous>)
    at SqsService.<anonymous> (/Users/avinash.anshu/Project/Practice/sqs-v8/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:56:83)
    at Generator.next (<anonymous>)
    at fulfilled (/Users/avinash.anshu/Project/Practice/sqs-v8/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:17:58)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
Nest.js Version @ssut/nestjs-sqs Version
V8.0.0 V2.1.0
/Users/avinash.anshu/Project/Practice/sqs-v8-latest/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:55
            (_b = this.options.consumers) === null || _b === void 0 ? void 0 : _b.forEach((options) => {
                                                                                  ^
TypeError: Cannot read properties of undefined (reading 'meta')
    at /Users/avinash.anshu/Project/Practice/sqs-v8-latest/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:64:49
    at Array.forEach (<anonymous>)
    at SqsService.<anonymous> (/Users/avinash.anshu/Project/Practice/sqs-v8-latest/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:55:83)
    at Generator.next (<anonymous>)
    at fulfilled (/Users/avinash.anshu/Project/Practice/sqs-v8-latest/node_modules/@ssut/nestjs-sqs/dist/sqs.service.js:17:58)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
Nest.js Version @ssut/nestjs-sqs Version
V9.0.0 V2.1.0
(node:33736) NOTE: We are formalizing our plans to enter AWS SDK for JavaScript (v2) into maintenance mode in 2023.

Please migrate your code to use AWS SDK for JavaScript (v3).
For more information, check the migration guide at https://a.co/7PzMCcy
(Use `node --trace-warnings ...` to show where the warning was created)

/Users/avinash.anshu/Project/Practice/sqs-v9-latest/node_modules/sqs-consumer/dist/errors.js:40
    const sqsError = new SQSError(message);
                     ^
SQSError: SQS receive message failed: The security token included in the request is invalid.
    at toSQSError (/Users/avinash.anshu/Project/Practice/sqs-v9-latest/node_modules/sqs-consumer/dist/errors.js:40:22)
    at Consumer.receiveMessage (/Users/avinash.anshu/Project/Practice/sqs-v9-latest/node_modules/sqs-consumer/dist/consumer.js:173:43)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
Nest.js Version @ssut/nestjs-sqs Version
V10.0.0 V1.3.0
[Nest] 33753  - 07/27/2023, 10:05:23 AM   ERROR [ExceptionHandler] Cannot read properties of undefined (reading 'values')
TypeError: Cannot read properties of undefined (reading 'values')
    at /Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs-plus/discovery/src/discovery.service.ts:68:50
    at arrayMap (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/lodash/lodash.js:653:23)
    at map (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/lodash/lodash.js:9622:14)
    at Function.flatMap (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/lodash/lodash.js:9325:26)
    at new DiscoveryService (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs-plus/discovery/src/discovery.service.ts:67:7)
    at Injector.instantiateClass (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs/core/injector/injector.js:365:19)
    at callback (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs/core/injector/injector.js:65:45)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at Injector.resolveConstructorParams (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs/core/injector/injector.js:144:24)
    at Injector.loadInstance (/Users/avinash.anshu/Project/Practice/sqs-v10/node_modules/@nestjs/core/injector/injector.js:70:13)
Nest.js Version @ssut/nestjs-sqs Version
V10.0.0 V2.1.0
(node:33776) NOTE: We are formalizing our plans to enter AWS SDK for JavaScript (v2) into maintenance mode in 2023.

Please migrate your code to use AWS SDK for JavaScript (v3).
For more information, check the migration guide at https://a.co/7PzMCcy
(Use `node --trace-warnings ...` to show where the warning was created)

/Users/avinash.anshu/Project/Practice/sqs-v10-latest/node_modules/sqs-consumer/dist/errors.js:40
    const sqsError = new SQSError(message);
                     ^
SQSError: SQS receive message failed: The security token included in the request is invalid.
    at toSQSError (/Users/avinash.anshu/Project/Practice/sqs-v10-latest/node_modules/sqs-consumer/dist/errors.js:40:22)
    at Consumer.receiveMessage (/Users/avinash.anshu/Project/Practice/sqs-v10-latest/node_modules/sqs-consumer/dist/consumer.js:173:43)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)

InvalidClientTokenId error after upgrading to 2.0.0 locally.

So basically that, everything worked fine in 1.2.x. I am using Localstack, which should not make any difference, and I'm loading the credentials and config from aws CLI files.

If I try to upgrade to 2.0.0 then I get this error in the method receiveMessage of the consumer.js file from sqs-consumer

InvalidClientTokenId: The security token included in the request is invalid.

If I go to consumer.js and try to log region and credentials I got his which looks as expected:

console.log({region: this.sqs.config.region(), credentials: await this.sqs.config.credentials()})


{
  region: Promise { 'eu-central-1' },
  credentials: { accessKeyId: 'dummy', secretAccessKey: 'dummy' }
}

Any idea on what might be happening? As I said this setup was working fine in 1.2.x and according to the docs no further configuration needs to be done if I'm not wrong.

Example code

This might not be the place for this, but I'm at a loss. I'm new to NestJS and using AWS SQS queues. I can consume messages, but I can't send them. I've got an AppService pretty much identical to the readme example, but it keeps failing:

  export class AppService {
  public constructor(private readonly sqsService: SqsService) {}

  getHello(): string {
    return 'Hello World YouTube';
  }

  public async sendMessage(message) {
    const id = String(Math.floor(Math.random() * 1000000));
    await this.sqsService.send(TestQueue.Test, {
      id,
      body: { test: true },
      groupId: 'test',
      deduplicationId: id,
      delaySeconds: 0,
    });
  }
}

When I run the app and try to send a message (through a POST call also handled by this app) I get these messages:

[Nest] 22049   - 05/27/2021, 6:27:22 PM   [NestFactory] Starting Nest application...
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [InstanceLoader] HttpModule dependencies initialized +25ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [InstanceLoader] DiscoveryModule dependencies initialized +0ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [InstanceLoader] SqsModule dependencies initialized +0ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RoutesResolver] AppController {}: +5ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RouterExplorer] Mapped {, GET} route +3ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RoutesResolver] SpaceController {/space}: +1ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RouterExplorer] Mapped {/space, GET} route +0ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RoutesResolver] MessagesController {/messages}: +1ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RouterExplorer] Mapped {/messages, GET} route +0ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [RouterExplorer] Mapped {/messages, POST} route +1ms
[Nest] 22049   - 05/27/2021, 6:27:22 PM   [NestApplication] Nest application successfully started +35ms
[Nest] 22049   - 05/27/2021, 6:27:26 PM   [ExceptionsHandler] Failed to send messages: 742884 +4328ms
Error: Failed to send messages: 742884
    at Producer.sendBatch (/Users/dougfarrell/tmp/sqs-handler/node_modules/sqs-producer/dist/producer.js:56:15)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
    at async AppService.sendMessage (/Users/dougfarrell/tmp/sqs-handler/dist/app.service.js:25:9)
    at async /Users/dougfarrell/tmp/sqs-handler/node_modules/@nestjs/core/router/router-execution-context.js:46:28
    at async /Users/dougfarrell/tmp/sqs-handler/node_modules/@nestjs/core/router/router-proxy.js:9:17

Any help or suggestions would be greatly appreciated.

Thanks in advance,
Doug

Not able to connect to LocalStack Sqs URL

Hi Folks,

I have been struggling to figure out why Im not able to connect to LocalStack sqs URL. I have setup LocalStack to run the queue in my local. I was able to send and receive the message using cli commands and also I was able to publish the message to this queue from my other non nestjs application but the ssut/nestjs-sqs throws the below error and I'm not sure why.

api/node_modules/sqs-consumer/dist/errors.js:40
    const sqsError = new SQSError(message);
                     ^
SQSError: SQS receive message failed: The address https://sqs.us-east-1.amazonaws.com/ is not valid for this endpoint.
    at toSQSError (/api/node_modules/sqs-consumer/dist/errors.js:40:22)
    at Consumer.receiveMessage (/api/node_modules/sqs-consumer/dist/consumer.js:173:43)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)

Consumer is not working on AWS - NestJS

Hi. I just implemented the SQS queries. It calls the nestjs server, but this is not calling the handler correctly. It isn't logging any error messages or even entering in the handler. I verified that the handler is being called correctly, but still it doesn't do anything. This is only happning when using AWS, since, locally it works just fine.

2023-01-27T15:09:14.908Z	d4978ef1-adc9-5f25-91e1-d2bc4683894e	INFO	{
  Records: [
    {
      messageId: '52ef80c0-02e0-40f7-a361-4f1c51bf13bc',
      receiptHandle: 'AQEB4y3zSc9MmN/l5Mdpfas2yPTBbpl1KmfwckkP8enLrbumKnnMeFX4jE2TC6jvW0NGuQPzsfaDELa73UZkqJTPupHDT1vel8ipzYKVhdExXsPgjgG9T/4JGq2bogXxuM5CA2coGCCg0IqBA3y3pYcWbSzQJ9rnQPvcmLTlDVsZbvE96DzGEbnoYFhepiohUCJP/JKVrF+w1L07RZ7eMEn4OCAJtp2su84j0hoq6g9tC1b+I8vyzyXCVx7MfdNaL6qdiAldSkaIs1NYN33U62ZUMNJO9jJurs9XhP43QZ8ZCaKuGhBGbxAJFTCQC8IOQ7paltwHLB+H+ex/t+1Y3ygW6ZkBUcNZ+Etck+HQAmOsnjez65eYpCnnAqYQUFBRdT85U7tgFaX5LzXhLA3/Vsxy/Q==',
      body: '63d3e8d2f8d244ae1a9dec86',
      attributes: [Object],
      messageAttributes: [Object],
      md5OfMessageAttributes: 'a0c076cee46e48e232dc8c31996a2abe',
      md5OfBody: '4c4c406dc49e40e27c5b83ea0ef07fc2',
      eventSource: 'aws:sqs',
      eventSourceARN: 'arn:aws:sqs:us-east-1:???:SearchDriverBooking',
      awsRegion: 'us-east-1'
    }
  ]
}

QUESTION: FIFO queue message still available after consumer fetch

Hello, the first questiion why fifo queue still keep messages after consumer fetch it ?

@SqsMessageHandler(process.env.AWS_SQS_NAME, true)
  public async handleMessage(message: Message) {
    this.loggerService.verbose('New queue message');
    await this.appService.doSomething(message[0].Body);
  }

after finish handle it, the consumer fetch the same message again and again

Here is queue config

resource "aws_sqs_queue" "this" {
  name                      = var.queue_name
  delay_seconds             = 90
  max_message_size          = 2048
  message_retention_seconds = 1209600
  receive_wait_time_seconds = 10
  visibility_timeout_seconds = 0
  fifo_queue = true
}

if I change batch flag from true to false then consumer have a lot the same message without waiting handler finish.

// from
@SqsMessageHandler(process.env.AWS_SQS_NAME, true)

// to
@SqsMessageHandler(process.env.AWS_SQS_NAME, false)

What is wrong here?

my version
"@ssut/nestjs-sqs": "^2.0.0",

Thrown error inside batch message handler kills app.

Whenever an error is thrown inside a batch message handler and there is no error event handler method, an uncaught exception is thrown and kills the app.

Steps to reproduce:

  1. Set up handler like so:
@SqsMessageHandler('test', true)
public async handleMessage(messages: SQS.Message[]): Promise<void> {
  throw new Error('test')
}
  1. Do NOT include @SqsConsumerEventHandler('test', 'error').

It seems like when I include the error event handler, the error is able to be caught and logged. The app continues to run.

Expectation:

The error event handler should be an opt-in feature. The app should continue to function if individual handlers fail for whatever reason.

Results:

Throws uncaught exception

[1656014764276] ERROR: Error: Unexpected message handler failure: test
    at RentlyWebhookSqsHandler.handleMessage (C:\SourceControl\GITHUB\everest\backend\src\rently\sqs-handlers\rently-webhook-sqs-handler.service.ts:27:11)
    at Consumer.executeBatchHandler (C:\SourceControl\GITHUB\everest\backend\node_modules\@ssut\nestjs-sqs\node_modules\sqs-consumer\dist\consumer.js:303:24)
    at Consumer.processMessageBatch (C:\SourceControl\GITHUB\everest\backend\node_modules\@ssut\nestjs-sqs\node_modules\sqs-consumer\dist\consumer.js:267:24)
    at Consumer.handleSqsResponse (C:\SourceControl\GITHUB\everest\backend\node_modules\@ssut\nestjs-sqs\node_modules\sqs-consumer\dist\consumer.js:113:32)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

Also terminates the app.

How to implemente graceful shutdown

Hi,

I would like to know how to implement a graceful shutdown because I run a long process and in a deploy de SQS message is waiting as "in transit". The key is to implement a graceful shutdown with flying messages and move to available.

TypeError: Cannot read properties of undefined (reading 'values')

Since NestJS released their version 10, the nestjs-sqs lib doesn't work anymore, giving the following error message:

TypeError: Cannot read properties of undefined (reading 'values')

Regarding the stack trace, I can see that it comes from the @nestjs-plus/discovery lib used internally, which is a fork and hasn't been updated since 4 years...

You can find a reproductible repository example here, just start the app in development mode and you will see the error message.

Dependency injection?

import { Injectable } from "@nestjs/common";
import { SqsMessageHandler } from "@ssut/nestjs-sqs";
import { SyncsService } from "../syncs.service";

import { Jobtypes } from "./job.types";
export const SYNC_WORKSPACE_QUEUE_NAME = "syncs_workspace";

export const SYNCS_QUEUE_NAME = "syncs";

@Injectable()
export class SyncsConsumer {
  constructor(private readonly syncsService: SyncsService) {
    console.log(syncsService);
  }

  @SqsMessageHandler(SYNCS_QUEUE_NAME)
  public async processJob(message: AWS.SQS.Message) {
    const job = JSON.parse(message.Body);

    try {
      switch (job.type) {
        case Jobtypes.syncOne:
          console.log("received ", job);
          await this.syncsService.execute(job);

          break;

        default:
          throw `No job with type: ${job.type} exists in syncs queue`;
      }
    } catch (error) {
      console.log("error", error);
    }
  }
}

I've created this class as a consumer, and when receiving the message the syncService is not injected, is-it normal?

Support for Interceptor/Middleware

I want to add interceptor for all my SQS consumers to send errors to Sentry and add message metadata to my logging context. Is there a way for firing interceptors for message handlers?

NestJS 8?

Are there plans to update the package and support NestJS 8?

Empty event does not work

Hi,

I need to know when the queue is empty, I'm trying to do reading the docs at https://github.com/bbc/sqs-consumer#options:

  @SqsConsumerEventHandler(`${process.env.QUEUE_NAME}`, 'processing_error')
  public onProcessingError(error: Error, message: AWS.SQS.Message) {   
    console.error(`Error reading message`, JSON.stringify(error));
  }

  @SqsConsumerEventHandler(`${process.env.QUEUE_NAME}`, 'empty')
  public onEmpty() {   
    console.log(`The queue is empty!`);    
  }

The problem is that this log message is never printed to console. I've tried to test it adding messages to local queue and waiting for de empty event.

I use softwaremill/elasticmq docker image to run locally my queue.

I debug the library and I think the problem is in file sqs.services.js:

const eventMetadata = eventHandlers.find(({ meta }) => meta.name === name);

The find method get only first event, If I change order, empty event first in my class, the event empty is fired but the second not.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.