Git Product home page Git Product logo

Comments (17)

skif48 avatar skif48 commented on May 27, 2024 1

@marcio199226 fixed in v 0.1.2. You can go ahead and npm install it. New version introduces a slightly different usage of custom setup function, namely - return object interface. The details can be found under the same section with function usage in readme. Let me know if anything goes wrong. 🙂

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024 1

Hey @marcio199226. Is it still an issue for you? If not, I'll close this one. 🙂

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

Hello @marcio199226, thanks for your attention to the project!

Publisher class is abstracted away from queue creation, this class only works with exchanges, because that's the only thing RabbitMQ publishers should really work with - it's rarely a good practice to work directly with queues themselves.

What is your use case? Optional queue assertion and binding during publisher creation might be a nice feature request.

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

About publisher confirms you can read here, but to keep it short, it's a way for publisher to identify the fact that RabbitMQ did receive a message we just published.

If publisher confirmation fails for whatever reason, an instance of PublisherConfirmationError is thrown to the publisher actions stream. You can use instanceof to catch it.

from nabbitmq.

marcio199226 avatar marcio199226 commented on May 27, 2024

Ok thanks for explanation

i have no special use case for not using an exchange.

I have another question when i start to consume messages from a queue new exchange was created an example:

async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setConfigs({
    noDeadLetterQueue: true,
    autoAck: false,
    prefetch: 1,
    queue: {
      name: 'accounts_input_google',
      durable: true,
    }
  });
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: async (message) => {
      const payload = JSON.parse(message.content);
     // other stuff
    },
    complete: () => console.log('completed'),
    error: (error) => {
      console.error(error);
    }
  });
}

After a message was consumed i see exchange_accounts_input_google.

How can i prevent from creating new exchange?

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

Exchange is created by default and bound to your queue. If for some reason, you don't need an exchange, you can leverage a setup function. You can find an example here under With custom setup function section.

Not using an exchange is not a usual use case for RabbitMQ setup, therefore an exchange is created by default when you instantiate a Consumer instance. This exchange is later needed when you're going to publish messages.

from nabbitmq.

marcio199226 avatar marcio199226 commented on May 27, 2024

I will explain better you my rabbitmq setup.

I would have 1 exchange of direct type which has bindings based on routing keys which will routing those messages to queue with corresponding routing key.

The "issue" is when i have a consumer for queue "bar" it automatically created an exchange bar_exchange but in my case already exists an exchange so i wouldn't to create a new one for each queue the consumer is subscribed to.

Thanks for all replies i will look for custom setup function anyway or rewrite my consumers using raw ampqlib

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

@marcio199226 I see. Well, you can provide exchange info within consumer configs. Consumer class uses assertions, these operations are idempotent, meaning, they will either create a required resource or ensure its existence. Therefore if you provide necessary info about your exchanges and bindings to your consumer configs, not only it will work for you, but also you'll be sure that your setup is alright, before you start consuming.

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

@marcio199226 If you decide to use consumer configs for your case, please let me know if they don't fit your needs 🙂

from nabbitmq.

marcio199226 avatar marcio199226 commented on May 27, 2024

Hi, i have used custom setup function but it will throw an error:

const rabbit                    = require('nabbitmq');
const RabbitMqConnectionFactory = rabbit.RabbitMqConnectionFactory;
const ConsumerFactory           = rabbit.ConsumerFactory;

const connectionFactory = new RabbitMqConnectionFactory();
connectionFactory.setUri('amqp://localhost:5672');
const connection = await connectionFactory.newConnection();
const consumerFactory = new ConsumerFactory(connection);
consumerFactory.setCustomSetupFunction(async (connection) => {
const channel = await connection.createChannel();
await channel.assertExchange(`dynamic_exchange_name_${id}`, 'direct', { durable: true });
const queueMetadata = await channel.assertQueue(`dynamic_queue_name_${id}`, {
durable: true,
});

await channel.bindQueue(queueMetadata.queue, `dynamic_exchange_name_${id}`, `my.routing.key.${id}`);
await channel.prefetch(1);

return {channel, prefetch: 1, autoAck: false};
});
const consumer = await consumerFactory.newConsumer();

consumer.startConsuming().subscribe({
next: async (message) => {
// code
},
complete: () => console.log('completed'),
error: (error) => {
console.error(error);
}
});

After executing i have the following error:
node consumer.js (node:24241) UnhandledPromiseRejectionWarning: Error: Cannot read property 'queue' of null at ConsumerFactory.<anonymous> (/home/oskar/projects/instagram/node_modules/nabbitmq/lib/factories/consumer-factory.js:35:23) at Generator.throw (<anonymous>) at rejected (/home/oskar/projects/node_modules/nabbitmq/lib/factories/consumer-factory.js:5:65) at <anonymous> at process._tickCallback (internal/process/next_tick.js:189:7) (node:24241) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1) (node:24241) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

But the exchange is created successfully with right bindings and the queue is created too.

What i'm doing wrong ?

P.S i have the latest version installed 0.1.1

P.S2 seems that while constructing publisher/consumer factory some configs seems are still needed by them but configs & rawConfigs are sets to null, you should set them at least to an empty object i think beacuse of this for example: https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L164 when config = null https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L74

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

Hello again. Working on it @marcio199226, thanks for the snippet.

from nabbitmq.

marcio199226 avatar marcio199226 commented on May 27, 2024

I noticed that when consuming messages if I try to get message's routing key through message.fields.routingKey I'm obtaining queue name instead of message's routing key

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

Hello @marcio199226!
Did you use custom setup function or you used consumer configs to set up consuming logic?

P.S. I just released version 1.0.0 which changed consumer and publisher configs a little. You can have a look here and here. Also there are examples updated in README.

from nabbitmq.

marcio199226 avatar marcio199226 commented on May 27, 2024

I have used a custom setup function

  consumerFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createChannel();
    await channel.assertExchange('exchange', 'topic', { durable: true });
    const queueMetadata = await channel.assertQueue('input', {
      durable: true,
      maxPriority: 10
    });

    await channel.bindQueue(queueMetadata.queue, 'exchange', 'account.*');
    await channel.prefetch(1);

    return {channel, prefetch: 1, autoAck: false, queue: 'input'};
  });

And after consuming some message, message.fields.routingKey returns my queue name "input" insteaf of currently messgae routingKey

I have used 0.1.2v

Best regards

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

@marcio199226 thanks for the snippet. Working on that.

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

@marcio199226 I think I know where the bug is, will publish a fix soon. Thank you very much!

from nabbitmq.

skif48 avatar skif48 commented on May 27, 2024

I'll close this issue and move this conversation in another one: #7 .

from nabbitmq.

Related Issues (3)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.