Git Product home page Git Product logo

pubsub's Introduction

PubSub

This is a generic PubSub Factory exposing a listen and a emit method.

NOTE: Today, only Google Cloud PubSub has been added.

Installation

npm install --save @algoan/pubsub

Usage

Google Cloud PubSub

Run tests

To run tests or to try the PubSubFactory class, you need to have a google account and have installed gcloud sdk.

Then, to install the Google PubSub simulator, run:

gcloud components install pubsub-emulator
gcloud components install beta
gcloud components update

Start tests running:

npm test

It will launch a Google PubSub emulator thanks to the google-pubsub-emulator library.

Example

To create a PubSub instance using Google Cloud:

import { EmittedMessage, GCPubSub, PubSubFactory, Transport } from '@algoan/pubsub'

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'test',
    // And all other Google PubSub properties
  }
});
const topicName: string = 'some_topic';

await pubsub.listen(topicName, {
  autoAck: true,
  onMessage: (data: EmittedMessage<{foo: string}>) => {
    console.log(data.parsedData); // { foo: 'bar', time: {Date.now}, _eventName: 'some_topic' }
    // do whatever you want. The message has already been acknowledged
  },
  onError: (error: Error) => {
    // Handle error as you wish
  }
});

await pubsub.emit(topicName, { foo: 'bar' });

Contribution

Thank you for your future contribution 😁 Please follow these instructions before opening a pull request!

API

PubSubFactory.create({ transport, options })

The only static method from the PubSubFactory class. It initiates a new PubSub instance depending on the transport. By default, it connects to Google Cloud PubSub.

  • transport: PubSub technology to use. Only GOOGLE_PUBSUB is available for now.
  • options: Options related to the transport.
    • If transport === Transport.GOOGLE_PUBSUB, then have a look at the Google Cloud PubSub config client.
    • debug: Display logs if it is set to true. It uses a pino logger and pino-pretty if NODE_ENV is not equal to production.
    • pinoOptions: If debug is set to true, set the pino logger options. Default to level: debug and prettyPrint: true if NODE_ENV is not equal to production.
    • topicsPrefix: Add a prefix to all created topics. Example: topicsPrefix: 'my-topic', all topics will begin with my-topic+{your topic name}.
    • topicsSeparator: Customize separator between topicsPrefix and topic name. Example: topicsSeparator: '-', all topics will be {topic prefix}-{your topic name} (default to '+').
    • subscriptionsPrefix: Add a prefix to all created subscriptions. Example: subscriptionsPrefix: 'my-sub', all subscriptions will begin with my-sub%{your topic name}.
    • subscriptionsSeparator: Customize separator between subscriptionsPrefix and topic name. Example: subscriptionsSeparator: '-', all subscriptions will be {subscription prefix}-{your topic name} (default to '%').
    • namespace: Add a namespace property to Message attributes when publishing on a topic.
    • environment: Add a environment property to Message attributes when publishing on a topic.

pubsub.listen(event, opts)

Listen to a specific event.

NOTE: It only uses the Google Cloud subscription pull delivery for now.

  • event: Name of the event.
  • opts: Options related to the Listener method
    • onMessage: Method called when receiving a message
    • onError: Method called when an error occurs
    • options: Option related to the chosen transport

If the chosen transport is Google Cloud PubSub, then options would be:

  • autoAck: Automatically ACK an event as soon as it is received (default to true)
  • subscriptionOptions: Options related to the created Subscription:
    • name: Custom name for the subscription. Default: event (also equal to the topic name)
    • get: Options applied to the getSubscription method (have a look at Subscription options)
    • sub: Options applied to the subscription instance (see also setOptions method)
    • create: Options applied to the createSubscription method (have a look at Create Subscription options)
  • topicOptions: Options applied to the created topic (have a look at Topic options)
  • topicName: Set the topic name. By default, it uses the default name with a prefix.

pubsub.emit(event, payload, opts)

Emit a specific event with a payload. It added attributes in the message if you have added a namespace or an environment when setting the PubSubFactory class. It also adds an _eventName and a time property in the emitted payload.

  • event: Name of the event to emit.
  • payload: Payload to send. It will be buffered by Google, and then parsed by the listen method.
  • opts: Options related to the Emit method
    • metadata: Custom metadata added to the message
    • options: Option related to the chosen transport

If the chosen transport is Google Cloud PubSub, then options would be:

  • topicOptions: Options applied to the created topic (have a look at Topic options)
  • publishOptions: Publish options set to the topic after its creation. Refer to Publish Options
  • messageOptions: Additional message options added to the message. Refer to Message Options

