
Comments (12)

noxdafox commented on August 20, 2024

Hello,

can you please provide more information regarding the issue?

How are you creating the queue? How are you publishing the message? How are you consuming it?

from rabbitmq-message-deduplication.

marcin-stanosek commented on August 20, 2024

Adding to the queue works. Blocking the addition of the same message to the queue works. The cache size limit works correctly (entries are dropped once the cache is full). The problem appears when I remove a message from the queue (consume it) and then publish the same message again. The message is not added to the queue (I don't see it in the RabbitMQ monitoring plugin). Maybe the message is still in the cache?

Is it possible to inform the exchange about what happens to the message after it reaches the queue? The filtering in the exchange is done before the message goes to the queue, right?

Below is the code I use to publish and consume:

// publish

$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

$channel->exchange_declare($queue . '.exchange', 'x-message-deduplication', false, false, false, false, false, ['x-cache-size' => ['b', $cacheSize]]);
$channel->queue_bind($queue, $queue . '.exchange');

$channel->confirm_select();

$properties = [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers'=> [
        'x-deduplication-header' => ['S', $messageUID],
    ],
];
$channel->basic_publish(new AMQPMessage($message, $properties), $queue . '.exchange', $queue);

$channel->wait_for_pending_acks();

$channel->close();
$connection->close();
// consume

$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

$channel->basic_qos(null, 1, null);

$channel->basic_consume($queue, '', false, false, false, false, array($consumer, 'consume'));
if ($channel->is_consuming()) {
   $channel->wait();
}

$channel->close();
$connection->close();

// The $consumer object has a 'consume' method, roughly like this:
public function consume(AMQPMessage $msg)
{
  // process message
  $success = $this->process($msg); // private method

  if($success) {
    // confirm (discard message from queue)
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  } else {
    // reject (requeue message)
    $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
  }

  return $success;
}

noxdafox commented on August 20, 2024

You are deduplicating at the exchange without setting any TTL on the cache entries. Therefore, the behaviour you observe is the expected one: an entry stays in the cache whether or not the message has been consumed.

Exchange-level deduplication filters duplicates over a time window.

If you want to deduplicate based on the queue content, you need to use queue-level deduplication. Check the README for a better description of the two.
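
The difference between the two modes can be illustrated with a small in-memory model. This is only a sketch of the behaviour described in this thread, not the plugin's implementation: the class names are hypothetical, and evicting the oldest entry when the cache is full is an assumption.

```php
<?php
// Toy model of the two deduplication modes. NOT the plugin's code:
// class names and the eviction order are assumptions for illustration.

final class ExchangeDedupModel
{
    /** @var array seen key => expiry time (null = never expires) */
    private $seen = [];
    private $cacheSize;
    private $ttl;

    public function __construct(int $cacheSize, ?float $ttl = null)
    {
        $this->cacheSize = $cacheSize;
        $this->ttl = $ttl;
    }

    /** Returns true if the message is routed, false if dropped as a duplicate. */
    public function publish(string $key, float $now): bool
    {
        // Forget entries whose TTL has elapsed.
        foreach ($this->seen as $k => $expiry) {
            if ($expiry !== null && $expiry <= $now) {
                unset($this->seen[$k]);
            }
        }
        if (array_key_exists($key, $this->seen)) {
            return false; // consuming the message never clears this entry
        }
        if (count($this->seen) >= $this->cacheSize) {
            // Assumed policy: evict the oldest entry when the cache is full.
            unset($this->seen[array_key_first($this->seen)]);
        }
        $this->seen[$key] = $this->ttl === null ? null : $now + $this->ttl;
        return true;
    }
}

final class QueueDedupModel
{
    /** @var array keys of the messages currently sitting in the queue */
    private $queued = [];

    public function publish(string $key): bool
    {
        if (isset($this->queued[$key])) {
            return false; // an identical message is still queued
        }
        $this->queued[$key] = true;
        return true;
    }

    public function consume(string $key): void
    {
        unset($this->queued[$key]); // the key is free again once consumed
    }
}

$exchange = new ExchangeDedupModel(2);     // like x-cache-size = 2, no TTL
var_dump($exchange->publish('1234', 0.0)); // bool(true)
var_dump($exchange->publish('1234', 1.0)); // bool(false), consumed or not

$queue = new QueueDedupModel();
var_dump($queue->publish('1234'));         // bool(true)
$queue->consume('1234');
var_dump($queue->publish('1234'));         // bool(true)
```

In the exchange model the key becomes publishable again only after its TTL elapses or after other keys push it out of the cache. In the queue model it becomes publishable as soon as the message is consumed.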

marcin-stanosek commented on August 20, 2024

My mistake.

I removed 'exchange_declare' and 'queue_bind', and added 'x-message-deduplication' => ['t', true] to the 'queue_declare' arguments.

Now it works if I publish the message like this:

$properties = [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers'=> [
        'x-deduplication-header' => ['S', $messageUID],
    ],
];
$channel->basic_publish(new AMQPMessage($message, $properties), $queue . '.exchange', $queue);

If I want to use publisher confirms:

$channel->confirm_select();

$properties = [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers'=> [
        'x-deduplication-header' => ['S', $messageUID],
    ],
];
$channel->basic_publish(new AMQPMessage($message, $properties), $queue . '.exchange', $queue);

$channel->wait_for_pending_acks();

the script never finishes if I consume the message (remove it from the queue) and publish it again.

noxdafox commented on August 20, 2024

Can you please provide a full working example? You still need to declare an exchange and bind it to the queue.

marcin-stanosek commented on August 20, 2024

publish.php

$queue = 'test';
$message = 'Lorem ipsum';
$messageUid = '1234';

$connection = new PhpAmqpLib\Connection\AMQPStreamConnection('10.0.2.2', '8000', 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare(
    $queue, // queue
    false, // passive
    true, // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    ['x-message-deduplication' => ['t', true]] // arguments
);

$channel->exchange_declare(
    $queue . '.exchange', // exchange
    'x-message-deduplication', // type
    false, // passive
    false, // durable
    false, // auto_delete
    false, // internal
    false, // nowait
    ['x-cache-size' => ['b', 2]] // parameters
);

$channel->queue_bind($queue, $queue . '.exchange');

$channel->confirm_select();

$properties = [
    'delivery_mode' => PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers'=> [
        'x-deduplication-header' => ['S', $messageUid],
    ],
];
$channel->basic_publish(
    new PhpAmqpLib\Message\AMQPMessage($message, $properties), // message
    $queue . '.exchange', // exchange
    $queue // routing_key
);

$channel->wait_for_pending_acks();

$channel->close();
$connection->close();

consume.php

$queue = 'test';

$connection = new PhpAmqpLib\Connection\AMQPStreamConnection('10.0.2.2', '8000', 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare(
    $queue, // queue
    false, // passive
    true, // durable
    false, // exclusive
    false, // auto_delete
    false, // nowait
    ['x-message-deduplication' => ['t', true]] // arguments
);

$channel->exchange_declare(
    $queue . '.exchange', // exchange
    'x-message-deduplication', // type
    false, // passive
    false, // durable
    false, // auto_delete
    false, // internal
    false, // nowait
    ['x-cache-size' => ['b', 2]] // parameters
);

$channel->queue_bind($queue, $queue . '.exchange');

$channel->basic_qos(
    null, // prefetch_size
    1, // prefetch_count
    null // a_global
);

$channel->basic_consume(
    $queue, // queue
    '', // consumer_tag
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    function ($msg) { // callback
        echo 'Received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

if ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

I assume the library php-amqplib is loaded.

Scenario:

  1. publish message
  2. see in rabbit monitor (exchange added, queue added, message published)
  3. consume message (receive "echo" result in cli)
  4. see in rabbit monitor (empty queue)
  5. publish message
  6. see in rabbit monitor (empty queue) :(

If I overfill the cache with other messages (different $messageUid), then I can add the original message.

noxdafox commented on August 20, 2024

You are now de-duplicating both at the exchange and at the queue. Use only queue-level de-duplication if you want messages to be de-duplicated at the queue.

marcin-stanosek commented on August 20, 2024

Earlier, when I removed the exchange declaration and the queue binding, you said:

"You still need to declare an exchange and bind it to the queue."

So I added the exchange declaration.

I feel that I am mixing two things: exchange and queue de-duplication. Based on the code above, what should I remove?

noxdafox commented on August 20, 2024

Exchanges route messages to queues. Therefore, you need an exchange in order to publish your messages. There are different types of exchanges (topic, direct, fanout, ...). The plugin adds a deduplication exchange which allows you to de-duplicate messages based on TTL.

Nevertheless, you want to de-duplicate messages at the queue. So you need to enable de-duplication on the queue, not at the exchange. Declare an exchange type which fits your needs (example: fanout), declare a queue with de-duplication enabled and bind them.

Do not use both a de-duplication exchange and a de-duplication queue as you will end up de-duplicating on both sides.
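
A minimal sketch of that setup, reusing the php-amqplib calls and argument format already shown in this thread. The helper name `declareQueueDedupTopology` is hypothetical, and the channel is assumed to be an open php-amqplib channel.

```php
<?php
// Sketch only: a plain fanout exchange in front of a de-duplicating queue.
// The function name is hypothetical; $channel is assumed to be an open
// php-amqplib channel, as in publish.php above.

function declareQueueDedupTopology($channel, string $queue): string
{
    $exchange = $queue . '.exchange';

    // Ordinary exchange type, no de-duplication arguments.
    $channel->exchange_declare(
        $exchange, // exchange
        'fanout',  // type
        false,     // passive
        false,     // durable
        false      // auto_delete
    );

    // De-duplication is enabled on the queue only.
    $channel->queue_declare(
        $queue, // queue
        false,  // passive
        true,   // durable
        false,  // exclusive
        false,  // auto_delete
        false,  // nowait
        ['x-message-deduplication' => ['t', true]] // arguments
    );

    $channel->queue_bind($queue, $exchange);

    return $exchange;
}

// Usage (needs a running broker with the plugin enabled):
//   $exchange = declareQueueDedupTopology($channel, 'test');
//   $channel->basic_publish($msg, $exchange, 'test');
```

If an exchange with the same name was previously declared with the 'x-message-deduplication' type, it has to be deleted first, since RabbitMQ refuses to redeclare an existing exchange with a different type.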

marcin-stanosek commented on August 20, 2024

So I changed the exchange type from 'x-message-deduplication' to 'fanout' (without arguments). I get the same behavior as before, when there was no exchange routing.

It works fine if I don't wait for the publish confirmation, that is, if I delete the lines $channel->confirm_select(); and $channel->wait_for_pending_acks();.

Scenario:

  1. Publish message. One item in queue. Okay.
  2. Publish the same message: it is blocked. One item in queue. Okay.
  3. Consume message (empty queue). Okay.
  4. Publish the same message:
    a) the script does not end (waiting for confirmation).
    b) if I remove confirm_select and wait_for_pending_acks then it is okay (message is published).

Is this normal behavior?
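
To keep the publisher from blocking forever in case 4a, the wait can be bounded. The sketch below is a diagnostic aid, not a fix for the missing confirmation. It assumes that php-amqplib's `wait_for_pending_acks` accepts a timeout in seconds and throws `AMQPTimeoutException` when it elapses; the helper name `publishWithConfirm` is hypothetical.

```php
<?php
// Sketch only: publish with publisher confirms, but give up after a timeout.
// publishWithConfirm is a hypothetical helper; $channel is assumed to be an
// open php-amqplib channel and $msg an AMQPMessage built as in publish.php.

function publishWithConfirm(
    $channel,
    $msg,
    string $exchange,
    string $routingKey,
    float $timeoutSeconds = 5.0
): string {
    $outcome = 'timeout'; // stays 'timeout' if the broker never answers

    // The handlers run when the broker confirms or rejects the publish.
    $channel->set_ack_handler(function () use (&$outcome) {
        $outcome = 'ack';
    });
    $channel->set_nack_handler(function () use (&$outcome) {
        $outcome = 'nack';
    });

    $channel->confirm_select();
    $channel->basic_publish($msg, $exchange, $routingKey);

    try {
        // Assumption: a positive timeout makes this call give up
        // instead of waiting indefinitely.
        $channel->wait_for_pending_acks($timeoutSeconds);
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // No confirmation arrived in time; report it to the caller.
    }

    return $outcome;
}

// Usage (needs a running broker):
//   $result = publishWithConfirm($channel, $msg, 'test.exchange', 'test');
//   echo "publish result: $result\n";
```

A 'timeout' result on the second publish of the same message would confirm that the broker neither acks nor nacks it, which is useful detail for a bug report.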

noxdafox commented on August 20, 2024

Sorry for the late reply.

According to your description, it is not normal behaviour. It might be related to issue #21.

As I am not familiar with PHP I need your help. Could you please provide a reproducible example? A set of scripts showing the above behaviour which I could run from my Linux machine.

You can attach it as a zip file to this issue.

noxdafox commented on August 20, 2024

Closing this issue due to inactivity, please re-open if still valid.
