
Comments (3)

noxdafox avatar noxdafox commented on August 20, 2024

Hello,

there are a few issues with your request which would bring more problems than benefits.

The AMQP protocol provides two ways of consuming messages: directly, or with consumer acknowledgment. In the latter approach, the message is considered part of the queue until it is acknowledged. Therefore, allowing a second message in while the first is being processed would violate the uniqueness guarantee the plugin provides.

The second issue is more practical and would be very hard to tackle. Let's say that while message C is being processed, we allow a second one in. What happens if the consumer processing the first C message crashes? The protocol states that in this case the message should be put back in the queue, as it has not been acknowledged. This means we would end up with two C messages in the queue. This would be fairly tricky to detect in a distributed system and could cause lots of headaches.

I would rather suggest a different approach. You could rely on publisher confirms to see, on the publisher side, whether your message made it into the queue. If not, your publisher could retry a few times (possibly using some exponential backoff mechanism). The assumption is that if the message is being processed, it will soon be acknowledged and gone from the queue. Unfortunately, for this approach you will need RabbitMQ 3.8.0, as previous versions do not support publisher confirms for duplicate messages (see issue #21).
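For illustration, a rough sketch of such a retry loop in plain Python. The `publish` callable is a stand-in for whatever your client library exposes in publisher-confirms mode (returning True on a positive confirm, False on a negative one); it is not tied to any specific library.

```python
import time

def publish_with_retry(publish, message, max_attempts=5, base_delay=0.1,
                       sleep=time.sleep):
    """Retry publishing until the broker positively confirms the message.

    `publish` is any callable returning True when the broker confirms (ack)
    and False when it rejects the message as a duplicate (nack).
    Delays grow exponentially: base_delay, 2*base_delay, 4*base_delay, ...
    """
    for attempt in range(max_attempts):
        if publish(message):
            return True
        # Negative confirm: the duplicate is likely still in the queue
        # being processed; back off and try again.
        sleep(base_delay * (2 ** attempt))
    return False
```

The `sleep` parameter is injected only so the loop can be tested without real waiting; in production you would leave the default.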

from rabbitmq-message-deduplication.

sam0delkin avatar sam0delkin commented on August 20, 2024

@noxdafox hey. I think I will try the approach with confirmation, but it would be nice to have a config option to support this. In my case, I have a lot of statistics calculated via a queue for many DB rows. These calculations are slow and not so easy, so I'm using a queue for the operation. The problem is that the calculations are triggered by any change related to a row in the DB. So, if a user makes changes while the current calculation is running, I need to add a new message to the queue even though it is no longer unique. But if further changes are made, or the consumer has somehow broken, I still expect the queue to contain no more than 2 instances of the same message. So, if you plan to support this, and maybe other people need it like me, you could add the feature behind a config value for the plugin rather than enabling it by default. If not, you can close this issue :) Thanks in advance!


Ocramius avatar Ocramius commented on August 20, 2024
EDIT3: My mistake - I declared a de-duplication **exchange** instead of a **queue**.

I just got bitten by this behavior as well.

My use-case is very similar to @sam0delkin's: I'm trying to use the plugin as a way to deduplicate/debounce (not the precise term, but it gives the right idea) messages that would lead to the same result on the consumer side, but which are computationally intensive.

Let's assume I have the following messages (with headers):

  • x-deduplication-header = A
  • x-deduplication-header = B
  • x-deduplication-header = C
  • x-deduplication-header = D

Given an empty queue attached to an x-message-deduplication exchange
When the input sequence "AAABBBCCCAAA" is pushed to the queue as different messages
Then the queue should contain the sequence "CBA"

This works as expected: great!

Now, let's assume the queue was filled and emptied:

Given an empty queue attached to an x-message-deduplication exchange
And the queue was filled with sequence "ABC"
And the queue was fully consumed
When the input sequence "ABCD" is pushed to the queue as different messages
Then the queue should contain the sequence "DCBA"

The last step above doesn't match expectations: as per the discussion above, only "D" made it to the queue.
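A toy model of what I suspect is happening, namely that the de-duplication cache lives on the exchange and survives the queue being drained (all names here are made up; this is just my mental model, not the plugin's code):

```python
class DedupExchangeModel:
    """Toy model: the de-duplication cache lives on the *exchange*,
    so consuming from the queue never clears it."""

    def __init__(self):
        self.cache = set()  # x-deduplication-header values seen so far
        self.queue = []

    def publish(self, header):
        if header in self.cache:
            return False  # duplicate: dropped at the exchange
        self.cache.add(header)
        self.queue.append(header)
        return True

    def consume_all(self):
        drained, self.queue = self.queue, []
        return drained  # note: the exchange cache is NOT touched

# Scenario 1: push "AAABBBCCCAAA" -> queue holds A, B, C (as expected)
ex = DedupExchangeModel()
for h in "AAABBBCCCAAA":
    ex.publish(h)

# Scenario 2: fill with "ABC", drain fully, then push "ABCD" ->
# only "D" enters, because A, B and C are still in the exchange cache
ex2 = DedupExchangeModel()
for h in "ABC":
    ex2.publish(h)
ex2.consume_all()
for h in "ABCD":
    ex2.publish(h)
```

This reproduces exactly the "only D made it" symptom, which is consistent with my EDIT3 above: I had declared the de-duplication on the exchange rather than on the queue.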

I did not fully understand #27 (comment), but I read through the code, and it seems like there is some message deletion during fetch:

    def fetch(needs_ack, state = dqstate(queue: queue, queue_state: qs)) do
      case passthrough2(state, do: fetch(needs_ack, qs)) do
        {:empty, state} -> {:empty, state}
        {{message, delivery, ack_tag}, state} ->
          if duplicate?(queue) do
            if needs_ack do
              head = Common.message_header(message, "x-deduplication-header")
              {{message, delivery, dqack(tag: ack_tag, header: head)}, state}
            else
              maybe_delete_cache_entry(queue, message)
              {{message, delivery, ack_tag}, state}
            end
          else
            {{message, delivery, ack_tag}, state}
          end
      end
    end

    # TODO: this is a bit of a hack.
    # As the drop callback returns only the message id, we can't retrieve
    # the message deduplication header. As a workaround `fetch` is used.
    # This assumes the backing queue drop and fetch behaviours are the same.
    # A better solution would be to store the message IDs in a dedicated index.
    def drop(need_ack, state = dqstate(queue: queue, queue_state: qs)) do
      if duplicate?(queue) do
        case fetch(need_ack, state) do
          {:empty, state} -> {:empty, state}
          {{message = basic_message(id: id), _, ack_tag}, state} ->
            maybe_delete_cache_entry(queue, message)
            {{id, ack_tag}, state}
        end
      else
        passthrough2(state, do: drop(need_ack, qs))
      end
    end

My Elixir-fu is too low to fully understand what is going on: specifically, what the conditional `if needs_ack do` means.

My hope is that when a consumer of the queue fetches items, those items also get deleted from this plugin's cache/map, which doesn't seem to be the case right now. Am I misunderstanding the underlying design of the plugin, or is my AMQP consumer doing something wrong? As far as I know, it ACKs/NACKs correctly...
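For what it's worth, here is my current reading of that branch, sketched as a Python toy model. This is purely my interpretation of the Elixir above (names and data shapes are made up), so it may well be wrong:

```python
def fetch(needs_ack, queue, cache):
    """Toy model of the plugin's fetch flow as I read it.

    - needs_ack=False (auto-ack consumer): the message leaves the queue
      for good at fetch time, so the cache entry is deleted right here.
    - needs_ack=True: the message may still be requeued if the consumer
      crashes, so the dedup header is carried along on the ack tag and
      the cache entry is only deleted later, when the client acks.
    """
    if not queue:
        return None
    message = queue.pop(0)
    header = message["x-deduplication-header"]
    if needs_ack:
        # defer cache cleanup until ack(); hand the header back with the tag
        return {"message": message, "ack_tag": header}
    cache.discard(header)  # no ack will ever come: clean up now
    return {"message": message, "ack_tag": None}

def ack(ack_tag, cache):
    # called when the consumer acknowledges; only from this point on is
    # a new message with the same header allowed into the queue again
    cache.discard(ack_tag)
```

If that reading is right, the cache entry for an acked message *should* be removed on ack, which makes my EDIT3 (exchange vs. queue declaration) the real culprit rather than the fetch logic.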

EDIT: the ack() of my client is done in PHP, and seems to be relatively simple:

https://github.com/symfony/symfony/blob/af8ad344d38bdbf3ea563d33c6afd9950f8d6569/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php#L442-L445

    public function ack(\AMQPEnvelope $message, string $queueName): bool
    {
        return $this->queue($queueName)->ack($message->getDeliveryTag());
    }

The delivery tag seems to only be an integer-ish string here - not much else going on.

EDIT2: for more context on what I'm doing, here's how I configured my queues during my experiments.

        transports:
            product-queue:
                dsn: 'amqp://rabbitmq:5672/%2f/product'
                # Requires https://github.com/noxdafox/rabbitmq-message-deduplication/releases/tag/0.4.5 to function!
                # This setting allows us to add an "x-deduplication-header" to AMQP messages sent
                # to the product indexing queue, so that we can avoid repeated operations
                # if one is already scheduled in our message queue.
                #
                # For usage, see https://gist.github.com/noxdafox/ad1fb4c3769e06a888c3a542fc08c544
                options:
                    exchange:
                        name: 'deduplicate-product-indexing'
                        type: 'x-message-deduplication'
                        arguments:
                            # Size of de-duplication cache - 100M should be plenty
                            x-cache-size: 100000000

I'm still getting bitten by other failures, but I'll report a separate issue after investigation :D

