rabbitmq-message-deduplication's Introduction

RabbitMQ Message Deduplication Plugin

A plugin for filtering duplicate messages.

Messages can be deduplicated when published into an exchange or enqueued to a queue.

Installing

Download the .ez files from the chosen release and copy them into the RabbitMQ plugins directory.
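
For example (the plugins directory depends on how RabbitMQ was installed, so the path below is only a placeholder):

    # placeholder path: adjust to your installation's plugins directory
    cp rabbitmq_message_deduplication-*.ez elixir-*.ez /path/to/rabbitmq/plugins/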

Check the Release notes for minimum supported versions.

Enable the plugin:

    [sudo] rabbitmq-plugins enable rabbitmq_message_deduplication

Building from Source

Please see the RabbitMQ Plugin Development guide.

To build the plugin:

    git clone https://github.com/noxdafox/rabbitmq-message-deduplication.git
    cd rabbitmq-message-deduplication
    make dist

Then copy all the *.ez files from the generated plugins folder to the RabbitMQ plugins directory and enable the plugin:

    [sudo] rabbitmq-plugins enable rabbitmq_message_deduplication
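
As a quick sanity check (assuming a Unix shell), the plugin should now appear as explicitly enabled ([E*]) in the plugin listing:

    rabbitmq-plugins list | grep deduplication
    # expected: [E*] rabbitmq_message_deduplication <version>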

Version requirements

The latest version of the plugin requires RabbitMQ 3.13.0.

Earlier RabbitMQ versions are supported by 0.6.2.

Exchange level deduplication

The exchange type x-message-deduplication filters message duplicates before any routing rule is applied.

A message carrying the x-deduplication-header header will not be routed if a message with the same header value has already been seen. How long a given value is guaranteed to be treated as unique can be controlled via the x-cache-ttl exchange argument or message header.

NOTE: This exchange acts like a fanout exchange, so routing rules are not applied.

Declare an exchange

To create a message deduplication exchange, declare it with the type x-message-deduplication (see the example after the argument lists below).

Required arguments:

  • x-cache-size: maximum number of entries in the deduplication cache. If the cache fills up, existing entries are evicted in an unspecified order to make room for new ones.

Optional arguments:

  • x-cache-ttl: amount of time in milliseconds duplicate headers are kept in cache.
  • x-cache-persistence: whether the deduplication cache is persisted on disk or kept in memory. The default is memory.
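
A minimal declaration with rabbitmqadmin (which ships with the management plugin) might look like this; the exchange name and values are illustrative only:

    rabbitmqadmin declare exchange name=dedup-exchange type=x-message-deduplication 'arguments={"x-cache-size": 10000, "x-cache-ttl": 60000}'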

Message headers

  • x-deduplication-header: messages will be deduplicated based on the content of this header. If the header is not provided, the message will not be checked against duplicates.
  • x-cache-ttl: optional; overrides the default value provided at exchange declaration. It controls for how many milliseconds the message is deduplicated. After the TTL expires, a new message with the same header value will be routed again (see the publish example below).
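
For example, publishing twice with the same x-deduplication-header value (exchange name, routing key and values are illustrative); only the first message should be routed until the TTL expires:

    rabbitmqadmin publish exchange=dedup-exchange routing_key=test payload=hello 'properties={"headers": {"x-deduplication-header": "msg-1", "x-cache-ttl": 5000}}'
    rabbitmqadmin publish exchange=dedup-exchange routing_key=test payload=hello 'properties={"headers": {"x-deduplication-header": "msg-1"}}'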

Queue level deduplication

A queue declared with the x-message-deduplication argument enabled filters message duplicates before they are enqueued.

Each message containing the x-deduplication-header header will not be enqueued if another message with the same header is already present within the queue.

NOTE: Mirrored and Quorum queues are currently not supported.

Declare a queue

When declaring a queue, message deduplication can be enabled via the x-message-deduplication boolean argument, as shown in the example after the header list below.

Message headers

  • x-deduplication-header: messages will be deduplicated based on the content of this header. If the header is not provided, the message will not be checked against duplicates.
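
A sketch of the whole flow with rabbitmqadmin, using an illustrative queue name: declare the queue with deduplication enabled, publish the same x-deduplication-header value twice, and verify that only the first message was enqueued:

    rabbitmqadmin declare queue name=dedup-queue 'arguments={"x-message-deduplication": true}'
    rabbitmqadmin publish routing_key=dedup-queue payload=hello1 'properties={"headers": {"x-deduplication-header": "msg-1"}}'
    rabbitmqadmin publish routing_key=dedup-queue payload=hello2 'properties={"headers": {"x-deduplication-header": "msg-1"}}'
    rabbitmqadmin get queue=dedup-queue ackmode=ack_requeue_true count=10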

Disabling the Plugin

It is possible to disable the plugin via the command:

    [sudo] rabbitmq-plugins disable rabbitmq_message_deduplication

All deduplication exchanges and queues will become non-functional. It is the user's responsibility to remove them.

Running the tests

    make tests

License

See the LICENSE file.

rabbitmq-message-deduplication's People

Contributors

dentarg, gaetanww, gomoripeti, johanrhodin, luos, m1234567898, noxdafox, patrick-remy, ryanermita, tdugue


rabbitmq-message-deduplication's Issues

Queue "Durable" is displayed as threaten in RabbitMQ web console

Hi,
I am using exchange-level deduplication. Everything is working as expected, but in the RabbitMQ web console the durable value alternates between "threaten" and "true" across refreshes. Please see the attached images. Is this normal? Can you help me with this?

[attached screenshots]

RabbitMQ clients hang when publishing duplicates to a deduplication queue with publish confirmation enabled

When using a durable queue and publishing a message with delivery mode 2 (persistent), publishing the duplicate message times out (default 60s for rabbitmqadmin). I haven't tested with publisher confirms, only persistent mode.

Dockerfile:

FROM rabbitmq:3.7.8-management-alpine
ADD \
  rabbitmq_message_deduplication-0.3.4-erl-20.ez \
  elixir-1.7.3-erl-20.ez \
  /opt/rabbitmq/plugins/

sha1sum for verification:

09c8170a363479d585b73831d29481bc8df7f8ee  elixir-1.7.3-erl-20.ez
1e4e4c3d6e5680dd5f05220529c536f7367cfed1  rabbitmq_message_deduplication-0.3.4-erl-20.ez

/etc/rabbitmq/enabled_plugins (enable plugin on startup):

[rabbitmq_management,rabbitmq_management_agent,
  rabbitmq_shovel,rabbitmq_shovel_management,
  rabbitmq_message_deduplication].

Case with a NOT durable queue and the DEFAULT delivery mode; this works just fine:

$ time rabbitmqadmin -u guest -p guest declare queue name=dedup-notdurable durable=false 'arguments={"x-message-deduplication": true}'
queue declared

real	0m0.072s
user	0m0.040s
sys	0m0.020s

$ time rabbitmqadmin -u guest -p guest publish routing_key=dedup-notdurable payload=helloworld 'properties={"delivery_mode":1,"headers":{"x-deduplication-header": "dedup1"}}'
Message published

real	0m0.085s
user	0m0.050s
sys	0m0.010s

$ time rabbitmqadmin -u guest -p guest publish routing_key=dedup-notdurable payload=helloworld 'properties={"delivery_mode":1,"headers":{"x-deduplication-header": "dedup1"}}'
Message published

real	0m0.063s
user	0m0.030s
sys	0m0.020s

All quick responses (in milliseconds), and the queue has the expected messages:

$ rabbitmqadmin -u guest -p guest get queue=dedup-notdurable ackmode=ack_requeue_true count=10 --depth=4
+------------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+
|   routing_key    | exchange | message_count |  payload   | payload_bytes | payload_encoding | properties.delivery_mode | properties.headers.x-deduplication-header | redelivered |
+------------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+
| dedup-notdurable |          | 0             | helloworld | 10            | string           | 1                        | dedup1                                    | False       |
+------------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+

However, when using durable=true and delivery_mode:2, publishing the duplicate message never gets a response. The client times out after 1 minute:

$ time rabbitmqadmin -u guest -p guest declare queue name=dedup-durable durable=true 'arguments={"x-message-deduplication": true}'
queue declared

real	0m0.066s
user	0m0.040s
sys	0m0.010s

$ time rabbitmqadmin -u guest -p guest publish routing_key=dedup-durable payload=helloworld 'properties={"delivery_mode":2,"headers":{"x-deduplication-header": "dedup1"}}'
Message published

real	0m0.069s
user	0m0.040s
sys	0m0.010s

$ time rabbitmqadmin -u guest -p guest publish routing_key=dedup-durable payload=helloworld 'properties={"delivery_mode":2,"headers":{"x-deduplication-header": "dedup1"}}'
*** Error getting HTTP response:

Traceback (most recent call last):
  File "/usr/local/bin/rabbitmqadmin", line 548, in http
    resp = conn.getresponse()
  File "/usr/lib/python2.7/httplib.py", line 1121, in getresponse
    response.begin()
  File "/usr/lib/python2.7/httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python2.7/httplib.py", line 402, in _read_status
    raise BadStatusLine(line)
BadStatusLine: ''



real	1m0.059s
user	0m0.050s
sys	0m0.000s

$ rabbitmqadmin -u guest -p guest get queue=dedup-durable ackmode=ack_requeue_true count=10 --depth=4
+---------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+
|  routing_key  | exchange | message_count |  payload   | payload_bytes | payload_encoding | properties.delivery_mode | properties.headers.x-deduplication-header | redelivered |
+---------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+
| dedup-durable |          | 0             | helloworld | 10            | string           | 2                        | dedup1                                    | False       |
+---------------+----------+---------------+------------+---------------+------------------+--------------------------+-------------------------------------------+-------------+

I also tried using disk caching instead of the default memory caching, {"x-cache-persistence": "disk"}, but it had no effect with regard to the timeout, e.g.:

$ time rabbitmqadmin -u guest -p guest declare queue name=dedup-disk-durable durable=true 'arguments={"x-message-deduplication": true, "x-cache-persistence": "disk"}'

I would expect the second publish call to return an error noting the rejection of delivery with a note specifically citing it as due to message duplication. However, I'd settle for a successful "Message published" too.

Plugin crashes in cloudamqp (exchange level dedup)

Not all publishes to the dedup exchange fail, but some do.
Relevant logs follow (from rabbit logs):

=ERROR EVENT==== Thu, 04 Apr 2019 11:14:46 GMT ===
2019-04-04 11:14:46.626 [error] <0.4836.292> Error on AMQP connection <0.4836.292> (40.76.67.201:9984 -> 172.26.0.4:5672, vhost: 'reducted', user: 'reducted', state: running), channel 2:
{{noproc,
{'Elixir.GenServer',call,
[cache_exchange_reducted_company_dedup_exchange,
{insert,cache_exchange_reducted_company_dedup_exchange,
<<"09ce5b668856570bc3c5c0fb7cf291fa30ef6f37c3fa729bf8fbe7e41842057f">>,
nil},
5000]}},
[{'Elixir.GenServer',call,3,[{file,"lib/gen_server.ex"},{line,914}]},
{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common','duplicate?',3,
[{file,"lib/common.ex"},{line,82}]},
{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange','route?',2,
[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,196}]},
{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange',route,2,
[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,74}]},
{rabbit_exchange,route1,3,[{file,"src/rabbit_exchange.erl"},{line,397}]},
{rabbit_exchange,route,2,[{file,"src/rabbit_exchange.erl"},{line,387}]},
{rabbit_channel,handle_method,3,
[{file,"src/rabbit_channel.erl"},{line,1138}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,551}]}]}

=WARNING EVENT==== Thu, 04 Apr 2019 11:14:46 GMT ===
2019-04-04 11:14:46.627 [warning] <0.4836.292> Non-AMQP exit reason '{{noproc,{'Elixir.GenServer',call,[cache_exchange_reducted_company_dedup_exchange,{insert,cache_exchange_reducted_company_dedup_exchange,<<"09ce5b668856570bc3c5c0fb7cf291fa30ef6f37c3fa729bf8fbe7e41842057f">>,nil},5000]}},[{'Elixir.GenServer',call,3,[{file,"lib/gen_server.ex"},{line,914}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common','duplicate?',3,[{file,"lib/common.ex"},{line,82}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange','route?',2,[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,196}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange',route,2,[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,74}]},{rabbit_exchange,route1,3,[{file,"src/rabbit_exchange.erl"},{line,397}]},{rabbit_exchange,route,2,[{file,"src/rabbit_exchange.erl"},{line,387}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1138}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,551}]}]}'

=ERROR EVENT==== Thu, 04 Apr 2019 11:14:46 GMT ===
2019-04-04 11:14:46.628 [error] <0.4846.292> Supervisor {<0.4846.292>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(2, <0.4836.292>, <0.4847.292>, <0.4836.292>, <<"40.76.67.201:9984 -> 172.26.0.4:5672">>, rabbit_framing_amqp_0_9_1, {user,<<"reducted">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"reducted">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.4837.292>, <0.4739.292>) at <0.4848.292> exit with reason no such process or port in call to 'Elixir.GenServer':call(cache_exchange_reducted_company_dedup_exchange, {insert,cache_exchange_reducted_company_dedup_exchange,<<"09ce5b668856570bc3c5c0fb7cf291fa30ef6...">>,...}, 5000) in 'Elixir.GenServer':call/3 line
914 in context child_terminated

Reasonably large x-cache-size

Hi,

thank you for developing this plugin! Can you provide any guidelines on a maximum bound for x-cache-size? How big is too big, and what are the implications (RAM usage)?

Say I wanted to de-dup an exchange which takes in, say, 2000 messages/sec, and duped messages could be spread by as much as an hour. That would require a cache size of, say, 7,200,000. Is that insanely large?

Crash when accessing the exchanges tab in the management web interface

Hello! When I try to open the exchanges tab I get a 500 error. In log/crash.log:

2020-01-22 07:29:08 =CRASH REPORT====
  crasher:
    initial call: cowboy_stream_h:request_process/3
    pid: <0.3686.0>
    registered_name: []
    exception error: {badarith,[{erlang,'*',[undefined,8],[]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',info,1,[{file,"lib/cache.ex"},{line,131}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange',info,2,[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,179}]},{rabbit_exchange,info,1,[{file,"src/rabbit_exchange.erl"},{line,365}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{rabbit_mgmt_util,'-all_or_one_vhost/2-lc$^0/1-0-',2,[{file,"src/rabbit_mgmt_util.erl"},{line,977}]},{rabbit_mgmt_util,all_or_one_vhost,2,[{file,"src/rabbit_mgmt_util.erl"},{line,977}]}]}
    ancestors: [<0.3653.0>,<0.1285.0>,<0.1284.0>,rabbit_web_dispatch_sup,<0.1251.0>]
    message_queue_len: 0
    messages: []
    links: [<0.3653.0>]
    dictionary: [{{xtype_to_module,direct},rabbit_exchange_type_direct},{{xtype_to_module,'x-message-deduplication'},'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange'},{{xtype_to_module,fanout},rabbit_exchange_type_fanout}]
    trap_exit: false
    status: running
    heap_size: 6772
    stack_size: 27
    reductions: 8285
  neighbours:
2020-01-22 07:29:08 =ERROR REPORT====
Ranch listener rabbit_web_dispatch_sup_15673, connection process <0.3653.0>, stream 16 had its request process <0.3686.0> exit with reason badarith and stacktrace [{erlang,'*',[undefined,8],[]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',info,1,[{file,"lib/cache.ex"},{line,131}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange',info,2,[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,179}]},{rabbit_exchange,info,1,[{file,"src/rabbit_exchange.erl"},{line,365}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{rabbit_mgmt_util,'-all_or_one_vhost/2-lc$^0/1-0-',2,[{file,"src/rabbit_mgmt_util.erl"},{line,977}]},{rabbit_mgmt_util,all_or_one_vhost,2,[{file,"src/rabbit_mgmt_util.erl"},{line,977}]}]

Runtime

OS PID: 380
OS: Linux
Uptime (seconds): 728
RabbitMQ version: 3.8.2
Node name: rabbit@staging-rabbitmq-1
Erlang configuration: Erlang/OTP 22 [erts-10.6.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:64]
Erlang processes: 1445 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@staging-rabbitmq-1
 |/
[  ] rabbitmq_amqp1_0                  3.8.2
[  ] rabbitmq_auth_backend_cache       3.8.2
[  ] rabbitmq_auth_backend_http        3.8.2
[  ] rabbitmq_auth_backend_ldap        3.8.2
[  ] rabbitmq_auth_backend_oauth2      3.8.2
[  ] rabbitmq_auth_mechanism_ssl       3.8.2
[  ] rabbitmq_consistent_hash_exchange 3.8.2
[  ] rabbitmq_event_exchange           3.8.2
[  ] rabbitmq_federation               3.8.2
[  ] rabbitmq_federation_management    3.8.2
[  ] rabbitmq_jms_topic_exchange       3.8.2
[E*] rabbitmq_management               3.8.2
[e*] rabbitmq_management_agent         3.8.2
[E*] rabbitmq_message_deduplication    0.4.3
[  ] rabbitmq_mqtt                     3.8.2
[  ] rabbitmq_peer_discovery_aws       3.8.2
[  ] rabbitmq_peer_discovery_common    3.8.2
[  ] rabbitmq_peer_discovery_consul    3.8.2
[  ] rabbitmq_peer_discovery_etcd      3.8.2
[  ] rabbitmq_peer_discovery_k8s       3.8.2
[  ] rabbitmq_prometheus               3.8.2
[  ] rabbitmq_random_exchange          3.8.2
[  ] rabbitmq_recent_history_exchange  3.8.2
[  ] rabbitmq_sharding                 3.8.2
[  ] rabbitmq_shovel                   3.8.2
[  ] rabbitmq_shovel_management        3.8.2
[E*] rabbitmq_stomp                    3.8.2
[  ] rabbitmq_top                      3.8.2
[  ] rabbitmq_tracing                  3.8.2
[  ] rabbitmq_trust_store              3.8.2
[e*] rabbitmq_web_dispatch             3.8.2
[  ] rabbitmq_web_mqtt                 3.8.2
[  ] rabbitmq_web_mqtt_examples        3.8.2
[  ] rabbitmq_web_stomp                3.8.2
[  ] rabbitmq_web_stomp_examples       3.8.2

This error started appearing after restarting one of the RabbitMQ nodes in the cluster. On the other machine in the cluster it works without problems.

docker hub rabbitmq:3.7.8 with 0.3.4-erl-20 doesn't dedup queue

The Docker Hub version of RabbitMQ does not perform queue deduplication.

  • I did not try exchange-based deduplication.
  • I did not try to reproduce on a non-management image.
  • I tried various other management tags all with the same results: 3.7.8-management-alpine, 3.7.8-management, 3.7.3-management-alpine
  • rabbitmq:3.7.8-management-alpine shows versions as Starting RabbitMQ 3.7.8 on Erlang 20.3.4.
  • I'm using prebuilt plugins from the releases page.
  • Other plugins that ship with rabbitmq work, e.g. management and shovel.

Steps to reproduce...

Dockerfile (make sure you have the ez files next to this file):

FROM rabbitmq:3.7.8-management-alpine
ADD \
  rabbitmq_message_deduplication-0.3.4-erl-20.ez \
  elixir-1.7.2-erl-20.ez \
  /opt/rabbitmq/plugins/

Build with: docker build --tag rmq:3.7.8-with-dedup .

Run the built image: docker run -d -p '15672:15672' rmq:3.7.8-with-dedup (this exposes the management port should you want it).

Run the following commands on the container; this is easiest by exec'ing right into the running container via docker exec -it <container id> /bin/bash. The following is verbose to show the output of the various steps.

rabbitmq-plugins list:

 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@f581a037d713
 |/
[  ] rabbitmq_amqp1_0                  3.7.8
[  ] rabbitmq_auth_backend_cache       3.7.8
[  ] rabbitmq_auth_backend_http        3.7.8
[  ] rabbitmq_auth_backend_ldap        3.7.8
[  ] rabbitmq_auth_mechanism_ssl       3.7.8
[  ] rabbitmq_consistent_hash_exchange 3.7.8
[  ] rabbitmq_event_exchange           3.7.8
[  ] rabbitmq_federation               3.7.8
[  ] rabbitmq_federation_management    3.7.8
[  ] rabbitmq_jms_topic_exchange       3.7.8
[E*] rabbitmq_management               3.7.8
[e*] rabbitmq_management_agent         3.7.8
[  ] rabbitmq_message_deduplication    0.3.4
[  ] rabbitmq_mqtt                     3.7.8
[  ] rabbitmq_peer_discovery_aws       3.7.8
[  ] rabbitmq_peer_discovery_common    3.7.8
[  ] rabbitmq_peer_discovery_consul    3.7.8
[  ] rabbitmq_peer_discovery_etcd      3.7.8
[  ] rabbitmq_peer_discovery_k8s       3.7.8
[  ] rabbitmq_random_exchange          3.7.8
[  ] rabbitmq_recent_history_exchange  3.7.8
[  ] rabbitmq_sharding                 3.7.8
[  ] rabbitmq_shovel                   3.7.8
[  ] rabbitmq_shovel_management        3.7.8
[  ] rabbitmq_stomp                    3.7.8
[  ] rabbitmq_top                      3.7.8
[  ] rabbitmq_tracing                  3.7.8
[  ] rabbitmq_trust_store              3.7.8
[e*] rabbitmq_web_dispatch             3.7.8
[  ] rabbitmq_web_mqtt                 3.7.8
[  ] rabbitmq_web_mqtt_examples        3.7.8
[  ] rabbitmq_web_stomp                3.7.8
[  ] rabbitmq_web_stomp_examples       3.7.8

rabbitmq-plugins enable rabbitmq_message_deduplication:

The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_message_deduplication
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@f581a037d713...
WARNING: module 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue' not found, so not scanned for boot steps.
The following plugins have been enabled:
  rabbitmq_message_deduplication

started 1 plugins.

Notice the following logs from the container (docker container logs <container id>):

[info] <0.33.0> Application elixir started on node rabbit@2e85580642a5
[info] <0.33.0> Application rabbitmq_message_deduplication started on node rabbit@2e85580642a5
[error] <0.38.0> Loading of /var/lib/rabbitmq/mnesia/rabbit@2e85580642a5-plugins-expand/rabbitmq_message_deduplication-0.3.4/ebin/Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue.beam failed: badfile
[error] emulator beam/beam_load.c(1863): Error loading module 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':
  This BEAM file was compiled for a later version of the run-time system than 20.
  To fix this, please recompile this module with an 20 compiler.
  (Use of opcode 163; this emulator supports only up to 159.)

rabbitmq-plugins list shows the plugin is enabled:

 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@f581a037d713
 |/
[  ] rabbitmq_amqp1_0                  3.7.8
[  ] rabbitmq_auth_backend_cache       3.7.8
[  ] rabbitmq_auth_backend_http        3.7.8
[  ] rabbitmq_auth_backend_ldap        3.7.8
[  ] rabbitmq_auth_mechanism_ssl       3.7.8
[  ] rabbitmq_consistent_hash_exchange 3.7.8
[  ] rabbitmq_event_exchange           3.7.8
[  ] rabbitmq_federation               3.7.8
[  ] rabbitmq_federation_management    3.7.8
[  ] rabbitmq_jms_topic_exchange       3.7.8
[E*] rabbitmq_management               3.7.8
[e*] rabbitmq_management_agent         3.7.8
[E*] rabbitmq_message_deduplication    0.3.4
[  ] rabbitmq_mqtt                     3.7.8
[  ] rabbitmq_peer_discovery_aws       3.7.8
[  ] rabbitmq_peer_discovery_common    3.7.8
[  ] rabbitmq_peer_discovery_consul    3.7.8
[  ] rabbitmq_peer_discovery_etcd      3.7.8
[  ] rabbitmq_peer_discovery_k8s       3.7.8
[  ] rabbitmq_random_exchange          3.7.8
[  ] rabbitmq_recent_history_exchange  3.7.8
[  ] rabbitmq_sharding                 3.7.8
[  ] rabbitmq_shovel                   3.7.8
[  ] rabbitmq_shovel_management        3.7.8
[  ] rabbitmq_stomp                    3.7.8
[  ] rabbitmq_top                      3.7.8
[  ] rabbitmq_tracing                  3.7.8
[  ] rabbitmq_trust_store              3.7.8
[e*] rabbitmq_web_dispatch             3.7.8
[  ] rabbitmq_web_mqtt                 3.7.8
[  ] rabbitmq_web_mqtt_examples        3.7.8
[  ] rabbitmq_web_stomp                3.7.8
[  ] rabbitmq_web_stomp_examples       3.7.8

Now let's test the dedup since rabbitmq is reporting the plugin enabled (despite the beam error):

rabbitmqadmin -u guest -p guest declare queue name=deduptest0 'arguments={"x-message-deduplication": true}'
rabbitmqadmin -u guest -p guest -f long -d 3 list queues

list queues gives (truncated):

--------------------------------------------------------------------------------

                                    vhost: /
                                     name: deduptest0
        arguments.x-message-deduplication: True
                              auto_delete: False
                                        ...
                                  durable: True

Finally, test the dedup:

rabbitmqadmin -u guest -p guest publish routing_key=deduptest0 payload=helloworld0  'properties={"headers":{"x-deduplication-header": "dedup-this"}}'
rabbitmqadmin -u guest -p guest publish routing_key=deduptest0 payload=helloworld1  'properties={"headers":{"x-deduplication-header": "dedup-this"}}'
rabbitmqadmin -u guest -p guest get queue=deduptest0 ackmode=ack_requeue_true count=10 --depth=4

get shows 2 messages:

+-------------+----------+---------------+-------------+---------------+------------------+-------------------------------------------+-------------+
| routing_key | exchange | message_count |   payload   | payload_bytes | payload_encoding | properties.headers.x-deduplication-header | redelivered |
+-------------+----------+---------------+-------------+---------------+------------------+-------------------------------------------+-------------+
| deduptest0  |          | 1             | helloworld0 | 11            | string           | dedup-this                                | False       |
| deduptest0  |          | 0             | helloworld1 | 11            | string           | dedup-this                                | False       |
+-------------+----------+---------------+-------------+---------------+------------------+-------------------------------------------+-------------+

I can also reproduce this using the management web UI.

I assume the plugin isn't loading properly (opcode mismatch error), but RabbitMQ is reporting it as loaded anyway. I'm using the plugin built for Erlang 20, and RabbitMQ is running Erlang 20. Please advise.

RabbitMQ crash if queue in lazy mode

RabbitMQ crashes on message publishing if the queue was created with x-queue-mode = lazy and x-message-deduplication = true.

Crash log:

2018-09-12 14:44:49 =ERROR REPORT====
** Generic server <0.19585.0> terminating
** Last message in was {basic_get,<0.19582.0>,false,<0.19581.0>}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-queue-mode">>,longstr,<<"lazy">>},{<<"x-message-deduplication">>,bool,true}],<0.19585.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-queue-mode">>,longstr,<<"lazy">>},{<<"x-message-deduplication">>,bool,true}],<0.19585.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,0,20,20,20},{0,{[],[]}},{0,{[],[]}},20,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/2SVZ97PJQCBIFINCC1KIO7L5D",{#{},[{segment,0,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/2SVZ97PJQCBIFINCC1KIO7L5D/0.idx",{array,16384,0,undefined,{{{{{{{false,<<47,164,110,34,90,244,167,77,85,224,112,230,15,74,191,126,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,48,33,33,106,109,0,0,0,16,47,164,110,34,90,244,167,77,85,224,112,230,15,74,191,126,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<103,170,192,248,104,190,23,83,50,20,216,203,61,94,103,181,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,49,33,33,106,109,0,0,0,16,103,170,192,248,104,190,23,83,50,20,216,203,61,94,103,181,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<161,155,252,84,251,49,51,204,244,123,140,178,201,37,235,7,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,50,33,33,106,109,0,0,0,16,161,155,252,84,251,49,51,204,244,123,140,178,201,37,235,7,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<184,31,162,188,183,85,29,194,138,11,122,119,67,46,145,112,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,
98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,51,33,33,106,109,0,0,0,16,184,31,162,188,183,85,29,194,138,11,122,119,67,46,145,112,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<195,177,20,10,254,239,115,191,55,202,152,184,116,228,9,200,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,52,33,33,106,109,0,0,0,16,195,177,20,10,254,239,115,191,55,202,152,184,116,228,9,200,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<55,167,76,157,254,130,167,154,189,172,54,234,201,72,63,34,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,53,33,33,106,109,0,0,0,16,55,167,76,157,254,130,167,154,189,172,54,234,201,72,63,34,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<214,220,51,223,149,242,162,175,225,22,171,103,40,94,148,69,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,54,33,33,106,109,0,0,0,16,214,220,51,223,149,242,162,175,225,22,171,103,40,94,148,69,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<75,161,5,216,63,69,12,16,246,13,51,50,222,83,167,119,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,55,33,33,106,109,0,0,0,16,75,161,5,216,63,69,12,16,246,13,51,50,222,83,167,119,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<60,120,135,60,245,48,184,30,221,110,44,91,3,61,139,44,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,
1,109,0,0,0,5,33,33,56,33,33,106,109,0,0,0,16,60,120,135,60,245,48,184,30,221,110,44,91,3,61,139,44,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<212,29,201,49,252,67,93,116,34,16,250,3,33,45,113,47,0,0,0,0,0,0,0,0,0,0,0,5>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,57,33,33,106,109,0,0,0,16,212,29,201,49,252,67,93,116,34,16,250,3,33,45,113,47,100,0,5,102,97,108,115,101>>},no_del,no_ack}},{{{false,<<38,78,39,217,248,29,128,174,251,32,11,130,218,13,122,173,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,48,33,33,106,109,0,0,0,16,38,78,39,217,248,29,128,174,251,32,11,130,218,13,122,173,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<61,42,93,224,62,23,214,204,31,58,167,227,197,55,221,78,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,49,33,33,106,109,0,0,0,16,61,42,93,224,62,23,214,204,31,58,167,227,197,55,221,78,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<151,231,245,7,182,202,132,40,108,199,254,133,169,240,35,203,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,50,33,33,106,109,0,0,0,16,151,231,245,7,182,202,132,40,108,199,254,133,169,240,35,203,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<4,79,50,93,222,66,72,240,27,117,149,190,178,133,182,117,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,51,33,33,106,109,0,0,0,16,4,79,50,93,222,66,72,240,27,117,149,190,
178,133,182,117,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<5,250,12,226,192,205,209,172,105,61,242,103,219,184,68,18,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,52,33,33,106,109,0,0,0,16,5,250,12,226,192,205,209,172,105,61,242,103,219,184,68,18,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<120,57,123,248,209,201,88,51,99,76,238,70,184,244,170,84,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,53,33,33,106,109,0,0,0,16,120,57,123,248,209,201,88,51,99,76,238,70,184,244,170,84,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<134,244,196,66,52,113,114,55,239,201,54,37,87,61,156,113,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,54,33,33,106,109,0,0,0,16,134,244,196,66,52,113,114,55,239,201,54,37,87,61,156,113,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<198,103,94,209,29,223,26,195,165,43,176,151,242,22,44,230,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,55,33,33,106,109,0,0,0,16,198,103,94,209,29,223,26,195,165,43,176,151,242,22,44,230,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<241,164,86,16,73,80,252,68,30,109,96,53,236,123,76,211,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,56,33,33,106,109,0,0,0,16,241,164,86,16,73,80,252,68,30,109,96,53,236,123,76,211,100,0,5,102,97,108,115,101>>},no_del,no_ack},{{false,<<246,
204,202,197,161,241,86,180,83,231,122,82,191,156,54,129,0,0,0,0,0,0,0,0,0,0,0,6>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,57,33,33,106,109,0,0,0,16,246,204,202,197,161,241,86,180,83,231,122,82,191,156,54,129,100,0,5,102,97,108,115,101>>},no_del,no_ack}},10,10,10,10,10,10,10,10,10},100,100,100,100,100,100,100,100,100,100},1000,1000,1000,1000,1000,1000,1000,1000,1000,1000},10000,10000,10000,10000,10000,10000,10000,10000,10000,10000}},{array,16384,0,[],{{{{{[[<<128,0,47,164,110,34,90,244,167,77,85,224,112,230,15,74,191,126,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,48,33,33,106,109,0,0,0,16,47,164,110,34,90,244,167,77,85,224,112,230,15,74,191,126,100,0,5,102,97,108,115,101>>]],[[<<128,1,103,170,192,248,104,190,23,83,50,20,216,203,61,94,103,181,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,49,33,33,106,109,0,0,0,16,103,170,192,248,104,190,23,83,50,20,216,203,61,94,103,181,100,0,5,102,97,108,115,101>>]],[[<<128,2,161,155,252,84,251,49,51,204,244,123,140,178,201,37,235,7,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,50,33,33,106,109,0,0,0,16,161,155,252,84,251,49,51,204,244,123,140,178,201,37,235,7,100,0,5,102,97,108,115,101>>]],[[<<128,3,184,31,162,188,183,85,29,194,138,11,122,119,67,46,145,112,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,51,33,33,106,1
09,0,0,0,16,184,31,162,188,183,85,29,194,138,11,122,119,67,46,145,112,100,0,5,102,97,108,115,101>>]],[[<<128,4,195,177,20,10,254,239,115,191,55,202,152,184,116,228,9,200,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,52,33,33,106,109,0,0,0,16,195,177,20,10,254,239,115,191,55,202,152,184,116,228,9,200,100,0,5,102,97,108,115,101>>]],[[<<128,5,55,167,76,157,254,130,167,154,189,172,54,234,201,72,63,34,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,53,33,33,106,109,0,0,0,16,55,167,76,157,254,130,167,154,189,172,54,234,201,72,63,34,100,0,5,102,97,108,115,101>>]],[[<<128,6,214,220,51,223,149,242,162,175,225,22,171,103,40,94,148,69,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,54,33,33,106,109,0,0,0,16,214,220,51,223,149,242,162,175,225,22,171,103,40,94,148,69,100,0,5,102,97,108,115,101>>]],[[<<128,7,75,161,5,216,63,69,12,16,246,13,51,50,222,83,167,119,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,55,33,33,106,109,0,0,0,16,75,161,5,216,63,69,12,16,246,13,51,50,222,83,167,119,100,0,5,102,97,108,115,101>>]],[[<<128,8,60,120,135,60,245,48,184,30,221,110,44,91,3,61,139,44,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,56,33,33,106,109,0,0,0,16,60,120,135,60,245,48,184,30,221,110,44,91,3,61,139,44,100,0,5,102,97,108,115,101>>]],[[<<128,9,212
,29,201,49,252,67,93,116,34,16,250,3,33,45,113,47,0,0,0,0,0,0,0,0,0,0,0,5,0,0,0,180>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,5,33,33,57,33,33,106,109,0,0,0,16,212,29,201,49,252,67,93,116,34,16,250,3,33,45,113,47,100,0,5,102,97,108,115,101>>]]},{[[<<128,10,38,78,39,217,248,29,128,174,251,32,11,130,218,13,122,173,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,48,33,33,106,109,0,0,0,16,38,78,39,217,248,29,128,174,251,32,11,130,218,13,122,173,100,0,5,102,97,108,115,101>>]],[[<<128,11,61,42,93,224,62,23,214,204,31,58,167,227,197,55,221,78,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,49,33,33,106,109,0,0,0,16,61,42,93,224,62,23,214,204,31,58,167,227,197,55,221,78,100,0,5,102,97,108,115,101>>]],[[<<128,12,151,231,245,7,182,202,132,40,108,199,254,133,169,240,35,203,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,50,33,33,106,109,0,0,0,16,151,231,245,7,182,202,132,40,108,199,254,133,169,240,35,203,100,0,5,102,97,108,115,101>>]],[[<<128,13,4,79,50,93,222,66,72,240,27,117,149,190,178,133,182,117,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,51,33,33,106,109,0,0,0,16,4,79,50,93,222,66,72,240,27,117,149,190,178,133,182,117,100,0,5,102,97,108,115,101>>]],[[<<128,14,5,250,12,226,192,205,209,172,105,61,242,103,219,184,68,18,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,10
4,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,52,33,33,106,109,0,0,0,16,5,250,12,226,192,205,209,172,105,61,242,103,219,184,68,18,100,0,5,102,97,108,115,101>>]],[[<<128,15,120,57,123,248,209,201,88,51,99,76,238,70,184,244,170,84,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,53,33,33,106,109,0,0,0,16,120,57,123,248,209,201,88,51,99,76,238,70,184,244,170,84,100,0,5,102,97,108,115,101>>]],[[<<128,16,134,244,196,66,52,113,114,55,239,201,54,37,87,61,156,113,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,54,33,33,106,109,0,0,0,16,134,244,196,66,52,113,114,55,239,201,54,37,87,61,156,113,100,0,5,102,97,108,115,101>>]],[[<<128,17,198,103,94,209,29,223,26,195,165,43,176,151,242,22,44,230,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,55,33,33,106,109,0,0,0,16,198,103,94,209,29,223,26,195,165,43,176,151,242,22,44,230,100,0,5,102,97,108,115,101>>]],[[<<128,18,241,164,86,16,73,80,252,68,30,109,96,53,236,123,76,211,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,56,33,33,106,109,0,0,0,16,241,164,86,16,73,80,252,68,30,109,96,53,236,123,76,211,100,0,5,102,97,108,115,101>>]],[[<<128,19,246,204,202,197,161,241,86,180,83,231,122,82,191,156,54,129,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,181>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,11
1,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,13,116,101,115,116,95,101,120,99,104,97,110,103,101,108,0,0,0,1,109,0,0,0,1,50,106,104,6,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,2,0,0,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,6,33,33,49,57,33,33,106,109,0,0,0,16,246,204,202,197,161,241,86,180,83,231,122,82,191,156,54,129,100,0,5,102,97,108,115,101>>]]},10,10,10,10,10,10,10,10,10},100,100,100,100,100,100,100,100,100,100},1000,1000,1000,1000,1000,1000,1000,1000,1000,1000},10000,10000,10000,10000,10000,10000,10000,10000,10000,10000}},20}]},#Ref<0.2846713033.3594780673.182445>,20,32768,#Fun<rabbit_variable_queue.10.126666440>,#Fun<rabbit_variable_queue.11.126666440>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"test">>}},{{client_msstate,<0.354.0>,<<79,68,41,81,73,134,23,39,156,246,10,38,145,202,8,55>>,#{},{state,#Ref<0.2846713033.3594911745.86549>,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.357.0>,#Ref<0.2846713033.3594911745.86550>,#Ref<0.2846713033.3594911745.86538>,#Ref<0.2846713033.3594911745.86551>,#Ref<0.2846713033.3594911745.86552>,{4000,800}},{client_msstate,<0.351.0>,<<252,19,29,122,41,99,0,123,36,95,2,106,181,149,10,93>>,#{},{state,#Ref<0.2846713033.3594911745.86501>,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.352.0>,#Ref<0.2846713033.3594911745.86502>,#Ref<0.2846713033.3594911745.86499>,#Ref<0.2846713033.3594911745.86503>,#Ref<0.2846713033.3594911745.86504>,{4000,800}}},true,0,4096,20,110,0,0,0,110,infinity,0,0,0,0,0,20,{rates,0.0,0.0,0.0,0.0,-576460539574932709},{0,nil},{0,nil},{0,nil},{0,nil},0,0,1,20,4096,lazy,20,<<"/">>,false}}},{state,{queue,[],[],0},{inactive,-576460538456426,7285812,1.0}},undefined,undefined,{erlang,#Ref<0.2846713033.3594780673.181753>},undefined,{state,fine,5000,#Ref<0.2846713033.3594780673.181754>},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',1,0,running}
** Reason for termination ==
** {{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
2018-09-12 14:44:49 =CRASH REPORT====
crasher:
initial call: rabbit_prequeue:init/1
pid: <0.19585.0>
registered_name: []
exception exit: {{{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},[{gen_server2,terminate,3,[{file,"src/gen_server2.erl"},{line,1166}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
ancestors: [<0.19584.0>,<0.360.0>,<0.346.0>,<0.345.0>,rabbit_vhost_sup_sup,rabbit_sup,<0.258.0>]
message_queue_len: 0
messages: []
links: [#Port<0.32484>,<0.19584.0>]
dictionary: [{credit_flow_default_credit,{400,200}},{credit_blocked,[]},{{#Ref<0.2846713033.3594780673.182445>,fhc_handle},{handle,{file_descriptor,prim_file,{#Port<0.32484>,76}},#Ref<0.2846713033.3594780673.182445>,4410,false,8,infinity,[[<<128,0,0,0,0,0,0,0>>]],<<>>,0,0,0,0,0,true,"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/2SVZ97PJQCBIFINCC1KIO7L5D/journal.jif",[write,binary,raw,read],[{write_buffer,infinity}],true,true,-576460537534651361}},{process_name,{rabbit_amqqueue_process,{resource,<<"/">>,queue,<<"test">>}}},{explicit_gc_run_operation_threshold,1000},{fhc_age_tree,{1,{{-576460537534651361,#Ref<0.2846713033.3594780673.182445>},true,nil,nil}}},{{xtype_to_module,direct},rabbit_exchange_type_direct},{rand_seed,{#{jump => #Fun<rand.16.15449617>,max => 288230376151711743,next => #Fun<rand.15.15449617>,type => exsplus},[139363025843193945|76666950932622682]}},{guid,{{4229111162,694354043,610206314,3046443613},1}},{{"/var/lib/rabbitmq/mnesia/rabbit@d954967ecf3c/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/2SVZ97PJQCBIFINCC1KIO7L5D/journal.jif",fhc_file},{file,1,true}}]
trap_exit: true
status: running
heap_size: 6772
stack_size: 27
reductions: 29051
neighbours:
2018-09-12 14:44:49 =SUPERVISOR REPORT====
Supervisor: {<0.19584.0>,rabbit_amqqueue_sup}
Context: child_terminated
Reason: {{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
Offender: [{pid,<0.19585.0>},{name,rabbit_amqqueue},{mfargs,{rabbit_prequeue,start_link,[{amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-queue-mode">>,longstr,<<"lazy">>},{<<"x-message-deduplication">>,bool,true}],none,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},declare,<0.19583.0>]}},{restart_type,intrinsic},{shutdown,30000},{child_type,worker}]

2018-09-12 14:44:49 =ERROR REPORT====
** Generic server <0.19582.0> terminating
** Last message in was {'$gen_cast',{method,{'basic.get',0,<<"test">>,false},none,noflow}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.19575.0>,<0.19580.0>,<0.19575.0>,<<"172.19.0.101:36163 -> 172.19.0.3:5672">>,{lstate,<0.19581.0>,false},none,1,{[],[]},{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,<<"test">>,#{},{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},erlang},#{},#{},{set,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},<0.19576.0>,{state,fine,5000,#Ref<0.2846713033.3594780673.181755>},false,1,{{0,nil},{0,nil}},[],[],{{0,nil},{0,nil}},[{<<"authentication_failure_close">>,bool,true},{<<"publisher_confirms">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"connection.blocked">>,bool,true}],none,0,none,flow,[]}
** Reason for termination ==
** {{{{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.19585.0>,{basic_get,<0.19582.0>,false,<0.19581.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,506}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1153}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,527}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
2018-09-12 14:44:49 =CRASH REPORT====
crasher:
initial call: rabbit_channel:init/1
pid: <0.19582.0>
registered_name: []
exception exit: {{{{{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.19585.0>,{basic_get,<0.19582.0>,false,<0.19581.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,506}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1153}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,527}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},[{gen_server2,terminate,3,[{file,"src/gen_server2.erl"},{line,1166}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
ancestors: [<0.19579.0>,<0.19577.0>,<0.19574.0>,<0.19573.0>,<0.391.0>,<0.390.0>,<0.389.0>,rabbit_sup,<0.258.0>]
message_queue_len: 0
messages: []
links: [<0.19579.0>]
dictionary: [{permission_cache,[{{resource,<<"/">>,queue,<<"test">>},read}]},{channel_operation_timeout,15000},{rand_seed,{#{jump => #Fun<rand.16.15449617>,max => 288230376151711743,next => #Fun<rand.15.15449617>,type => exsplus},[191267442019889918|169158881536127441]}},{{xtype_to_module,topic},rabbit_exchange_type_topic},{process_name,{rabbit_channel,{<<"172.19.0.101:36163 -> 172.19.0.3:5672">>,1}}}]
trap_exit: true
status: running
heap_size: 2586
stack_size: 27
reductions: 6257
neighbours:
2018-09-12 14:44:49 =SUPERVISOR REPORT====
Supervisor: {<0.19579.0>,rabbit_channel_sup}
Context: child_terminated
Reason: {{{{case_clause,none},[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Common',message_header,2,[{file,"lib/common.ex"},{line,64}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,235}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,302}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,767}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1177}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.19585.0>,{basic_get,<0.19582.0>,false,<0.19581.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,506}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1153}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,527}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
Offender: [{pid,<0.19582.0>},{name,channel},{mfargs,{rabbit_channel,start_link,[1,<0.19575.0>,<0.19580.0>,<0.19575.0>,<<"172.19.0.101:36163 -> 172.19.0.3:5672">>,rabbit_framing_amqp_0_9_1,{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,[{<<"authentication_failure_close">>,bool,true},{<<"publisher_confirms">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"connection.blocked">>,bool,true}],<0.19576.0>,<0.19581.0>]}},{restart_type,intrinsic},{shutdown,70000},{child_type,worker}]

2018-09-12 14:44:49 =SUPERVISOR REPORT====
Supervisor: {<0.19579.0>,rabbit_channel_sup}
Context: shutdown
Reason: reached_max_restart_intensity
Offender: [{pid,<0.19582.0>},{name,channel},{mfargs,{rabbit_channel,start_link,[1,<0.19575.0>,<0.19580.0>,<0.19575.0>,<<"172.19.0.101:36163 -> 172.19.0.3:5672">>,rabbit_framing_amqp_0_9_1,{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,[{<<"authentication_failure_close">>,bool,true},{<<"publisher_confirms">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"connection.blocked">>,bool,true}],<0.19576.0>,<0.19581.0>]}},{restart_type,intrinsic},{shutdown,70000},{child_type,worker}]

Python example of the plugin usage

I am trying to use the plugin with Python pika, but I am running into some issues. When trying to use the "Exchange Level Deduplication", I get the following error:

(venv) xyz@xyz-VirtualBox:~/rabbitmq_play$ python rmq_prod.py
Traceback (most recent call last):
  File "rmq_prod.py", line 14, in <module>
    channel.exchange_declare(exchange='hello', exchange_type='x-message-deduplication', arguments=x)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 2302, in exchange_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 1250, in _flush_output
    *waiters)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 455, in _flush_output
    self._impl.ioloop.poll()
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 245, in poll
    self._poller.poll()
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 935, in poll
    self._dispatch_fd_events(fd_event_map)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 625, in _dispatch_fd_events
    handler(fileno, events)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 395, in _handle_events
    self._handle_read()
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 449, in _handle_read
    self._on_data_available(data)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/connection.py", line 1938, in _on_data_available
    self._process_frame(frame_value)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/connection.py", line 2059, in _process_frame
    if self._process_callbacks(frame_value):
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/connection.py", line 2040, in _process_callbacks
    frame_value)  # Args
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/callback.py", line 60, in wrapper
    return function(*tuple(args), **kwargs)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/callback.py", line 236, in process
    callback(*args, **keywords)
  File "/home/xyz/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 1316, in _on_channel_closed
    method.reply_text)
pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - Missing or invalid argument,           'x-cache-size' must be an integer greater than 0")
(venv) xyz@xyz-VirtualBox:~/rabbitmq_play$

The log in /var/log/rabbitmq/ shows the following:

2018-06-05 09:54:16.887 [info] <0.2696.0> accepting AMQP connection <0.2696.0> (127.0.0.1:53538 -> 127.0.0.1:5672)
2018-06-05 09:54:16.903 [info] <0.2696.0> connection <0.2696.0> (127.0.0.1:53538 -> 127.0.0.1:5672): user 'gordn' authenticated and granted access to vhost '/'
2018-06-05 09:54:16.910 [error] <0.2705.0> Channel error on connection <0.2696.0> (127.0.0.1:53538 -> 127.0.0.1:5672, vhost: '/', user: 'gordn'), channel 1:
operation exchange.declare caused a channel exception precondition_failed: Missing or invalid argument,           'x-cache-size' must be an integer greater than 0
2018-06-05 09:54:17.055 [warning] <0.2696.0> closing AMQP connection <0.2696.0> (127.0.0.1:53538 -> 127.0.0.1:5672, vhost: '/', user: 'gordn'):
client unexpectedly closed TCP connection

My code is the following (I tried to integrate it with the example from https://www.rabbitmq.com/tutorials/tutorial-two-python.html):

import pika
import sys

credentials = pika.PlainCredentials('user1', 'user1')
#parameters = pika.ConnectionParameters(credentials=credentials

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

x = dict({'x-cache-size':5})

channel.exchange_declare(exchange='hello', exchange_type='x-message-deduplication', arguments=x)

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='hello',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

I had to change the cache size from an integer to a string: dict({'x-cache-size':'5'}), but I still get an error:

root@xyz-VirtualBox:/var/log/rabbitmq# cat test.txt 
2018-06-05 09:49:00.750 [info] <0.2676.0> accepting AMQP connection <0.2676.0> (127.0.0.1:53536 -> 127.0.0.1:5672)
2018-06-05 09:49:00.768 [info] <0.2676.0> connection <0.2676.0> (127.0.0.1:53536 -> 127.0.0.1:5672): user 'gordn' authenticated and granted access to vhost '/'
2018-06-05 09:49:00.788 [error] <0.2685.0> ** Generic server <0.2685.0> terminating
** Last message in was {'$gen_cast',{method,{'exchange.declare',0,<<"hello">>,<<"x-message-deduplication">>,false,false,false,false,false,[{<<"x-cache-size">>,longstr,<<"5">>}]},none,noflow}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.2676.0>,<0.2683.0>,<0.2676.0>,<<"127.0.0.1:53536 -> 127.0.0.1:5672">>,{lstate,<0.2684.0>,false},none,1,{[],[]},{user,<<"gordn">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,<<>>,#{},{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},erlang},#{},#{},{set,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},<0.2677.0>,{state,none,5000,undefined},false,1,{{0,nil},{0,nil}},[],[],{{0,nil},{0,nil}},[{<<"consumer_cancel_notify">>,bool,true},{<<"authentication_failure_close">>,bool,true},{<<"basic.nack">>,bool,true},{<<"publisher_confirms">>,bool,true},{<<"connection.blocked">>,bool,true}],none,0,none,flow,[]}
** Reason for termination == 
** {{error,{noproc,{'Elixir.GenServer',call,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Supervisor',{start_child,{{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',start_link,[cache_exchange__hello,[{size,5},{ttl,nil},{persistence,memory}]]},permanent,5000,worker,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache']}},infinity]}}},[{rabbit_misc,execute_mnesia_transaction,1,[{file,"src/rabbit_misc.erl"},{line,558}]},{rabbit_misc,execute_mnesia_transaction,2,[{file,"src/rabbit_misc.erl"},{line,568}]},{rabbit_channel,handle_method,5,[{file,"src/rabbit_channel.erl"},{line,2281}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1367}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
2018-06-05 09:49:00.790 [error] <0.2676.0> Error on AMQP connection <0.2676.0> (127.0.0.1:53536 -> 127.0.0.1:5672, vhost: '/', user: 'gordn', state: running), channel 1:
 {{error,
     {noproc,
         {'Elixir.GenServer',call,
             ['Elixir.RabbitMQ.MessageDeduplicationPlugin.Supervisor',
              {start_child,
                  {{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',
                       start_link,
                       [cache_exchange__hello,
                        [{size,5},{ttl,nil},{persistence,memory}]]},
                   permanent,5000,worker,
                   ['Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache']}},
              infinity]}}},
 [{rabbit_misc,execute_mnesia_transaction,1,
      [{file,"src/rabbit_misc.erl"},{line,558}]},
  {rabbit_misc,execute_mnesia_transaction,2,
      [{file,"src/rabbit_misc.erl"},{line,568}]},
  {rabbit_channel,handle_method,5,
      [{file,"src/rabbit_channel.erl"},{line,2281}]},
  {rabbit_channel,handle_method,3,
      [{file,"src/rabbit_channel.erl"},{line,1367}]},
  {rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},
  {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},
  {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
2018-06-05 09:49:00.793 [error] <0.2685.0> CRASH REPORT Process <0.2685.0> with 0 neighbours exited with reason: {error,{noproc,{'Elixir.GenServer',call,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Supervisor',{start_child,{{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',start_link,[cache_exchange__hello,[{size,5},{ttl,nil},{persistence,memory}]]},permanent,5000,worker,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache']}},infinity]}}} in rabbit_misc:execute_mnesia_transaction/1 line 558 in gen_server2:terminate/3 line 1166
2018-06-05 09:49:00.796 [warning] <0.2676.0> Non-AMQP exit reason '{{error,{noproc,{'Elixir.GenServer',call,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Supervisor',{start_child,{{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',start_link,[cache_exchange__hello,[{size,5},{ttl,nil},{persistence,memory}]]},permanent,5000,worker,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache']}},infinity]}}},[{rabbit_misc,execute_mnesia_transaction,1,[{file,"src/rabbit_misc.erl"},{line,558}]},{rabbit_misc,execute_mnesia_transaction,2,[{file,"src/rabbit_misc.erl"},{line,568}]},{rabbit_channel,handle_method,5,[{file,"src/rabbit_channel.erl"},{line,2281}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1367}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}'
2018-06-05 09:49:00.807 [error] <0.2682.0> Supervisor {<0.2682.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.2676.0>, <0.2683.0>, <0.2676.0>, <<"127.0.0.1:53536 -> 127.0.0.1:5672">>, rabbit_framing_amqp_0_9_1, {user,<<"gordn">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"consumer_cancel_notify">>,bool,true},{<<"authentication_failure_close">>,bool,true},{<<"basi...">>,...},...], <0.2677.0>, <0.2684.0>) at <0.2685.0> exit with reason {error,{noproc,{'Elixir.GenServer',call,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Supervisor',{start_child,{{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache',start_link,[cache_exchange__hello,[{size,5},{ttl,nil},{persistence,memory}]]},permanent,5000,worker,['Elixir.RabbitMQ.MessageDeduplicationPlugin.Cache']}},infinity]}}} in rabbit_misc:execute_mnesia_transaction/1 line 558 in context child_terminated
2018-06-05 09:49:00.822 [info] <0.2676.0> closing AMQP connection <0.2676.0> (127.0.0.1:53536 -> 127.0.0.1:5672, vhost: '/', user: 'gordn')
2018-06-05 09:49:00.835 [error] <0.2682.0> Supervisor {<0.2682.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.2676.0>, <0.2683.0>, <0.2676.0>, <<"127.0.0.1:53536 -> 127.0.0.1:5672">>, rabbit_framing_amqp_0_9_1, {user,<<"gordn">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"consumer_cancel_notify">>,bool,true},{<<"authentication_failure_close">>,bool,true},{<<"basi...">>,...},...], <0.2677.0>, <0.2684.0>) at <0.2685.0> exit with reason reached_max_restart_intensity in context shutdown
root@xyz-VirtualBox:/var/log/rabbitmq# 

Any idea what the reason for this is? Am I doing something wrong?
I haven't tried this in other languages, but I anticipate the same result.
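
For reference, a minimal pika sketch of the declaration as documented in the README, with x-cache-size passed as a plain integer (the precondition_failed error above requires an integer greater than 0); the exchange name and connection details are placeholders and the plugin is assumed to be enabled on the broker:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# x-cache-size is required and must be an integer greater than 0.
channel.exchange_declare(exchange='dedup-exchange',
                         exchange_type='x-message-deduplication',
                         arguments={'x-cache-size': 100})

# Messages are deduplicated on the value of x-deduplication-header.
channel.basic_publish(exchange='dedup-exchange',
                      routing_key='',
                      body=b'payload',
                      properties=pika.BasicProperties(
                          headers={'x-deduplication-header': 'unique-id-1'}))
connection.close()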

Cannot ack a message from deduplicated queue

Seen in version 0.4.0 of the deduplication plugin, running on RabbitMQ 3.7.4.
I want to deduplicate at queue level, so I created a direct exchange and bound a queue to it with a routing key; the queue was declared with the "x-message-deduplication" argument.
Publishing went fine, duplicated messages were filtered out.
However, when I attached a subscriber to the queue, the messages showed up but could not be acked, and eventually the queue ended up in the crashed state.
The same thing also happens when I try to "get" a message through the management GUI with ack mode "ack message requeue false".

Could you please look into this (or tell me what I did wrong in the setup)?

Below is the logging I captured from the terminal window where my RabbitMQ was running in a docker-compose setup.
rabbitmq | 2019-05-31 12:32:09.190 [error] <0.664.0> Restarting crashed queue 'q_tmbo_trigger' in vhost '/'.
rabbitmq | 2019-05-31 12:32:09.190 [error] <0.623.0> ** Generic server <0.623.0> terminating
rabbitmq | ** Last message in was {basic_get,<0.662.0>,true,<0.661.0>}
rabbitmq | ** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},none,false,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{3,{[{msg_status,3,<<78,86,250,11,140,232,135,45,211,172,106,145,123,226,196,255>>,{basic_message,{resource,<<"/">>,exchange,<<"x_gpl_sync">>},[<<"tmbo_trigger">>],{content,60,none,<<248,0,16,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110,5,85,84,70,45,56,0,0,0,88,10,95,95,84,121,112,101,73,100,95,95,83,0,0,0,40,111,114,103,46,98,111,105,112,46,103,112,108,46,99,111,109,109,111,110,115,46,97,109,113,112,46,84,114,105,103,103,101,114,77,101,115,115,97,103,101,22,120,45,100,101,100,117,112,108,105,99,97,116,105,111,110,45,104,101,97,100,101,114,108,0,0,0,0,2,20,51,166,2,0>>,rabbit_framing_amqp_0_9_1,[<<"{"dossierId":34878374,"dossierType":"TM"}">>]},<<78,86,250,11,140,232,135,45,211,172,106,145,123,226,196,255>>,true},true,true,false,true,queue_index,{message_properties,undefined,false,41}}],[{msg_status,1,<<49,5,177,104,246,24,135,19,180,254,189,164,194,190,125,38>>,{basic_message,{resource,<<"/">>,exchange,<<"x_gpl_sync">>},[<<"tmbo_trigger">>],{content,60,none,<<248,0,16,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110,5,85,84,70,45,56,0,0,0,88,10,95,95,84,121,112,101,73,100,95,95,83,0,0,0,40,111,114,103,46,98,111,105,112,46,103,112,108,46,99,111,109,109,111,110,115,46,97,109,113,112,46,84,114,105,103,103,101,114,77,101,115,115,97,103,101,22,120,45,100,101,100,117,112,...>>,...},...},...},...]}},...}}},...}
rabbitmq | ** Reason for termination ==
rabbitmq | ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
rabbitmq | 2019-05-31 12:32:09.190 [error] <0.623.0> CRASH REPORT Process <0.623.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':maybe_delete_cache_entry({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, 232321) line 426 in gen_server2:terminate/3 line 1166
rabbitmq | 2019-05-31 12:32:09.191 [error] <0.361.0> Supervisor {<0.361.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, recovery, <0.360.0>) at <0.623.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':maybe_delete_cache_entry({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, 232321) line 426 in context child_terminated
rabbitmq | 2019-05-31 12:32:09.191 [error] <0.662.0> ** Generic server <0.662.0> terminating
rabbitmq | ** Last message in was {'$gen_cast',{method,{'basic.get',0,<<"q_tmbo_trigger">>,true},none,noflow}}
rabbitmq | ** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.659.0>,<0.659.0>,<0.652.0>,<<"[email protected]">>,{lstate,<0.661.0>,false},none,1,{[],[]},{user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,<<>>,#{},{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},erlang},#{},#{},{set,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},<0.656.0>,{state,fine,5000,#Ref<0.581001283.1405616132.118530>},false,1,{{0,nil},{0,nil}},[],[],{{0,nil},{0,nil}},[{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"connection.blocked">>,bool,true},{<<"authentication_failure_close">>,bool,true}],none,0,none,flow,[]}
rabbitmq | ** Reason for termination ==
rabbitmq | ** {{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
rabbitmq | 2019-05-31 12:32:09.191 [error] <0.662.0> CRASH REPORT Process <0.662.0> with 1 neighbours exited with reason: {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in gen_server2:terminate/3 line 1166
rabbitmq | 2019-05-31 12:32:09.192 [error] <0.660.0> Supervisor {<0.660.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.659.0>, <0.659.0>, <0.652.0>, <<"[email protected]">>, rabbit_framing_amqp_0_9_1, {user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.656.0>, <0.661.0>) at <0.662.0> exit with reason {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in context child_terminated
rabbitmq | 2019-05-31 12:32:09.192 [error] <0.660.0> Supervisor {<0.660.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.659.0>, <0.659.0>, <0.652.0>, <<"[email protected]">>, rabbit_framing_amqp_0_9_1, {user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.656.0>, <0.661.0>) at <0.662.0> exit with reason reached_max_restart_intensity in context shutdown
rabbitmq | 2019-05-31 12:32:09.193 [error] <0.657.0> Supervisor {<0.657.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.652.0>, 1, <0.658.0>, {<<"[email protected]">>,1}) at <0.659.0> exit with reason {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in context child_terminated
rabbitmq | 2019-05-31 12:32:09.193 [error] <0.657.0> Supervisor {<0.657.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.652.0>, 1, <0.658.0>, {<<"[email protected]">>,1}) at <0.659.0> exit with reason reached_max_restart_intensity in context shutdown
rabbitmq | 2019-05-31 12:32:09.193 [warning] <0.652.0> Connection (<0.652.0>) closing: internal error in channel (<0.659.0>): {{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
rabbitmq | 2019-05-31 12:32:09.193 [error] <0.649.0> CRASH REPORT Process <0.649.0> with 0 neighbours exited with reason: {{{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,...}]},...]},...},...},...} in gen_server:call/3 line 214 in gen_server:call/3 line 214
rabbitmq | 2019-05-31 12:32:09.194 [error] <0.644.0> Ranch listener rabbit_web_dispatch_sup_15672, connection process <0.644.0>, stream 5 had its request process <0.649.0> exit with reason {{{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]},{gen_server,call,[<0.659.0>,{call,{'basic.get',0,<<"q_tmbo_trigger">>,true},none,<0.649.0>},60000]}} and stacktrace [{gen_server,call,3,[{file,"gen_server.erl"},{line,214}]},{rabbit_mgmt_wm_queue_get,basic_get,5,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,130}]},{rabbit_mgmt_wm_queue_get,basic_gets,6,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,89}]},{rabbit_mgmt_wm_queue_get,'-do_it/2-fun-0-',8,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,75}]},{rabbit_mgmt_util,with_channel,5,[{file,"src/rabbit_mgmt_util.erl"},{line,878}]},{rabbit_mgmt_util,with_decode,5,[{file,"src/rabbit_mgmt_util.erl"},{line,721}]},{rabbit_mgmt_wm_queue_get,accept_content,2,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,51}]},{cowboy_rest,call,3,[{file,"src/cowboy_rest.erl"},{line,1182}]}]

x-cache-ttl ignored

We have a Node application where we want to use this plugin, however we cannot make it work with TTL. It deduplicates on the header, but no matter what TTL we set, it is ignored.

Here is our code:

import { connect, Connection } from 'amqplib';
import { createHmac } from 'crypto';
import { Inject } from '@nestjs/common';
import { Observable } from 'rxjs';
import { sleep } from 'shared';

export class Queue {

  private static connection: Connection;

  get connect(): Promise<void> {
    return new Promise<void>(async resolve => {
      while (!Queue.connection) {
        await sleep(500);
      }

      resolve();
    });
  }

  constructor(
  ) {
    connect(`amqp://${process.env.RABBITMQ_USERNAME}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}`).then((conn) => {
      Queue.connection = conn;
    }).catch(error => {
    });
  }

  public async publish(queue: string, message: any, deDupHeader?: string | undefined): Promise<void> {
    const signature = createHmac('sha256', 'mySecretKey')
      .update(JSON.stringify(message))
      .digest('base64');

    const messageObj: Message = {
      message: message,
      signature,
    };

    return new Promise<void>(resolve => {
      Queue.connection.createChannel().then(async ch => {
        const exchangeName = `exchange_${queue}`;
        const exchange = await ch.assertExchange(
          exchangeName,
          'x-message-deduplication',
          {
            arguments: {
              'x-cache-size': '100',
              'x-cache-ttl': '1000',
            },
          },
        );
        await ch.assertQueue(queue, {
          durable: false,
        });
        await ch.bindQueue(queue, exchangeName, '*');
        await ch.publish(exchangeName, '', Buffer.from(JSON.stringify(messageObj)), {
          headers: {
            'x-deduplication-header': deDupHeader,
            'x-cache-ttl': 1000,
          },
        });
        // ch.sendToQueue(
        //   queue,
        //   Buffer.from(JSON.stringify(messageObj)),
        //   {
        //     persistent: true,
        //     headers: {
        //       'x-deduplication-header': deDupHeader,
        //       'x-cache-ttl': 5000,
        //     },
        //   },
        // );
        resolve();
      });
    });
  }

  public subscribe(queue: string): Observable<any> {
    return new Observable<any>(observer => {
      Queue.connection.createChannel().then(ch => {
        ch.deleteQueue(queue);
        ch.assertQueue(queue, { durable: false, arguments: { 'x-message-deduplication': true, 'x-cache-ttl': 5000 } });
        ch.consume(queue, message => {
          const content = message!.content.toString();
          const messageObject: Message = JSON.parse(content);
          const calculatedSignatureFromMessage = createHmac('sha256', 'mySecretKey')
            .update(JSON.stringify(messageObject.message))
            .digest('base64');

          if (messageObject.signature !== calculatedSignatureFromMessage) {
            // We do not forward this message, because the
            // signature does not match
            return;
          }

          observer.next(messageObject);
        });
      });
    });
  }
}

interface Message {
  message: any;
  signature: string;
}
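
For comparison, a minimal sketch of the same setup in Python/pika (the client used elsewhere in this document), with the cache arguments passed as numbers rather than strings; per the README, x-cache-ttl is expressed in milliseconds and the optional per-message x-cache-ttl header overrides the exchange default. Names are placeholders:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Both cache arguments are numeric: size is a count of entries,
# TTL is a number of milliseconds.
channel.exchange_declare(exchange='exchange_events',
                         exchange_type='x-message-deduplication',
                         arguments={'x-cache-size': 100, 'x-cache-ttl': 1000})

# The per-message x-cache-ttl header (also numeric) overrides the
# exchange-level default for this message only.
channel.basic_publish(exchange='exchange_events',
                      routing_key='',
                      body=b'{"message": "..."}',
                      properties=pika.BasicProperties(
                          headers={'x-deduplication-header': 'some-signature',
                                   'x-cache-ttl': 1000}))
connection.close()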

Missing elixir.ez in release

Hey :)

Nice plugin, but I was forced to compile it myself. You forgot to upload elixir.ez, which is a dependency, to the release ;)

{"init terminating in do_boot",{error,{missing_dependencies,[elixir],[rabbitmq_message_deduplication_exchange]}}}
init terminating in do_boot ({error,{missing_dependencies,[elixir],[rabbitmq_message_deduplication_exchange]}})

What about High Availability? Does this plugin handle that case?

Thank you so much
Tobias

Error loading module 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue'

Hello,
We are using RabbitMQ 3.7.6 and Erlang 20.3.8. It is a cluster with 3 nodes with federation. We installed the deduplication plugin rabbitmq_message_deduplication-v3.7.x_0.4.4 from https://github.com/noxdafox/rabbitmq-message-deduplication/releases and also elixir-1.8.2.ez.
I am able to create a deduplication exchange, but we constantly see the three error messages below in the logs.

lager_event - [error] emulator beam/beam_load.c(1863): Error loading module 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':

lager_event - [error] <0.18556.644> Channel error on connection <0.5251.632> (IPaddress:58938 -> Ipaddress:5672, vhost: 'xxx-x', user: 'xxx'), channel 2:

lager_event - To fix this, please recompile this module with an 20 compiler.

Can you help us resolve these errors?

Queue level duplication: support for quorum and mirror queues

Hello!
Thank you for this plugin, it is exactly what we need and it works great!

But I've found an issue or maybe I'm doing something wrong.

There is a case where tasks with the same header end up in the queue, and it looks like it has something to do with requeued messages.

Steps to reproduce:

  1. Add a message to the queue
  2. Go to the RabbitMQ web UI and get the messages

[screenshot]

  1. Add the same message
  2. Go to the RabbitMQ web UI and get the messages

[screenshot]

The x-deduplication-header is just an MD5 hash of the message body.

So am I doing something wrong, or is this an issue?

Thanks in advance!
Maciej
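
A minimal pika sketch of the reproduction steps above, assuming the plugin is enabled; the queue name is a placeholder and the deduplication header is an MD5 hash of the body, as described:

import hashlib
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', arguments={'x-message-deduplication': True})

body = b'task payload'
headers = {'x-deduplication-header': hashlib.md5(body).hexdigest()}

# 1. Add the message to the queue.
channel.basic_publish(exchange='', routing_key='tasks', body=body,
                      properties=pika.BasicProperties(headers=headers))
time.sleep(0.1)  # give the broker a moment to enqueue

# 2. Get the message and requeue it, mimicking the web UI's default "Get messages".
method, _, _ = channel.basic_get(queue='tasks', auto_ack=False)
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

# 3. Add the same message again and check how many copies are queued.
channel.basic_publish(exchange='', routing_key='tasks', body=body,
                      properties=pika.BasicProperties(headers=headers))
time.sleep(0.1)
print(channel.queue_declare(queue='tasks', passive=True).method.message_count)
connection.close()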

Queue (**without deduplication**) crashing when attempting to use dead letter exchange

This is with 0.3.4-erl-20.

I have an exchange which uses deduplication, but I also have a few exchanges without it.

I set up a policy for a queue which is bound to a normal exchange, such as:

$ sudo rabbitmqctl set_policy pol "results" '{"dead-letter-exchange":"dead"}' --priority 1 --apply-to queues

This queue is not related to a deduplication exchange and has no deduplication attributes of its own. However, when nacking a message from this queue, it crashes with this error:

 [error] <0.2245.0> Restarting crashed queue 'results' in vhost '/'.
[error] <0.2033.0> CRASH REPORT Process <0.2033.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':fetchwhile(#Fun<rabbit_amqqueue_process.36.129944743>, #Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.2033.0>,[],[],[],...},...}) line 225 in gen_server2:terminate/3 line 1166

Any idea what is going on? I've even deleted all deduplication exchanges/queues, and the same crash happens every time a message should be routed to the dead-letter exchange, whether that is set through a policy or when declaring the queue (e.g. using the x-dead-letter-exchange argument).

Concerns with performance and message confirmations.

Regarding the performance issue:
rabbitmq-message-deduplication uses the x-deduplication-header to check whether an item already exists in the queue.
Does rabbitmq-message-deduplication perform a full search of the queue to find that existing item? I am expecting a big performance hit when we have millions of items in the queue. Do we have a benchmark for this? If there is no benchmark yet, I'll run one and share the results here.
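
For what it's worth, a rough micro-benchmark sketch along those lines in Python/pika (placeholder names, local broker assumed): publish N messages with unique x-deduplication-header values into a deduplicated queue and into a plain queue, and compare the elapsed times.

import time

import pika

N = 10000

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='bench-dedup', arguments={'x-message-deduplication': True})
channel.queue_declare(queue='bench-plain')

def publish(queue, deduplicate):
    # Publish N small messages, with a unique deduplication header when requested.
    start = time.monotonic()
    for i in range(N):
        headers = {'x-deduplication-header': str(i)} if deduplicate else {}
        channel.basic_publish(exchange='', routing_key=queue, body=b'x',
                              properties=pika.BasicProperties(headers=headers))
    return time.monotonic() - start

print('deduplicated queue: %.2fs' % publish('bench-dedup', True))
print('plain queue:        %.2fs' % publish('bench-plain', False))
connection.close()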

Regarding message confirmations:
RabbitMQ uses publisher confirms when producing an item to a queue, to make sure the message is successfully delivered. What happens when the connection fails, the message is already in the queue, but the producer did not receive the confirmation because of the connection failure? Will the broker still accept the re-published item, or will it just be disregarded? And what happens to the unconfirmed message in the queue?

docker-compose - install plugin

Hi. Thanks for the great plugin.

Can somebody point out what I am missing here? This is my docker-compose file:

services:
  rabbitmq:
    image: rabbitmq:3.7
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=rabbit
      - RABBITMQ_DEFAULT_PASS=rabbit
      - RABBITMQ_ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
      - PLUGINS_DIR=/usr/lib/rabbitmq/plugins:/opt/rabbitmq/plugins/
    volumes:
    - ./enabled_plugins:/etc/rabbitmq/enabled_plugins
    - ./plugins:/usr/lib/rabbitmq/plugins

The plugins are in the ./plugins directory, and enabled_plugins contains:

[rabbitmq_management, rabbitmq_message_deduplication].
The message deduplication plugin is just not installed after running this service.
The plugins are accessible inside the container and the enabled_plugins file is on the right path.

Misleading reason for mandatory messages

Hi,
first of all, let me thank you for your great work with this plugin. It really helps a lot in our daily work.
We are sending mandatory messages with a return callback, so in case a message is not routed, it does not "disappear". However, when a message is not routed because of deduplication, I cannot see anything in the callback that lets me discriminate between a message that could not be routed because of a wrong routing key and a message that was not routed because of deduplication. It is always 312, NO_ROUTE. In this case it would really help to have at least a hint in the callback as to why the message was not delivered.

Thanks a lot in advance.
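
For context, a minimal pika sketch of the pattern described (publisher confirms plus a mandatory publish), assuming a deduplication exchange named dedup-exchange already exists; with pika's BlockingConnection and confirms enabled, a returned mandatory message is raised as an UnroutableError, and its reply code/text are what the return callback sees, i.e. 312 NO_ROUTE in both cases described above:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.confirm_delivery()  # needed so returned messages raise an exception here

try:
    channel.basic_publish(exchange='dedup-exchange',       # placeholder name
                          routing_key='some.key',
                          body=b'payload',
                          properties=pika.BasicProperties(
                              headers={'x-deduplication-header': 'id-1'}),
                          mandatory=True)
except pika.exceptions.UnroutableError as err:
    for returned in err.messages:
        # A deduplicated message and a genuinely unroutable one currently
        # produce the same reply here, per the report above.
        print(returned.method.reply_code, returned.method.reply_text)
connection.close()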

Dependency on elixir missing?

Hi,

I tried creating a Docker container with the following Dockerfile:

FROM rabbitmq:3.8-alpine

RUN set -x \
    && wget https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.4.3/rabbitmq_message_deduplication-v3.8.x-0.4.3.ez -P $RABBITMQ_HOME/plugins/ \
    && rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_message_deduplication

EXPOSE 15671 15672

When starting a container, I get the following error:

BOOT FAILED
===========

Error description:
    rabbit:start_it/1 line 477
    rabbit:boot_error/2 line 1069
    rabbit_lager:log_locations/0 line 105
    rabbit_lager:ensure_lager_configured/0 line 206
    rabbit_lager:lager_configured/0 line 214
    lager:list_all_sinks/0 line 345
    lager_config:get/2 line 71
    ets:lookup(lager_config, {'_global',handlers})
throw:{error,{missing_dependencies,[elixir],[rabbitmq_message_deduplication]}}
Log file(s) (may contain more information):
   <stdout>

{"init terminating in do_boot",{error,{missing_dependencies,[elixir],[rabbitmq_message_deduplication]}}}
init terminating in do_boot ({error,{missing_dependencies,[elixir],[rabbitmq_message_deduplication]}})

Crash dump is being written to: /var/log/rabbitmq/erl_crash.dump...done

Do I need to install this elixir dependency first? How? I found this issue, but it already seems to be resolved.

Thank you!

Support for Older Version? v3.6

Is there any way to make this plugin work with v3.6? If not, is there any deduplication plugin out there you can recommend that supports v3.6?

Thanks for your work

Can't build on Ubuntu 18.04 LTS

When I build I get this error:

make[1]:Leaving directory '/home/dunglt/Downloads/rabbitmq-message-deduplication-0.4.2/deps/rabbit'
mix make_all
Compiling 5 files (.ex)

== Compilation error in file lib/common.ex ==
** (ArgumentError) no record amqqueue found at /home/dunglt/Downloads/rabbitmq-message-deduplication-0.4.2/_build/dev/lib/rabbit_common/include/rabbit.hrl. Or the record does not exist or its entry is malformed or depends on other include files
(elixir) lib/record/extractor.ex:56: Record.Extractor.extract_record/2
lib/common.ex:25: (module)
(stdlib) erl_eval.erl:680: :erl_eval.do_apply/6
Makefile:13: recipe for target 'app' failed
make: *** [app] Error 1

Erlang version:
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:6:6] [ds:6:6:10] [async-threads:1]
Eshell V10.5 (abort with ^G)

Can you please describe how you do the development of a RabbitMQ plugin?

Hi Noxdafox,
Can you please describe how you do the development of a RabbitMQ plugin?

I have tried quite a bit, but I keep failing. In particular, the umbrella project keeps telling me that it doesn't have the lager, ranch, ranch_proxy_protocol, etc. projects inside the deps/ folder,

even though I have followed the official RabbitMQ plugin development guide; "make co" didn't work well.

https://www.rabbitmq.com/blog/2013/06/03/using-elixir-to-write-rabbitmq-plugins/ <--- this blog post is too old, I think, for developing a plugin using Elixir. Can you give some hints on how to develop a plugin using Elixir? I spent some time learning the basics of Elixir, but now I'm wondering whether I should just give up and go for Erlang?

Please Help :)

Allow re-scheduling of processing (unacked) items.

Hi! First of all, I want to say thank you for such a great plugin :)

My question is related to duplicated message processing. Currently, the plugin does not allow adding a duplicated message to the queue at all. I want to be able to add a duplicated message to the queue when a message with the same x-deduplication-header already exists, but only if a consumer is already processing that message.

So, assume we have messages with x-deduplication-header = A, B, C, D, and messages C and D are already being processed by some consumers.

Let's try to add a new message with x-deduplication-header = C.

Current behavior: the message will be ignored.

Desired behavior: the message will be queued (even additional message with same x-deduplication-header is already in the queue).

Is this possible?

Thanks in advance!

Messages rejected with requeue=false are never removed from the queue cache

Hi,

I was experimenting with version 0.4.3 on RabbitMQ 3.8.2 and I noticed the following.

When I reject a message with requeue set to false (which forwards the message to the dead-letter exchange if such an exchange is configured), it is impossible to publish a message with the same x-deduplication-header header again at a later moment. It seems that the message is never removed from the deduplication cache.

This can be easily demonstrated in the management interface: get the message with the reject requeue false option in the drop-down box (last option). Then, publish a message with the same x-deduplication-header header. You will now see that the message will not enter the queue.

Is this a bug? Could this lead to a memory leak as well?

For your information, my use case is as follows: I requeue a message when, while consuming the queue bound to the dead-letter exchange, I detect that the message was rejected due to a transient exception.

Thank you.
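
A minimal pika sketch of the scenario described, assuming the plugin is enabled and using placeholder names: the queue has x-message-deduplication set and a dead-letter exchange configured, the message is rejected with requeue=False, and a second publish with the same x-deduplication-header is then attempted:

import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='dlx', exchange_type='fanout')
channel.queue_declare(queue='work', arguments={'x-message-deduplication': True,
                                               'x-dead-letter-exchange': 'dlx'})

props = pika.BasicProperties(headers={'x-deduplication-header': 'id-1'})
channel.basic_publish(exchange='', routing_key='work', body=b'payload', properties=props)
time.sleep(0.1)  # give the broker a moment to enqueue

# Reject without requeue, which dead-letters the message.
method, _, _ = channel.basic_get(queue='work', auto_ack=False)
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

# Per the report above, this second publish never reaches the queue because
# the header apparently remains in the deduplication cache.
channel.basic_publish(exchange='', routing_key='work', body=b'payload', properties=props)
time.sleep(0.1)
print(channel.queue_declare(queue='work', passive=True).method.message_count)
connection.close()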

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=503, text='COMMAND_INVALID - unknown exchange type 'x-message-deduplication

I get the error below when I try to declare an exchange with deduplication:

  • $exception {"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=503, text='COMMAND_INVALID - unknown exchange type 'x-message-deduplication'', classId=40, methodId=10"} RabbitMQ.Client.Exceptions.OperationInterruptedException

Dictionary<string, object> dd = new Dictionary<string, object>();
dd["x-cache-size"] = 1000;
dd["x-cache-ttl"] = 100000;
dd["x-cache-persistence"] = "memory";

var exchangeType = ExchangeType.Topic;
channel.ExchangeDeclare(ExchangeName, "x-message-deduplication", false, true, dd);

channel.QueueDeclare(queue: QName,
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(QName, ExchangeName, QName, null);

Support Request: Queue level deduplication

Could you please help me with queue-level deduplication? In my case, it does not work as I expect it to.

My setup

I am using RabbitMQ 3.7.8 with Erlang 20.3.4 and message deduplication plugin 0.3.5

I have created an exchange, queue and binding:

$RABBITMQADMIN declare queue \
    name=myqueue \
    durable=true \
    arguments="{\"x-message-deduplication\":true}"

$RABBITMQADMIN declare exchange \
    name=myexchange \
    type=topic
	
$RABBITMQADMIN declare binding \
    source=myexchange \
    destination=myqueue \
    destination_type=queue \
    routing_key=#

This gives me a queue with active deduplication.

Things that work as expected

When I go to the RabbitMQ web interface, open up myexchange and publish a message with x-deduplication-header=test, I see in the exchange that the message has been accepted and routed to the queue; additionally, in the queue, I see that the message arrived ("Message rates" graph) and the queue contains a message.

When I publish another message with x-deduplication-header=test in myexchange, I again see in the exchange that the message was accepted and routed to the queue. In the queue, I see that the message arrived ("Message rates" graph), but as the message contained the same deduplication key, there is still only one message in the queue.

When I publish a message with x-deduplication-header=another-test in myexchange, I see in the exchange that the message was accepted and routed to the queue. In the queue, I see that the message arrived ("Message rates" graph) and was added to the queue.

From the web interface, I can consume both events and see the test and another-test deduplication key headers.

Things that don't work as expected

In this state, when I now again publish a message with x-deduplication-header=test in myexchange, I again see in the exchange that the message was accepted and routed to the queue. In the queue, I see that the message arrived ("Message rates" graph), but the message is not enqueued.

The README says:

Each message containing the x-deduplication-header header will not be enqueued if another message with the same header is already present within the queue.

From this, I would expect that the message is now enqueued, as there are no messages in the queue. Instead, it seems to me as if the behaviour of exchange-level deduplication has been applied and the deduplication algorithm checks whether there has ever been a message with this deduplication key.

Additionally, if I restart RabbitMQ and publish a message with x-deduplication-header=test, the message is enqueued.

Unable to initiate plugin config via AMQP node.js client

First I would like to thank you for the great plugin; it will solve an important issue with duplicated messages on our RabbitMQ server.

I was able to configure an exchange via the RabbitMQ CLI and everything seems fine there. But for our containerized server, we're using the obsolete node-amqp client, which probably somehow alters the following initialization of the exchange config:

mqConnection.exchange("eventX", {
    "type": "x-message-deduplication",
    "arguments": {
        "x-cache-size": 100
    }
}, function (ex) {
    var eventExchange = ex;
    mqConnection.queue("event-queue", function (q) {
        q.bind(eventExchange, "event.*");
        q.subscribe(createEvent);
    });
});

I get the following error in RabbitMQ, which makes me think that the client stringifies the config:

operation exchange.declare caused a channel exception precondition_failed: Missing or invalid argument, 'x-cache-size' must be an integer greater than 0\n"

The first option is to switch to another client, which would be very difficult because of the many microservices that are using it.
The second option, which I chose, is to recompile your plugin so that it parses the "x-cache-size" field to an integer. The compilation of version 0.2.0 passed fine, but then I get the following error when I start RabbitMQ:

Error:
{:plugins_not_found, [:rabbitmq_message_deduplication_exchange]}

PS: I get the plugin activation error even with your latest build of version 0.2.0.

Kind Regards
Stoyan

Cheers!

Discussion/feature request - configurable queue deduplication semantics

Hi,

I would like to discuss something regarding queue deduplication semantics, in the hope of generating some interest in extending this plugin.

One could give two definitions of queue deduplication:

  1. do not enqueue a message if an equivalent message is already present in the queue
  2. do not enqueue a message if an equivalent message is already present and not being processed in the queue

From my understanding in #51, this plugin currently implements the first definition, being the most intuitive and safe/correct definition.

For the project I am working on, it would be useful if we could support the second definition as well. (Please let me know if you want me to elaborate on this. In short, I am using RabbitMQ to notify workers that tasks have been added to the database, available to be executed. Here, it is only safe to prevent duplicates under the second definition, as we cannot know for sure that an ongoing consumer will see all additional tasks being added during its execution loop.)

Is this something that could be implemented in this plugin? The definition to apply could be selected via a queue argument provided at declaration time.
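To make that concrete, something like the following (the x-deduplication-scope argument is purely hypothetical and not implemented by the plugin; sketched with the RabbitMQ .NET client, assuming an open channel):

// Hypothetical argument, for discussion only:
//   "all"     -> definition 1: compare against every message currently in the queue
//   "pending" -> definition 2: compare only against messages not currently being processed
var arguments = new Dictionary<string, object>
{
    ["x-message-deduplication"] = true,
    ["x-deduplication-scope"] = "pending"   // hypothetical, not a real plugin argument
};
channel.QueueDeclare(queue: "tasks",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: arguments);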

Note: it's important to document that we can still end up with "real" duplicates in the queue. For example, assume we are preventing duplicates under the second definition, and a consumer with manual acknowledgement enabled is processing message X while we enqueue another message X. Under the second definition, this is allowed. If the consumer now crashes or rejects message X, it is returned to the queue. At this point, we have two X messages.

Thank you.

Question - are unacked messages considered duplicate candidates?

Hi,

I have a question regarding queue deduplication. Are messages that are currently being processed (i.e., in unacked status) considered duplicate candidates, or are they ignored?

I would assume that such unacked messages are taken into account, but I wanted to make sure.
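One way to check this empirically would be something along these lines (a sketch with the RabbitMQ .NET client; assumes an open channel and the usual using directives; the queue and header names are arbitrary):

// Declare a deduplication-enabled queue.
var args = new Dictionary<string, object> { ["x-message-deduplication"] = true };
channel.QueueDeclare(queue: "dedup-probe", durable: false, exclusive: false,
                     autoDelete: false, arguments: args);

var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object> { ["x-deduplication-header"] = "probe" };

// Publish once and fetch the message without acknowledging it, so it stays unacked...
channel.BasicPublish(exchange: "", routingKey: "dedup-probe",
                     basicProperties: props, body: Encoding.UTF8.GetBytes("first"));
var unacked = channel.BasicGet("dedup-probe", false);   // autoAck: false, deliberately left unacked

// ...then publish a second message with the same header and check the queue depth.
channel.BasicPublish(exchange: "", routingKey: "dedup-probe",
                     basicProperties: props, body: Encoding.UTF8.GetBytes("second"));
System.Threading.Thread.Sleep(500);   // give the broker a moment to route the message
Console.WriteLine($"messages ready: {channel.MessageCount("dedup-probe")}");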

Thank you.

Bind x-message-deduplication to amq.direct

Hello, thank you for the plugin, but I ran into the following problem.
I bind an x-message-deduplication exchange to the default amq.direct exchange:

[screenshot]

and when I try to publish a message to the deduplication exchange I get the following error:

[screenshot]

Could you clarify whether the rabbitmq-message-deduplication plugin supports exchange-to-exchange bindings? I also tried setting the routing key to # and * in the binding.

Can't add the same message after processing it

When I process a message from the queue, I can't add a new one (the same message). Does deleting a message from the queue not delete the corresponding entry from the cache?

RabbitMQ 3.8.1, Erlang 22.1.7, php-amqplib v2.11.0, rabbitmq-message-deduplication release 0.4.3

BTW: great job!

Docs questions

Thanks for this!

Can you please clarify what unit and text format "x-cache-size" expects?
Also, regarding the "x-message-deduplication" type: what kind of exchange is it actually? Is it a fanout exchange? A topic exchange? Something else? I need to understand the expected topology.

I would open a PR with the needed docs once I understand them

Plugin causing Queue to crash after first publish

I set deduplication to run at the exchange level and pass the x-deduplication-header in my message headers.

Exchange
[screenshot]

Queue
[screenshot]

After the first publish, the consumers on both the source and destination queues drop:
[screenshot]

and I get this error log in my RabbitMQ container:

2018-07-06 10:41:22.424 [error] <0.7275.0> Restarting crashed queue 'elastic2' in vhost '/'.
2018-07-06 10:41:22.424 [error] <0.7174.0> ** Generic server <0.7174.0> terminating
** Last message in was {'$gen_cast',{ack,[0],<0.7195.0>}}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"elastic2">>},true,false,none,[],<0.7174.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"user">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"elastic2">>},true,false,none,[],<0.7174.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"user">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<154,135,39,254,185,253,12,224,1,234,81,133,225,49,39,201>>,{basic_message,{resource,<<"/">>,exchange,<<"elastic.dd">>},[<<"com.fullfacing.tcs-payments.replaceIndex">>],{content,60,{'P_basic',<<"application/json">>,<<"snappy">>,[{<<"user.ip">>,void,undefined},{<<"user.agent">>,void,undefined},{<<"dst">>,longstr,<<"tcs-payments">>},{<<"src">>,longstr,<<"tcs-payments">>},{<<"user.token">>,void,undefined},{<<"event">>,longstr,<<"request">>},{<<"status">>,longstr,<<"pending">>}],undefined,undefined,<<"5a6177cf-9849-4821-aa5f-9451ad220a0d">>,<<"tcs-payments-f06b3170-88e0-46ba-9234-8a66c0e4c688">>,undefined,<<"1fc99452-57df-419e-ace5-f637cd2e7b9b">>,0,undefined,undefined,undefined,undefined},<<230,192,16,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110,6,115,110,97,112,112,121,0,0,0,112,7,117,115,101,114,46,105,112,86,10,117,115,101,114,46,97,103,101,110,116,86,3,100,115,116,83,0,0,0,12,116,99,115,45,112,97,121,109,101,110,116,115,3,115,114,99,83,0,0,0,12,116,99,115,45,112,97,121,109,101,110,116,115,10,117,115,101,114,46,116,111,107,101,110,86,5,101,118,101,110,116,83,0,0,0,7,114,101,113,117,101,115,116,6,115,116,97,116,117,115,83,0,0,0,7,112,101,110,100,105,110,103,36,53,97,54,49,55,55,99,102,45,57,56,52,57,45,52,56,50,49,45,97,97,53,102,45,57,52,53,49,97,100,50,50,48,97,48,100,49,116,99,115,45,112,97,121,109,101,110,116,115,45,102,...>>,...},...},...},...}},...}}},...}
** Reason for termination == 
** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue','-ack/2-fun-0-',[0],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,244}]},{'Elixir.Enum','-map/2-lists^map/1-0-',2,[{file,"lib/enum.ex"},{line,1294}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',ack,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,244}]},{rabbit_priority_queue,ack,2,[{file,"src/rabbit_priority_queue.erl"},{line,319}]},{rabbit_amqqueue_process,'-ack/3-fun-0-',2,[{file,"src/rabbit_amqqueue_process.erl"},{line,775}]},{rabbit_amqqueue_process,handle_cast,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,1346}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]}
2018-07-06T10:41:22.432486781Z 2018-07-06 10:41:22.425 [error] <0.7174.0> CRASH REPORT Process <0.7174.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':'-ack/2-fun-0-'(0) line 244 in gen_server2:terminate/3 line 1166
2018-07-06 10:41:22.425 [error] <0.7173.0> Supervisor {<0.7173.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"elastic2">>},true,false,none,[],none,[],[],[],undefined,undefined,...}, declare, <0.7172.0>) at <0.7174.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':'-ack/2-fun-0-'(0) line 244 in context child_terminated
2018-07-06 10:43:10.183 [error] <0.7332.0> Restarting crashed queue 'tcs-payments-f06b3170-88e0-46ba-9234-8a66c0e4c688' in vhost '/'.
2018-07-06 10:43:10.183 [error] <0.7217.0> ** Generic server <0.7217.0> terminating

RabbitMQ Version: 3.7.6
rabbitmq-message-deduplication Version: 0.3.0

Any help would be appreciated!

[Feature request] Add extra ttl argument for counter reset after last deduplicated message

I would like to propose a new feature for the TTL of the message deduplication cache. The use case is the following:
If we have a flow of duplicate messages for an arbitrary period and we define a constant TTL of 10 seconds, on the 11th second we will receive a duplicate message again. I therefore propose an argument in the exchange configuration, or in the message headers, to reset the TTL counter. Every duplicate message carrying such a header would then reset the counter, so we can cover a longer period of duplicate messages.
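For example, something like this at exchange declaration time (x-cache-ttl-reset is a made-up name, just to illustrate the idea; sketched with the RabbitMQ .NET client, assuming an open channel):

var arguments = new Dictionary<string, object>
{
    ["x-cache-size"] = 10000,
    ["x-cache-ttl"] = 10000,        // 10 seconds
    ["x-cache-ttl-reset"] = true    // hypothetical: every duplicate restarts the 10-second window
};
// exchange name, type, durable, autoDelete, arguments
channel.ExchangeDeclare("events.dedup", "x-message-deduplication", true, false, arguments);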

I hope this request makes sense to you.

Best Regards,
Stoyan

Crash when trying to delete queue with `x-message-deduplication`

Using RabbitMQ 3.7.7 with Erlang 20.3.8.3 (although the same happens with Erlang 21), running rabbitmq-message-deduplication 0.3.2.

Tried to create and delete normal queues first, no errors.

Log of the exchange:

2018-08-10 07:56:41.283 [info] <0.5.0> Server startup complete; 4 plugins started.
 * rabbitmq_management
 * rabbitmq_management_agent
 * rabbitmq_message_deduplication
 * rabbitmq_web_dispatch
2018-08-10 07:57:47.801 [info] <0.726.0> accepting AMQP connection <0.726.0> (172.23.0.1:54410 -> 172.23.0.2:5672)
2018-08-10 07:57:47.808 [info] <0.726.0> connection <0.726.0> (172.23.0.1:54410 -> 172.23.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
2018-08-10 07:57:47.852 [info] <0.726.0> closing AMQP connection <0.726.0>(172.23.0.1:54410 -> 172.23.0.2:5672, vhost: '/', user: 'guest')
2018-08-10 07:57:59.905 [error] <0.807.0> Restarting crashed queue 'test' in vhost '/'.
2018-08-10 07:57:59.905 [error] <0.738.0> ** Generic server <0.738.0> terminating
** Last message in was {delete,false,false,<<"guest">>}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.738.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.738.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{1,{[],[{msg_status,0,<<227,251,167,177,83,83,174,59,68,201,74,85,204,199,104,128>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"test">>],{content,60,{'P_basic',undefined,undefined,[{<<"x-deduplication-header">>,longstr,<<"x">>}],undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},<<32,0,0,0,0,29,22,120,45,100,101,100,117,112,108,105,99,97,116,105,111,110,45,104,101,97,100,101,114,83,0,0,0,1,120>>,rabbit_framing_amqp_0_9_1,[<<"a">>]},<<227,251,167,177,83,83,174,59,68,201,74,85,204,199,104,128>>,false},false,false,false,false,queue_index,{message_properties,undefined,false,1}}]}},1,{0,nil},{0,nil},...}}},...}
** Reason for termination ==
** {noproc,{'Elixir.GenServer',call,[cache_queue__test,{info,cache_queue__test},5000]}}
** In 'terminate' callback with reason ==
** normal
2018-08-10 07:57:59.905 [error] <0.738.0> CRASH REPORT Process <0.738.0> with 0 neighbours exited with reason: no such process or port in call to 'Elixir.GenServer':call(cache_queue__test, {info,cache_queue__test}, 5000) in gen_server2:handle_msg/2 line 1045
2018-08-10 07:57:59.906 [error] <0.737.0> Supervisor {<0.737.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"test">>},true,false,none,[{<<"x-message-deduplication">>,bool,...}],...}, declare,<0.736.0>) at <0.738.0> exit with reason no such process or port in call to 'Elixir.GenServer':call(cache_queue__test, {info,cache_queue__test}, 5000) in context child_terminated

No code was used, this was all done via the management interface.

Crash when creating a priority queue with `x-message-deduplication`

Using RabbitMQ 3.7.7 with Erlang 20.3.8.3 (although the same happens with Erlang 21), running rabbitmq-message-deduplication 0.3.2.

Personally, I don't need support for priority queues; I'm just reporting this because I noticed the issue in a testing setup. 😃

Code used:

const amqp = require('amqplib');

const test = async () => {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createConfirmChannel();
  await channel.assertQueue('foo', {
    arguments: {
      'x-message-deduplication': true,
    },
    maxPriority: 10,
  });
  await channel.close();
  await connection.close();
  process.exit(0);
};

test();

RabbitMQ log of the exchange:

2018-08-10 08:08:33.199 [info] <0.5.0> Server startup complete; 4 plugins started.
 * rabbitmq_management
 * rabbitmq_management_agent
 * rabbitmq_message_deduplication
 * rabbitmq_web_dispatch
2018-08-10 08:08:40.543 [info] <0.639.0> accepting AMQP connection <0.639.0> (172.23.0.1:54464 -> 172.23.0.2:5672)
2018-08-10 08:08:40.551 [info] <0.639.0> connection <0.639.0> (172.23.0.1:54464 -> 172.23.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
2018-08-10 08:08:40.608 [error] <0.651.0> ** Generic server <0.651.0> terminating
** Last message in was {init,new}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.651.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738767161,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.608 [error] <0.730.0> Restarting crashed queue 'foo' in vhost '/'.
2018-08-10 08:08:40.609 [error] <0.651.0> CRASH REPORT Process <0.651.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.610 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.651.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.618 [info] <0.639.0> closing AMQP connection <0.639.0>(172.23.0.1:54464 -> 172.23.0.2:5672, vhost: '/', user: 'guest')
2018-08-10 08:08:40.627 [error] <0.730.0> ** Generic server <0.730.0> terminating
** Last message in was {'$gen_cast',init}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.730.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738718527,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.627 [error] <0.730.0> CRASH REPORT Process <0.730.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.627 [error] <0.735.0> Restarting crashed queue 'foo' in vhost '/'.
2018-08-10 08:08:40.628 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.730.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.637 [error] <0.735.0> ** Generic server <0.735.0> terminating
** Last message in was {'$gen_cast',init}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.735.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738699762,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.638 [error] <0.737.0> Restarting crashed queue 'foo' in vhost '/'.
2018-08-10 08:08:40.637 [error] <0.735.0> CRASH REPORT Process <0.735.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.639 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.735.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.659 [error] <0.737.0> ** Generic server <0.737.0> terminating
** Last message in was {'$gen_cast',init}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.737.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738688950,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.659 [error] <0.737.0> CRASH REPORT Process <0.737.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.660 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.737.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.659 [error] <0.739.0> Restarting crashed queue 'foo' in vhost '/'.
2018-08-10 08:08:40.681 [error] <0.739.0> ** Generic server <0.739.0> terminating
** Last message in was {'$gen_cast',init}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.739.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738666639,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.681 [error] <0.739.0> CRASH REPORT Process <0.739.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.681 [error] <0.741.0> Restarting crashed queue 'foo' in vhost '/'.
2018-08-10 08:08:40.682 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.739.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.695 [error] <0.741.0> ** Generic server <0.741.0> terminating
** Last message in was {'$gen_cast',init}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"x-message-deduplication">>,bool,true}],<0.741.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,false,undefined,undefined,{state,{queue,[],[],0},{active,-576460738644906,1.0}},undefined,undefined,undefined,undefined,{state,fine,5000,undefined},{0,nil},undefined,undefined,undefined,{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},delegate},undefined,undefined,undefined,undefined,'drop-head',0,0,running}
** Reason for termination ==
** {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]}
2018-08-10 08:08:40.695 [error] <0.741.0> CRASH REPORT Process <0.741.0> with 0 neighbours exited with reason: {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in gen_server2:terminate/3 line 1166
2018-08-10 08:08:40.697 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.741.0> exit with reason {[{bytes,2400},{entries,0}],[{bytes,2400},{entries,0}]} in context child_terminated
2018-08-10 08:08:40.697 [error] <0.650.0> Supervisor {<0.650.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"foo">>},true,false,none,[{<<"x-max-priority">>,byte,10},{<<"...">>,...}],...}, declare, <0.649.0>) at <0.741.0> exit with reason reached_max_restart_intensity in context shutdown

Can't make it work in C# project

I'm a RabbitMQ newbie; for a new project I'd like to use the deduplication plugin, as it would perfectly fit my needs. I'm using an ASP.NET Core 3.0 worker process and the language is C#.

I've tried a very simple example: two publishers sending 10 messages numbered 1 to 10, and one consumer receiving and acknowledging them.

I'm having quite strange and unpredictable results:

If I run the three workers (two publishers and one consumer) inside the same process, it looks like the deduplication plugin works fine and only the 10 unique messages end up in the queue, but the consumer reads only the first two and acknowledges only one of them.

If I run the publishers and the consumer in two different processes, the consumer gets all 10 messages, but after the ack the messages remain in the queue, and if I run the consumer process again they get reprocessed.

my publisher code:

        int cnt = 1;
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            Dictionary<string, object> dd = new Dictionary<string, object>();
            dd["x-message-deduplication"] = true;
            channel.QueueDeclare(queue: qname,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: dd);

            while (!stoppingToken.IsCancellationRequested)
            {
                var message = GetMessage(cnt);
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                Dictionary<string, object> d = new Dictionary<string, object>();
                d["x-deduplication-header"] = cnt;
                properties.Headers = d;

                channel.BasicPublish(exchange: "",
                                     routingKey: qname,
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);

                logDB(cnt, "Sender"+Wname);
                cnt++;
                if (cnt > 10)
                    break;
                await Task.Delay(1000, stoppingToken);
            }

my consumer code:

while (!stoppingToken.IsCancellationRequested)
       {
           var factory = new ConnectionFactory() { HostName = "localhost" };
           using (var connection = factory.CreateConnection())
           using (var channel = connection.CreateModel())
           {
               Dictionary<string, object> dd = new Dictionary<string, object>();
               dd["x-message-deduplication"] = true;
               channel.QueueDeclare(queue: qname,
                                    durable: true,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: dd);

               _logger.LogInformation("{0} Waiting for messages.", Cname);

               var consumer = new EventingBasicConsumer(channel);
               consumer.Received += (model, ea) =>
               {
                   var body = ea.Body;
                   var message = Encoding.UTF8.GetString(body);
                   _logger.LogInformation("{0} Received {1}", Cname, message);


                   string[] parts = message.Split('-');
                   int cntmsg = int.Parse(parts[1]);

                   logDB(cntmsg, Cname);

                   Thread.Sleep((cntmsg % 5) * 1000);


                   _logger.LogInformation("{0} Received {1} done", Cname, message);

                   channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
               };
               channel.BasicConsume(queue: qname,
                                    autoAck: false,
                                    consumer: consumer);

               _logger.LogInformation("{0} After BasicConsume", Cname);

               while (true)
                   await Task.Delay(1000, stoppingToken);

           }

I have also tried to use deduplication at exchange level; I create the exchange this way:

              Dictionary<string, object> dd = new Dictionary<string, object>();
               dd["x-cache-size"] = 1000;
               dd["x-cache-ttl"] = 100000;
               dd["x-cache-persistence"] = "memory";
               channel.ExchangeDeclare("testex", "x-message-deduplication", false, true, dd);

               channel.QueueDeclare(queue: qname,
                                    durable: false,
                                    exclusive: false,
                                    autoDelete: false,
                                    arguments: null);
               channel.QueueBind(qname, "dedup", "tasks", null);

but on ExchangeDeclare I get this exception:

RabbitMQ.Client.Exceptions.OperationInterruptedException: 'The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - Missing or invalid argument, 'x-cache-size' must be an integer greater than 0", classId=40, methodId=10, cause='

Could you please give me some hints on what I'm doing wrong?
Roberto

Crash on Docker/Kubernetes

I understand this is way out of scope of this plugin, but I'd appreciate any pointers.

I have installed the plugin into an image which is used by a helm chart to deploy Rabbit to a Kubernetes cluster (ugh). The docker image is here:

https://github.com/bitnami/bitnami-docker-rabbitmq

The helm chart:

https://github.com/helm/charts/tree/master/stable/rabbitmq

Everything appears to be working except the plugin (I think due to assumptions made about the nodes/connections).

Here is the crash report I'm receiving:

2019-02-23 11:16:11.463 [error] <0.1741.0> CRASH REPORT Process <0.1741.0> with 0 neighbours exited with reason: no such process or port in call to 'Elixir.GenServer':call(cache_exchange__bulk_uptime_checks_exchange, {info,cache_exchange__bulk_uptime_checks_exchange}, 5000) in 'Elixir.GenServer':call/3 line 914 in 'Elixir.GenServer':call/3 line 914

2019-02-23 11:16:11.464 [error] <0.1740.0> Ranch listener rabbit_web_dispatch_sup_15672, connection process <0.1740.0>, stream 1 had its request process <0.1741.0> exit with reason {noproc,{'Elixir.GenServer',call,[cache_exchange__bulk_uptime_checks_exchange,{info,cache_exchange__bulk_uptime_checks_exchange},5000]}} and stacktrace [{'Elixir.GenServer',call,3,[{file,"lib/gen_server.ex"},{line,914}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Exchange',info,2,[{file,"lib/rabbit_message_deduplication_exchange.ex"},{line,184}]},{rabbit_exchange,info,1,[{file,"src/rabbit_exchange.erl"},{line,362}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{lists,map,2,[{file,"lists.erl"},{line,1239}]},{rabbit_mgmt_util,'-all_or_one_vhost/2-lc$^0/1-0-',2,[{file,"src/rabbit_mgmt_util.erl"},{line,918}]},{rabbit_mgmt_util,all_or_one_vhost,2,[{file,"src/rabbit_mgmt_util.erl"},{line,918}]},{rabbit_mgmt_wm_exchanges,resource_exists,2,[{file,"src/rabbit_mgmt_wm_exchanges.erl"},{line,43}]}]

I think the first error comes from a looping cache process, and the second occurs when I attempt to access the web UI for exchanges (where I already have a deduplicated exchange declared).

Any suggestions on where to start with this?

Does the plugin rely on a particular RMQ config/installation?

Problems when using the plugin with `amqp.node`

I set up a sample project with amqp.node, code is as follows:

const amqp = require('amqplib');

const test = async () => {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createConfirmChannel();

  await channel.assertQueue('test', {
    arguments: {
      'x-message-deduplication': true,
    },
  });

  await Promise.all(
    'abcdef'.split('').map(payload => {
      console.log('sending', payload);
      return new Promise(resolve =>
        channel.sendToQueue(
          'test',
          Buffer.from(payload),
          {
            headers: {
              'x-deduplication-header': 'x',
            },
          },
          resolve
        )
      );
    })
  );

  await channel.close();
  await connection.close();
  process.exit(0);
};

test();

I expected to see one message, but all six of them are in the queue. After some investigating, I noticed that the x-message-deduplication argument ended up being sent as a string:

[screenshot of the queue's "features" column]

If you believe this to be an issue with amqp.node I will report the issue there instead.

For reference, the RabbitMQ server is run in Docker, with the following Dockerfile:

FROM rabbitmq:3-management

# Install deduplication plugin:
# https://github.com/noxdafox/rabbitmq-message-deduplication
ADD https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.3.2/rabbitmq_message_deduplication-0.3.2.ez /plugins
ADD https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.3.2/elixir-1.6.6.ez /plugins
RUN chmod 644 /plugins/*
RUN rabbitmq-plugins enable rabbitmq_message_deduplication

Installing plugin

I am having trouble installing the plugin in my RabbitMQ instance. I am using Ubuntu 16.04 (VirtualBox x64). I updated my RabbitMQ to 3.7.5.
I placed both plugin files you made available in the directory /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins:

root@xyz-VirtualBox:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins# ls
amqp10_client-3.7.5.ez  lager-3.5.1.ez                        rabbitmq_consistent_hash_exchange-3.7.5.ez  rabbitmq_mqtt-3.7.5.ez                     rabbitmq_sharding-3.7.5.ez           rabbitmq_web_mqtt-3.7.5.ez
amqp10_common-3.7.5.ez  rabbit_common-3.7.5.ez                rabbitmq_event_exchange-3.7.5.ez            rabbitmq_peer_discovery_aws-3.7.5.ez       rabbitmq_shovel-3.7.5.ez             rabbitmq_web_mqtt_examples-3.7.5.ez
amqp_client-3.7.5.ez    rabbitmq_amqp1_0-3.7.5.ez             rabbitmq_federation-3.7.5.ez                rabbitmq_peer_discovery_common-3.7.5.ez    rabbitmq_shovel_management-3.7.5.ez  rabbitmq_web_stomp-3.7.5.ez
cowboy-2.2.2.ez         rabbitmq_auth_backend_cache-3.7.5.ez  rabbitmq_federation_management-3.7.5.ez     rabbitmq_peer_discovery_consul-3.7.5.ez    rabbitmq_stomp-3.7.5.ez              rabbitmq_web_stomp_examples-3.7.5.ez
cowlib-2.1.0.ez         rabbitmq_auth_backend_http-3.7.5.ez   rabbitmq_jms_topic_exchange-3.7.5.ez        rabbitmq_peer_discovery_etcd-3.7.5.ez      rabbitmq_top-3.7.5.ez                ranch-1.5.0.ez
elixir-1.6.5.ez         rabbitmq_auth_backend_ldap-3.7.5.ez   rabbitmq_management-3.7.5.ez                rabbitmq_peer_discovery_k8s-3.7.5.ez       rabbitmq_tracing-3.7.5.ez            ranch_proxy_protocol-1.5.0.ez
goldrush-0.1.9.ez       rabbitmq_auth_mechanism_ssl-3.7.5.ez  rabbitmq_management_agent-3.7.5.ez          rabbitmq_random_exchange-3.7.5.ez          rabbitmq_trust_store-3.7.5.ez        README
jsx-2.8.2.ez            rabbitmq_aws-3.7.5.ez                 rabbitmq_message_deduplication-3.7.5.ez     rabbitmq_recent_history_exchange-3.7.5.ez  rabbitmq_web_dispatch-3.7.5.ez       recon-2.3.2.ez
root@xyz-VirtualBox:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins# 

However, when I run 'enable', it can't seem to find it.

root@xyz-VirtualBox:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins# rabbitmq-plugins enable rabbitmq_message_deduplication
Error:
{:plugins_not_found, [:rabbitmq_message_deduplication]}
root@xyz-VirtualBox:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins# 

Any advice? Is it an issue with the RabbitMQ configuration?

Queue crash - no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue

I am fairly new to RabbitMQ. I created a Docker Compose definition for use with this plugin. My RabbitMQ clients are Java Spring based. Queues are crashing with this in the log:

2018-11-16 13:08:43.940 [error] <0.720.0> ** Generic server <0.720.0> terminating
** Last message in was {notify_down,<0.1131.0>}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],<0.720.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],<0.720.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},34,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/8REB3IHJIIQRX3A7MAYHUUV95",{#{},[]},#Ref<0.150649687.76546052.200081>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<158,199,174,229,250,253,161,174,235,239,58,246,83,51,208,171>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<140,113,5,230,157,99,158,190,37,191,116,227,118,140,164,72>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.440.0>,#Ref<0.150649687.76677122.202991>,#Ref<0.150649687.76677122.202988>,#Ref<0.150649687.76677122.202992>,...}},...}}},...}
** Reason for termination == 
** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],<0.720.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},34,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/8REB3IHJIIQRX3A7MAYHUUV95",{#{},[]},#Ref<0.150649687.76546052.200081>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<158,199,174,229,250,253,161,174,235,239,58,246,83,51,208,171>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<140,113,5,230,157,99,158,190,37,191,116,227,118,140,164,72>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.440.0>,#Ref<0.150649687.76677122.202991>,#Ref<0.150649687.76677122.202988>,#Ref<0.150649687.76677122.202992>,#Ref<0.150649687.76677122.202993>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,0,0,{rates,0.10925883227177771,0.10925883227177771,...},...}}],...},...]}
2018-11-16 13:08:43.940 [error] <0.720.0> CRASH REPORT Process <0.720.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],<0.720.0>,...},...}) line 279 in gen_server2:terminate/3 line 1166
2018-11-16 13:08:43.941 [error] <0.719.0> Supervisor {<0.719.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],none,[],[],[],undefined,...}, declare, <0.718.0>) at <0.720.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_DWH_MSG_QUEUE">>},true,false,none,[],<0.720.0>,...},...}) line 279 in context child_terminated
2018-11-16 13:08:44.511 [error] <0.3826.0> Restarting crashed queue 'AE_INDEX_MSG_QUEUE' in vhost '/'.
2018-11-16 13:08:44.512 [error] <0.724.0> ** Generic server <0.724.0> terminating
** Last message in was {notify_down,<0.1139.0>}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.724.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.724.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},17,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/5F3MVCTRCXIMKHY4JREHU2HYN",{#{},[]},#Ref<0.150649687.76546052.200073>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<207,120,181,254,114,182,252,111,178,72,18,61,99,246,119,242>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<49,241,41,236,241,15,199,190,32,177,162,113,67,71,213,131>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.440.0>,...}},...}}},...}
** Reason for termination == 
** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.724.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},17,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/5F3MVCTRCXIMKHY4JREHU2HYN",{#{},[]},#Ref<0.150649687.76546052.200073>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<207,120,181,254,114,182,252,111,178,72,18,61,99,246,119,242>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<49,241,41,236,241,15,199,190,32,177,162,113,67,71,213,131>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.440.0>,#Ref<0.150649687.76677122.202991>,#Ref<0.150649687.76677122.202988>,#Ref<0.150649687.76677122.202992>,#Ref<0.150649687.76677122.202993>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,0,0,{rates,0.012360780792940037,...},...}}],...},...]}
2018-11-16 13:08:44.512 [error] <0.724.0> CRASH REPORT Process <0.724.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-mess...">>,...}],...},...}) line 279 in gen_server2:terminate/3 line 1166
2018-11-16 13:08:44.512 [error] <0.723.0> Supervisor {<0.723.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-dedup...">>,...}],...}, declare, <0.722.0>) at <0.724.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-mess...">>,...}],...},...}) line 279 in context child_terminated
2018-11-16 13:08:44.894 [error] <0.735.0> ** Generic server <0.735.0> terminating
** Last message in was {notify_down,<0.1175.0>}
** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.735.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.735.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},60025,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/98H4U8R2WZXQ3PISSNX9RR679",{#{},[]},#Ref<0.150649687.76546049.200696>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<2,47,88,2,189,190,207,229,166,231,15,106,159,156,32,169>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<254,201,152,255,229,144,88,112,220,235,119,179,67,119,87,26>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_sto...",...}},...}}},...}
** Reason for termination == 
** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.735.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"rabbit">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},60025,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/98H4U8R2WZXQ3PISSNX9RR679",{#{},[]},#Ref<0.150649687.76546049.200696>,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>}},{{client_msstate,<0.442.0>,<<2,47,88,2,189,190,207,229,166,231,15,106,159,156,32,169>>,#{},{state,#Ref<0.150649687.76677122.203032>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.445.0>,#Ref<0.150649687.76677122.203033>,#Ref<0.150649687.76677122.203028>,#Ref<0.150649687.76677122.203034>,#Ref<0.150649687.76677122.203035>,{4000,800}},{client_msstate,<0.439.0>,<<254,201,152,255,229,144,88,112,220,235,119,179,67,119,87,26>>,#{},{state,#Ref<0.150649687.76677122.202990>,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@d5f44ed6c7ed/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.440.0>,#Ref<0.150649687.76677122.202991>,#Ref<0.150649687.76677122.202988>,#Ref<0.150649687.76677122.202992>,#Ref<0.150649687.76677122.202993>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,...}}],...},...]}
2018-11-16 13:08:44.895 [error] <0.735.0> CRASH REPORT Process <0.735.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"...">>,...}],...},...}) line 279 in gen_server2:terminate/3 line 1166
2018-11-16 13:08:44.895 [error] <0.3849.0> Restarting crashed queue 'AE_ATOMIC_INDEX_MSG_QUEUE' in vhost '/'.
2018-11-16 13:08:44.895 [error] <0.734.0> Supervisor {<0.734.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"x-messag...">>,...}],...}, declare, <0.733.0>) at <0.735.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"AE_ATOMIC_INDEX_MSG_QUEUE">>},true,false,none,[{<<"...">>,...}],...},...}) line 279 in context child_terminated

RabbitMQ crashes when publishing message using Queue level deduplication

RabbitMQ version: 3.7.7
rabbitmq-message-deduplication version: 0.3.4
kombu version: 4.2.1

Sample implementation using queue-level deduplication:

sample_exchange = Exchange("updated_user", "direct", durable=True)
sample_queue = Queue("updated_user", exchange=sample_exchange, 
                     routing_key="user.update",
                     queue_arguments={"x-message-deduplication": True})

producer.publish(message, declare=[sample_queue], retry=True,
                 headers={"x-deduplication-header": generate_uuid()})

Traceback we received when publishing a message using queue-level deduplication:

Traceback (most recent call last):
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 2309, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 2295, in wsgi_app
    response = self.handle_exception(e)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 1741, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/_compat.py", line 35, in reraise
    raise value
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 2292, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 1815, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 1718, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/_compat.py", line 35, in reraise
    raise value
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 1813, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/ryan/Workspace/playground/python-rabbitmq/app.py", line 13, in publish
    producer.publish(request.json)
  File "/home/ryan/Workspace/playground/python-rabbitmq/kumbo_app/producer.py", line 29, in publish
    headers={"x-deduplication-header": generate_uuid()})
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/channel.py", line 1149, in queue_declare
    spec.Queue.DeclareOk, returns_tuple=True,
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/abstract_channel.py", line 79, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/connection.py", line 497, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/connection.py", line 501, in on_inbound_method
    method_sig, payload, content,
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/abstract_channel.py", line 128, in dispatch_method
    listener(*args)
  File "/home/ryan/Workspace/playground/python-rabbitmq/venv/lib/python3.5/site-packages/amqp/connection.py", line 623, in _on_close
    (class_id, method_id), ConnectionError)
amqp.exceptions.InternalError: (0, 0): (541) INTERNAL_ERROR

Queue level de-duplication is not working.

Hi,
I have a producer and a consumer (both C# apps) running as two different processes. I have hardcoded the dedup header to "Test", and the consumer is not running while the publisher is publishing the messages. I publish 10 messages with the same header, and when I run the subscriber I see all of them. I'm not sure what I am doing wrong. Can you help me?

Here is my publisher:

private async Task Publish(string qname, CancellationToken stoppingToken)
{
    int cnt = 1;
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // Declare the queue with queue-level deduplication enabled.
        var queueArgs = new Dictionary<string, object>();
        queueArgs["x-message-deduplication"] = true;
        channel.QueueDeclare(queue: qname,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: queueArgs);

        while (true)
        {
            var message = $"Message {cnt}";
            var body = Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // Every message carries the same deduplication header value.
            var headers = new Dictionary<string, object>();
            headers["x-deduplication-header"] = "Test";
            properties.Headers = headers;

            channel.BasicPublish(exchange: "",
                                 routingKey: qname,
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);

            cnt++;
            if (cnt > 10)
                break;
            await Task.Delay(1000, stoppingToken);
        }
    }
}

Here is my subscriber:

private async Task Subscribe(string qname)
{
    while (true)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the queue with the same deduplication argument as the publisher.
            var queueArgs = new Dictionary<string, object>();
            queueArgs["x-message-deduplication"] = true;
            channel.QueueDeclare(queue: qname,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: queueArgs);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" Received {0}", message);
                // multiple: true acknowledges every delivery up to and including this one.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
            };
            channel.BasicConsume(queue: qname,
                                 autoAck: false,
                                 consumer: consumer);

            // Keep the consumer alive.
            while (true)
                await Task.Delay(1000);
        }
    }
}
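
One way to check whether deduplication is active on the broker side, independently of the C# apps above, is to publish a couple of messages with the same x-deduplication-header while no consumer is attached and then read the queue depth reported by a passive queue_declare. A minimal pika sketch follows; the queue name is hypothetical and the plugin is assumed to be enabled:

    import pika

    QUEUE = 'dedup-test'  # hypothetical queue name

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the queue with queue-level deduplication enabled.
    channel.queue_declare(queue=QUEUE, durable=True,
                          arguments={'x-message-deduplication': True})

    # Publish two messages carrying the same deduplication header.
    props = pika.BasicProperties(headers={'x-deduplication-header': 'Test'},
                                 delivery_mode=2)
    for i in range(2):
        channel.basic_publish(exchange='', routing_key=QUEUE,
                              body=f'Message {i}'.encode(), properties=props)

    # With no consumer attached, only the first message should remain
    # enqueued if deduplication is active.
    depth = channel.queue_declare(queue=QUEUE, passive=True).method.message_count
    print('messages in queue:', depth)

    connection.close()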

Reset cache on message acknowledgement

Is there any way to reset the cache for a given message on acknowledgement? This is my problem:

  1. User sends message ABC with a TTL of 10 minutes
  2. Worker A starts a process based on message ABC
  3. Ten minutes later a new message with the same deduplication header arrives, but the process started by worker A is still running. Worker B now picks up the new message and starts the same process, which results in errors.

If the cache entry could be reset on acknowledgement, we could set a larger TTL and clear the entry only once the message has been acknowledged.
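
For exchange-level deduplication, the per-message x-cache-ttl header already allows a single publish to override the exchange's default TTL, which may serve as a partial workaround (requesting a longer window for long-running jobs) until an acknowledgement-based reset exists. A rough pika sketch with a hypothetical exchange name:

    import pika

    EXCHANGE = 'dedup-exchange'  # hypothetical exchange name

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Exchange-level deduplication; x-cache-size is required,
    # x-cache-ttl sets the default window (here 10 minutes).
    channel.exchange_declare(exchange=EXCHANGE,
                             exchange_type='x-message-deduplication',
                             durable=True,
                             arguments={'x-cache-size': 1000,
                                        'x-cache-ttl': 600000})

    # Request a longer window (30 minutes) for this particular message.
    # (Bind a queue to the exchange for the message to actually be delivered.)
    props = pika.BasicProperties(headers={'x-deduplication-header': 'ABC',
                                          'x-cache-ttl': 1800000})
    channel.basic_publish(exchange=EXCHANGE, routing_key='',
                          body=b'start long-running job ABC',
                          properties=props)

    connection.close()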

Thank you for this awesome tool.