pubsub.unsubscribe(event)

Stop the server connection for a given subscription.

  • event: Name of of the event to stop listening for.

pubsub's People

Contributors

afzaalahmad avatar bendakh avatar ccoeurderoy avatar dependabot-preview[bot] avatar dependabot[bot] avatar g-ongenae avatar gitsambhal avatar leker29 avatar meriambensassi avatar na-ji avatar qhdinh avatar semantic-release-bot avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

pubsub's Issues

event name used to create topic and subscription

Hello,
I noticed that the following method uses the same event string to get or create both the topic and the subscription. For this is impossible to create a topic with a different name than the subscription. Does it really make sense to use the same string for both? Shouldnt we be able to have different names for subscription and topic?

public async listen<T>(
event: string,
opts: ListenOptions<T, GCListenOptions> = { options: { autoAck: true } },
): Promise<void> {
const topic: Topic = await this.getOrCreateTopic(event, opts.options?.topicOptions);
const subscription: Subscription = await this.getOrCreateSubscription(
event,
topic,
opts.options?.subscriptionOptions,
);
this.logger.debug(`Listened to topic ${topic.name} with subscription ${subscription.name}`);

The automated release is failing 🚨

🚨 The automated release from the master branch failed. 🚨

I recommend you give this issue a high priority, so other packages depending on you could benefit from your bug fixes and new features.

You can find below the list of errors reported by semantic-release. Each one of them has to be resolved in order to automatically publish your package. I’m sure you can resolve this 💪.

Errors are usually caused by a misconfiguration or an authentication problem. With each error reported below you will find explanation and guidance to help you to resolve it.

Once all the errors are resolved, semantic-release will release your package the next time you push a commit to the master branch. You can also manually restart the failed CI job that runs semantic-release.

If you are not sure how to resolve this, here is some links that can help you:

If those don’t help, or if this issue is reporting something you think isn’t right, you can always ask the humans behind semantic-release.


Invalid npm token.

The npm token configured in the NPM_TOKEN environment variable must be a valid token allowing to publish to the registry https://registry.npmjs.org/.

If you are using Two-Factor Authentication, make configure the auth-only level is supported. semantic-release cannot publish with the default auth-and-writes level.

Please make sure to set the NPM_TOKEN environment variable in your CI with the exact value of the npm token.


Good luck with your project ✨

Your semantic-release bot 📦🚀

autoAck: false not supported

Hi,

thanks for this awesome package!

I want to set autoAck to false to ensure successful processing but then I cannot ack the message anymore since the EmittedMessages does not holds the original .ack() method anymore.

Best
Philipp

Tests are failing locally

npm test

As you can see, 3 tests failed and I had to kill the process after about two minutes.

Capture d’écran 2021-03-10 à 11 27 47

Tests are working in the CI environment, but it does not work locally...

Originally posted by @g-ongenae in #133 (comment)

A way to pass subscription options/metadata

Greetings!

I've been playing around with your NestJS implementation with pubsub and it works really well. I'm looking for a way to set subscription options, such as ackDeadlineSeconds and deadLetterPolicy. Is there a way to do it with the current implementation? I'm having some trouble finding how to achieve this.
edit: I just digged through the code and it seems the subscriptions are made with autoCreate, I believe you can't pass these options in this case :( anyway I'd like to ask if you have any idea how I can make a workaround, appreciate it!

Upgrade to @google-cloud/pubsub 3.5.1

There is a possible fix for this error (see here):

2023-04-24 07:04:25 | Node.js v19.8.1
2023-04-24 07:04:25 | }
2023-04-24 07:04:25 | errorCode: 'INVALID'
2023-04-24 07:04:25 | at async ModAckQueue.flush (/usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:195:29) {
2023-04-24 07:04:25 | at async ModAckQueue._sendBatch (/usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:476:31)
2023-04-24 07:04:25 | at async Promise.all (index 0)
2023-04-24 07:04:25 | at /usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:470:43
2023-04-24 07:04:25 | at ModAckQueue.handleAckFailures (/usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:328:18)
2023-04-24 07:04:25 | at Array.forEach (<anonymous>)
2023-04-24 07:04:25 | at /usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:330:29
2023-04-24 07:04:25 | AckError: INVALID : 3 INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed.
2023-04-24 07:04:25 |  
2023-04-24 07:04:25 | ^
2023-04-24 07:04:25 | const exc = new subscriber_1.AckError(e[0], rpcError.message);
2023-04-24 07:04:25 | /usr/src/app/silver/node_modules/@google-cloud/pubsub/build/src/message-queues.js:330

How to set orderingKey in Nestjs ?

I want to receive message from subscription one at a time (example: next msg should come only after previous one is acked/expired for same orderKey ) based on same orderingKeys and concurrently with different.

Google never acknowledge that the event has been received and send it again and again.

Hi, we found out a small bug. We have a NestJs micro-services with a couple of @EventPattern('my_event').

Until now it was working perfectly, we do a data.ack() at the end of the function. We also setup the dealLetter queue that after 5 not ack we send it to the deadLetter.

The thing is that it was working until we had 19 @EventPattern('my_event').
When we included more (20 and beyond), it seems like google pub/sub doesn't even acknowledge that the events have been received (all the events were affected).

We tried to console log the getOriginalMessage().deliveryAttempt and it doesn't increase even though event id is the same.
We still get the event, we still can ack it, there is no error... but pub/sub still resend the event again and again on the server restart.

And if we keep the number of @EventPattern('my_event') below 20 then it works fine again.

Do you know why? if there is a solution to be able to go over this limit ?

Thanks

how to close the connection to stop listening for new messages?

Hi guys,

I am using pubsub.client.close() method to close the connection to the pubsub but the subscription(created using pubsub.listen) is not closed and the messages are received even after closing it.
What is the proper way to close the listening on subsription?

It is blocking the event loop

I am using Google Pubsub in NestJs and when the data is recieved by the handler, the event loop gets blocked and I am unable to make any HTTP request to the API.
I am using Blocked At plugin to get the following stacktrace.

Blocked for 20320.787875ms, operation started here: [
  '    at AsyncHook.init (/Users/username/myproject/node_modules/.pnpm/[email protected]/node_modules/blocked-at/index.js:31:11)',
  '    at emitInitNative (node:internal/async_hooks:202:43)',
  '    at emitInitScript (node:internal/async_hooks:505:3)',
  '    at promiseInitHook (node:internal/async_hooks:324:3)',
  '    at promiseInitHookWithDestroyTracking (node:internal/async_hooks:328:3)',
  '    at /Users/username/myproject/node_modules/.pnpm/@[email protected]_3xexfhuxzzpm44flufjnppzrlu/node_modules/@nestjs/microservices/context/rpc-context-creator.js:43:23',
  '    at InterceptorsConsumer.intercept (/Users/username/myproject/node_modules/.pnpm/@[email protected]_kj7rymkcr5lkprk6tswpsa5dxa/node_modules/@nestjs/core/interceptors/interceptors-consumer.js:11:20)',
  '    at /Users/username/myproject/node_modules/.pnpm/@[email protected]_3xexfhuxzzpm44flufjnppzrlu/node_modules/@nestjs/microservices/context/rpc-context-creator.js:51:46',
  '    at /Users/username/myproject/node_modules/.pnpm/@[email protected]_3xexfhuxzzpm44flufjnppzrlu/node_modules/@nestjs/microservices/context/rpc-proxy.js:11:38',
  '    at eventHandler (/Users/username/myproject/node_modules/.pnpm/@[email protected]_3xexfhuxzzpm44flufjnppzrlu/node_modules/@nestjs/microservices/listeners-controller.js:55:53)',
  '    at Object.onMessage (/Users/username/myproject/node_modules/.pnpm/@[email protected]_o2upt3vfiv5amp3ncoauuaj7la/node_modules/@algoan/nestjs-google-pubsub-microservice/src/GooglePubSubServer.ts:110:13)',
  '    at Subscription.<anonymous> (/Users/username/myproject/node_modules/.pnpm/@[email protected]/node_modules/@algoan/pubsub/dist/src/GoogleCloudPubSub/GoogleCloudPubSub.js:33:22)',
  '    at Subscription.emit (node:events:513:28)',
  '    at Subscriber.<anonymous> (/Users/username/myproject/node_modules/.pnpm/@[email protected]/node_modules/@google-cloud/pubsub/src/subscription.ts:329:38)',
  '    at Subscriber.emit (node:events:513:28)',
  '    at /Users/username/myproject/node_modules/.pnpm/@[email protected]/node_modules/@google-cloud/pubsub/src/lease-manager.ts:243:47',
  '    at processTicksAndRejections (node:internal/process/task_queues:77:11)'
]

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.