
noxdafox commented on August 20, 2024

Opened a pull request on rabbitmq-server.

from rabbitmq-message-deduplication.

noxdafox commented on August 20, 2024

The original fix was most likely reverted when merging the v3.7.x and v3.8.x branches.

I added a test case to make sure the regression won't be introduced again. I am also fixing other issues, so this fix will be part of a later release once I am done with them.

If it is urgent, you can build the plugin yourself by following the README instructions.

noxdafox commented on August 20, 2024

Your assumptions are correct. The AMQP 0-9-1 specification does not include a reason for rejection in the basic.nack method. The basic.return method does, but it is intended for messages which cannot be routed and is emitted on the exchange side. The core issue is that AMQP 0-9-1 completely decouples publishers and queues through exchanges. If you want to deduplicate on the queues, you need to deal with the fact that a queue cannot communicate in any way with a message publisher.
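To make the protocol point concrete, here is a minimal Python sketch that records the fields each method carries, as listed in the AMQP 0-9-1 specification. The class names and the `carries_reason` helper are illustrative only, not part of any client library. Only basic.return has a reply code and text; a publisher receiving basic.nack learns nothing beyond the delivery tag and the multiple/requeue flags.

```python
from dataclasses import dataclass, fields

# Method fields as defined by AMQP 0-9-1 (names converted to snake_case).
# Illustrative data classes only; real clients have their own frame types.


@dataclass
class BasicAck:
    delivery_tag: int
    multiple: bool


@dataclass
class BasicNack:
    delivery_tag: int
    multiple: bool
    requeue: bool


@dataclass
class BasicReturn:
    reply_code: int
    reply_text: str
    exchange: str
    routing_key: str


def carries_reason(method_cls) -> bool:
    """True if the method has a field that can explain why it was sent."""
    return "reply_text" in {f.name for f in fields(method_cls)}


for cls in (BasicAck, BasicNack, BasicReturn):
    print(cls.__name__, carries_reason(cls))
```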

RabbitMQ took the liberty of extending AMQP 0-9-1 in certain areas so, in theory, it might be possible to introduce a more detailed message rejection mechanism. Nevertheless, changing a protocol requires a lot of tinkering and discussion. I would recommend starting the discussion on the mailing list to see whether the idea resonates with the community. If so, the implementation becomes trivial.

A simple alternative I'd suggest is to adopt a limited exponential backoff mechanism on your publishers. As durability issues should be transient, you should be able to trust that your message will go through within a few attempts. If it doesn't go through within (for example) 3 minutes, then either the queue is full or the message is a duplicate. At this point it is up to the client application to decide on a strategy.

You will need a bit of calibration as, in case of lots of duplicates, your publishers might end up throttling a lot. Moreover, the above strategy assumes your broker is robust enough to recover quickly from issues. Nevertheless, I believe durability issues are quite rare; if they are frequent, the broker might not be properly configured (limited resources, unstable network, etc.).
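A minimal Python sketch of the suggested limited exponential backoff. `publish_with_backoff` and the `publish` callable are hypothetical: `publish` is assumed to publish the message once with confirms enabled and return True on basic.ack and False on basic.nack (or on a timeout). The attempt count, base delay and cap are placeholder values to calibrate, and `sleep` is injectable so the logic can be tested without waiting.

```python
import random
import time
from typing import Callable


def publish_with_backoff(
    publish: Callable[[], bool],
    max_attempts: int = 9,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Retry a hypothetical publish() with capped exponential backoff.

    Returns True as soon as one attempt is confirmed. Returns False once all
    attempts fail; the caller then has to treat the message as a likely
    duplicate or the queue as full, and pick its own strategy.
    """
    for attempt in range(max_attempts):
        if publish():
            return True
        if attempt == max_attempts - 1:
            break  # no point sleeping after the last attempt
        # 1s, 2s, 4s, ... capped at max_delay; jitter keeps many publishers
        # from retrying in lockstep.
        delay = min(max_delay, base_delay * 2 ** attempt)
        sleep(delay * random.uniform(0.5, 1.0))
    return False
```

With these defaults the publisher sleeps at most 1+2+4+8+16+32+60+60 = 183 seconds (about 3 minutes, less with jitter) before giving up. Lowering `max_attempts` reduces the throttling caused by genuine duplicates at the cost of tolerating shorter broker hiccups.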

noxdafox commented on August 20, 2024

Thanks for reporting this. With the provided information I can reproduce it.

This is quite odd as I can clearly reproduce the issue using rabbitmqadmin but I can't do the same with client libraries.

For example, this is the behaviour I encounter using Python and pika.

import pika

DURABLE_DELIVERY = 2

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue='test-queue',
                      durable=True,
                      arguments={'x-message-deduplication': True})

for _ in range(100):
    channel.basic_publish(
        '',
        'test-queue',
        'Message Body',
        pika.BasicProperties(content_type='text/plain',
                             headers={'x-deduplication-header': 'test'},
                             delivery_mode=DURABLE_DELIVERY))

retval = channel.queue_declare(queue='test-queue',
                               durable=True,
                               passive=True,
                               arguments={'x-message-deduplication': True})

print('Messages in queue %d' % retval.method.message_count)

I publish the same message 100 times and the queue contains only one element. The script outputs:

$ python test.py
Messages in queue 1

As you can see, it works as expected.
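The behaviour of this script can be modelled with a toy in-memory queue. This Python sketch assumes queue-level semantics: a message is dropped if a message with the same x-deduplication-header value is already in the queue, and the key becomes usable again once that message is consumed. `ToyDedupQueue` is hypothetical, ignores details such as TTLs and cache limits, and is not the plugin's implementation.

```python
from collections import deque
from typing import Deque, Dict, Optional, Set, Tuple


class ToyDedupQueue:
    """Hypothetical model of a queue declared with x-message-deduplication."""

    def __init__(self) -> None:
        self._messages: Deque[Tuple[bytes, Optional[str]]] = deque()
        self._keys: Set[str] = set()  # dedup keys of messages in the queue

    def publish(self, body: bytes, headers: Optional[Dict[str, str]] = None) -> bool:
        """Return True if enqueued, False if dropped as a duplicate."""
        key = (headers or {}).get("x-deduplication-header")
        if key is not None and key in self._keys:
            return False
        if key is not None:
            self._keys.add(key)
        self._messages.append((body, key))  # messages without a key always go in
        return True

    def get(self) -> Optional[bytes]:
        """Consume one message; its key can then be published again."""
        if not self._messages:
            return None
        body, key = self._messages.popleft()
        if key is not None:
            self._keys.discard(key)
        return body

    def __len__(self) -> int:
        return len(self._messages)


q = ToyDedupQueue()
for _ in range(100):
    q.publish(b"Message Body", {"x-deduplication-header": "test"})
print("Messages in queue %d" % len(q))  # Messages in queue 1
```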

I will try to produce a similar scenario using the MGMT console.

zamnuts commented on August 20, 2024

FYI, I was able to reproduce using the management UI. The publish HTTP request also hangs. I haven't tried a client library yet.

prkasat commented on August 20, 2024

+1, I am able to reproduce it using rabbitmqadmin and the management UI. I have also tested it with the Node.js amqplib library, for both pub and pubWithConfirms.

  1. Pub without confirms:
const amqp = require('amqplib');
const q = 'dedup-durable';
const msg = 'hello world';

(async function pub() {
  try {
    const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
    const channel = await connection.createChannel();

    await channel.assertQueue(q, { durable: true, arguments: {'x-message-deduplication': true}});

    // publish 10 times (Buffer.from replaces the deprecated new Buffer)
    for (let i = 0; i < 10; i++) {
      await channel.sendToQueue(q, Buffer.from(msg), { persistent: true, headers: {'x-deduplication-header': 'test'}});
      console.log('message number:%d sent', i); 
    }

    const cq = await channel.assertQueue(q, { durable: true, arguments: {'x-message-deduplication': true}});

    console.log('messages in q:', cq.messageCount);
    await connection.close(); // let the process exit
  } catch (err) {
    console.log('error: ', err);
  }
})();

I published the same message 10 times and the queue contains only one.

Output:

node pub.js
message number:0 sent
message number:1 sent
message number:2 sent
message number:3 sent
message number:4 sent
message number:5 sent
message number:6 sent
message number:7 sent
message number:8 sent
message number:9 sent
messages in q: 1
  2. Pub with confirms:
const amqp = require('amqplib');
const q = 'dedup-durable';
const msg = 'hello world';

(async function pubWithConfirms() {
  try {
    const connection = await amqp.connect('amqp://guest:guest@localhost:5672');
    const channel = await connection.createConfirmChannel();

    await channel.assertQueue(q, { durable: true, arguments: {'x-message-deduplication': true}});

    for (let i = 0; i < 10; i++) {
      // wait for the broker to ack (resolve) or nack (reject) each publish
      const result = await new Promise((resolve, reject) => {
        channel.sendToQueue(q, Buffer.from(msg), { persistent: true, headers: {'x-deduplication-header': 'testx'}}, err => {
          if (err) {
            reject(err);
          } else {
            resolve(true);
          }
        });
      });

      console.log('message number:%d sent', i, result);
    }

    const cq = await channel.assertQueue(q, { durable: true, arguments: {'x-message-deduplication': true}});

    console.log('messages in q:', cq.messageCount);
    await connection.close(); // let the process exit
  } catch (err) {
    console.log('error: ', err);
  }
})();

Output:

node pub.js
message number:0 sent true

Only one message is published to the queue, as expected. However, the broker neither confirms nor rejects the duplicate publish that follows, as I would expect when publishing with confirms. So the process is stuck waiting for RabbitMQ to either confirm or reject it, and the remaining messages are never sent.

It works perfectly fine if I remove x-deduplication-header or use a unique value every time.
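One way to keep a publisher from blocking forever on a confirm that never arrives is to bound the wait with a timeout. The Python sketch below uses a hypothetical `FakeBroker` that reproduces the symptom described above (the first message is acked, duplicates get neither ack nor nack) and wraps each wait in `asyncio.wait_for`. The 0.1 s timeout is arbitrary, and a timed-out publish has an unknown outcome, so the caller still has to decide what to do with it.

```python
import asyncio
from typing import List, Set


class FakeBroker:
    """Hypothetical broker mimicking the reported bug: duplicates are
    silently dropped and never confirmed."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def publish(self, key: str) -> "asyncio.Future[bool]":
        fut = asyncio.get_running_loop().create_future()
        if key not in self._seen:
            self._seen.add(key)
            fut.set_result(True)  # first message: broker acks
        # duplicate: no ack, no nack, the future stays pending forever
        return fut


async def publish_confirmed(broker: FakeBroker, key: str, timeout: float) -> bool:
    """Wait for the confirm, but never longer than `timeout` seconds."""
    try:
        return await asyncio.wait_for(broker.publish(key), timeout)
    except asyncio.TimeoutError:
        return False  # outcome unknown: treat as "not confirmed"


async def main() -> List[bool]:
    broker = FakeBroker()
    return [await publish_confirmed(broker, "test", timeout=0.1) for _ in range(3)]


print(asyncio.run(main()))  # [True, False, False]
```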

noxdafox commented on August 20, 2024

Thanks @zamnuts and @PratikKasat91 for testing this further.

The publish command hangs in rabbitmqadmin and the MGMT console because the MGMT plugin enables delivery confirmation on the server side. Hence the client/browser hangs in a similar way to what @PratikKasat91 experienced.

The issue with delivery confirmation and queue deduplication is that the broker does not handle them together properly. Even though the queue behaviour defines an is_duplicate callback, the broker logic does not send any notification back to the client when a duplicate is published into a queue.
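To see why a missing notification turns into a hang, consider the bookkeeping a client does in confirm mode. Delivery tags on a channel start at 1 and increase with each publish; the broker settles them with basic.ack or basic.nack, and `multiple=True` settles every tag up to and including the given one. `ConfirmTracker` is a hypothetical Python model of that bookkeeping, not a real client API: if the broker never answers for the duplicate tags, they stay outstanding, and anything that waits for all confirms blocks indefinitely.

```python
from typing import Set


class ConfirmTracker:
    """Hypothetical publisher-side model of AMQP 0-9-1 publisher confirms."""

    def __init__(self) -> None:
        self._next_tag = 1               # tags start at 1 on each channel
        self.outstanding: Set[int] = set()
        self.nacked: Set[int] = set()

    def on_publish(self) -> int:
        tag = self._next_tag
        self._next_tag += 1
        self.outstanding.add(tag)
        return tag

    def _settle(self, tag: int, multiple: bool) -> Set[int]:
        # multiple=True settles every outstanding tag <= tag
        if multiple:
            done = {t for t in self.outstanding if t <= tag}
        else:
            done = {tag} & self.outstanding
        self.outstanding -= done
        return done

    def on_ack(self, tag: int, multiple: bool = False) -> None:
        self._settle(tag, multiple)

    def on_nack(self, tag: int, multiple: bool = False) -> None:
        self.nacked |= self._settle(tag, multiple)


tracker = ConfirmTracker()
tags = [tracker.on_publish() for _ in range(10)]
tracker.on_ack(tags[0])  # first message enqueued and confirmed
# Duplicates 2..10: the broker sends neither ack nor nack.
print(sorted(tracker.outstanding))  # [2, 3, 4, 5, 6, 7, 8, 9, 10]
```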

I already wrote an experimental patch which seems to fix the issue but I need to polish it and open a Pull Request against rabbitmq-server. This means that in order to have queue deduplication and channel confirmation working you will have to wait for a new RabbitMQ release (assuming I manage to get the fix accepted by the community).

I tested exchange deduplication with mandatory delivery and confirmation, and it works as expected.

noxdafox commented on August 20, 2024

The issue has been fixed upstream in rabbitmq/rabbitmq-common#283 and rabbitmq/rabbitmq-server#1774, and the code has been updated in 3b85cf6 to reflect the changes.

As these fixes will be released in RabbitMQ 3.8, I won't make a release until then. If you want to try the fix yourself, you can clone the master branch and build the plugin following the instructions in the README.

I will close this ticket once the new release is made.

zamnuts commented on August 20, 2024

Many thanks for getting this through. I'll watch for the 3.8 release.

Since this is AMQP 0.9.1, I presume the publisher will not be notified of a reason for rejection, which would be similar to the behavior of overflow queues with reject-publish.

We now have three scenarios, two of which are not catastrophic. I'm not sure how to deal with this. If a duplicate is rejected without a reason, we must assume the worst, namely that the rejection came from a durability problem (or similar) and not from the message being a duplicate. We would then retry the publish, only to fail again. I would conclude that for a dedupe queue we would simply ignore the rejection, thereby sacrificing message durability. I can no longer see a reason to use publisher confirms in this case, except perhaps to help throttle publishes on the individual clients.

Of course, if a reason were provided, this would be a non-issue.

noxdafox commented on August 20, 2024

The new plugin release, 0.4.3, adds support for RabbitMQ v3.8, which should fix this issue.

Please feel free to re-open it if you still have issues with publish confirmation.

mike-cochrane commented on August 20, 2024

I still see this issue when using the MassTransit library (C#) with RabbitMQ. If I send a duplicate message with the default PublisherConfirmation setting of true, the send operation hangs and never completes. However, it works as expected with PublisherConfirmation = false.

This is when deduplication is at the queue level.

This is using RabbitMQ 3.8.8 and Erlang 23.0.4, along with the 0.4.5 release of the plugin so I should be right up to date.

flokasper commented on August 20, 2024

As discussed in the aio-pika issue:

My Dockerfile:

FROM rabbitmq:3.8.5-management

RUN apt-get update && apt-get install -y wget

RUN wget -P /opt/rabbitmq/plugins/ https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.4.5/elixir-1.8.2.ez

RUN wget -P /opt/rabbitmq/plugins/ https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.4.5/rabbitmq_message_deduplication-v3.8.x_0.4.5.ez

RUN rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_message_deduplication

Test environment:

python = "^3.7"
aio-pika = "^6.7.1"
pika = "==1.1.0"
pytest = "^5.4.3"
pytest-asyncio = "^0.14.0"

In this test, test_pika works while test_aio_pika hangs:

import json
import pytest
import asyncio
import logging
from typing import Optional, Dict

import aio_pika
from pika import URLParameters, BasicProperties
from pika.adapters.blocking_connection import BlockingConnection


RABBITMQ_URL = "amqp://guest:guest@localhost:5672/"


def test_pika():
    queue_name = "test-pika"
    connection = BlockingConnection(URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)

    arguments = {"x-message-deduplication": True}
    channel.queue_declare(
        queue=queue_name, durable=True, arguments=arguments
    )

    obj = {"test": "OK"}
    obj_json = json.dumps(obj, sort_keys=True, separators=(",", ":"))

    headers = {"x-deduplication-header": obj_json}  # type: Optional[Dict]

    for _ in range(0, 2):
        channel.basic_publish(
            exchange="",
            routing_key=queue_name,
            body=obj_json.encode(),
            properties=BasicProperties(delivery_mode=2, headers=headers),
            mandatory=True,
        )

    nb_receive = 0
    with pytest.raises(TimeoutError):
        for method_frame, properties, body in channel.consume(
                queue_name, auto_ack=False, inactivity_timeout=10
        ):
            if body is None:
                raise TimeoutError()
            else:
                channel.basic_ack(
                    delivery_tag=method_frame.delivery_tag
                )
                nb_receive += 1
    assert nb_receive == 1

    channel.queue_delete(queue_name)
    channel.close()
    connection.close()
    
    
@pytest.mark.asyncio
async def test_aio_pika():
    queue_name = "test-aio-pika"

    connection = await aio_pika.connect(RABBITMQ_URL, timeout=10)

    # Change to False and it works
    channel = await connection.channel(publisher_confirms=True)

    await channel.set_qos(prefetch_count=1)

    arguments = {"x-message-deduplication": True}
    queue = await channel.declare_queue(
        name=queue_name, durable=True, arguments=arguments
    )

    obj = {"test2": "OK"}
    obj_json = json.dumps(obj, sort_keys=True, separators=(",", ":"))

    headers = {"x-deduplication-header": obj_json}  # type: Optional[Dict]
    for i in range(0, 2):
        logging.info(f"publishing:{i}")
        await channel.default_exchange.publish(
            message=aio_pika.Message(
                body=obj_json.encode(),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                headers=headers,
            ),
            routing_key=queue_name,
            mandatory=True,
        )
        logging.info(f"published:{i}")

    nb_receive = 0
    with pytest.raises(asyncio.TimeoutError):
        async with queue.iterator(timeout=10) as queue_iter:
            async for message in queue_iter:
                await message.ack()
                nb_receive += 1

    assert nb_receive == 1

    await queue.delete()
    await channel.close()
    await connection.close()

noxdafox commented on August 20, 2024

Fix released in v0.5.0. Please re-open this ticket if the issue persists.
