Git Product home page Git Product logo

rabbitmq-lvc-exchange's Introduction

Last value caching exchange

This is a pretty simple implementation of a last value cache using RabbitMQ's pluggable exchange types feature.

The last value cache is intended to solve problems like the following: say I am using messaging to send notifications of some changing values to clients; now, when a new client connects, it won't know the value until it changes.

The last value exchange acts like a direct exchange (binding keys are compared for equality with routing keys); but, it also keeps track of the last value that was published with each routing key, and when a queue is bound, it automatically enqueues the last value for the binding key.

Supported RabbitMQ Versions

The most recent release of this plugin targets RabbitMQ 3.12.x.

Supported Erlang/OTP Versions

Latest version of this plugin requires Erlang 25.0 or later versions, same as RabbitMQ 3.12.x.

Installation

Binary builds of this plugin from the Community Plugins page.

See Plugin Installation for details about how to install plugins that do not ship with RabbitMQ.

Building from Source

You can build and install it like any other plugin (see the plugin development guide).

Usage

To use the LVC exchange, with e.g., py-amqp:

import amqplib.client_0_8 as amqp
ch = amqp.Connection().channel()
ch.exchange_declare("lvc", type="x-lvc")
ch.basic_publish(amqp.Message("value"),
                 exchange="lvc", routing_key="rabbit")
ch.queue_declare("q")
ch.queue_bind("q", "lvc", "rabbit")
print ch.basic_get("q").body

Limitations

"Recent value cache"

Message publishing in AMQP 0-9-1 is asynchronous by design and thus introduces natural race conditions when there's more than one publisher. It is quite possible to see different last-values but the same subsequent message stream, from different clients.

This won't matter if you simply want to have a value to show until you get an update. If it does matter, consider e.g. using sequence IDs so you can notice out-of-order messages.

There's also a race in the pluggable exchanges hook, so that clients can "see" the binding before the hook has been run; for the LVC, this means that there's a possiblity that messages will get queued before the last value. For this reason, I'm thinking of tagging the last value messages so that clients can fast-forward to it, or ignore it, if necessary.

Values v. deltas

One question that springs to mind when considering last value caches is "what if I'm sending deltas rather than the whole value?". The LVC exchange doesn't address this use case, but you could do it by using two exchanges and posting full values to the LVC (from the originating process -- presumably you'd be using deltas to save on downstream bandwidth).

Direct exchanges only

The semantics of another kind of value-caching exchange (other than fanout) aren't obvious. To choose one option though, say a newly-bound queue was to be given all values that match its binding key -- this would require every supported exchange type to supply a reverse routing match procedure.

Creating a Release

  1. Update broker_version_requirements in helpers.bzl & Makefile (Optional)
  2. Update the plugin version in MODULE.bazel
  3. Push a tag (i.e. v3.12.0) with the matching version
  4. Allow the Release workflow to run and create a draft release
  5. Review and publish the release

rabbitmq-lvc-exchange's People

Contributors

acogoluegnes avatar ansd avatar dcorbacho avatar dumbbell avatar gerhard avatar gomoripeti avatar gotthardp avatar hairyhum avatar hyperthunk avatar kjnilsson avatar lukebakken avatar michaelklishin avatar mmorrisontx avatar pjk25 avatar santif avatar spring-operator avatar squaremo avatar tonyg avatar videlalvaro avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabbitmq-lvc-exchange's Issues

Reset of cache

Since broker version 3.7.15 you introduced the feature of a persistance cache that still survives the restart of the broker.
For components not being the data master of the data distributed this leads to data inconsitencies in our product since we distribute data that is not valid anymore and until the new valid data is available from the data master.

Therefore I would like to reset the cache during restart to avoid that inconsistency. Is this functionality already available since I could not find anything about that till now?

x-lvc does not handle message expiration

I create message with expiration=100 and push it to the x-lvc. When a new queue is bound, the x-lvc sends the last published value with expiration=100, no matter how long the message was stored in the database. This means I could get an expired message.
I think the x-lvc shall keep a timestamp and delete expired messages. I'm going to propose a patch for this eventually.

Where is the Plugin for RabbitMQ 3.11?

The readme says that the latest release targets RabbitMQ version 3.11, but the actual latest release targets 3.10 and can not be enabled with version 3.11.

Unable to read lvc message with wildcard routing Keys

Thank you for writing this wonderful plugin. I'm able to successfully read last message from x-lvc type exchange. However currently plugin does not seem to support wildcard routing key.

For example my routing key is: marketdata.equity.IBM and I'm able to read last message with routing key as marketdata.equity.IBM. However it will be great to be able to fetch all messages with marketdata.equity.* routing key.

Payload return a memory position instead of the message

Im working with Spring Integration + RabbitMQ, not sure if this will be the problem, and when i get the value of the queue it returns to me a memory position as "[B@1f606a57" instead of the message i send

To check if the problem was sending values, y try both sending a message from my application and from the manager but the behaviour was the same.

Any idea?

[In]compatibility with recent RabbitMQ versions

I'm trying to use the x-lvc exchange type but binding a queue to the exchange throws an error.
I downloaded Version 3.6.9 from bintray and use it on rabbitmq server version 3.6 with management in a docker container.
As client lib i'm using C# RabbitMQ.Client Version 4.1.3.
Creating the exchange and publishing a message is working, but binding a queue to the exchange fails.

Publish code:

var factory = new ConnectionFactory() { HostName = "*******" };
var connection = factory.CreateConnection();
 _channel = connection.CreateModel();
_channel.BasicQos(0, 1, false);
_channel.ExchangeDeclare("lvcq", "x-lvc");
_channel.BasicPublish("lvcq", "test", null, BitConverter.GetBytes(42));

Receiver code:

string queueName = _channel.QueueDeclare("my_queue");
_channel.QueueBind(queue: "my_queue", exchange: "lvcq", routingKey: "test");

"_channel.QueueBind" fails with:

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=541, text="INTERNAL_ERROR", classId=0, methodId=0, cause=

Log from server:
=ERROR REPORT==== 8-Sep-2017::07:51:59 ===
** Generic server <0.11633.0> terminating
** Last message in was {'$gen_cast',
                           {method,
                               {'queue.bind',0,<<"my_queue">>,<<"lvcq">>,
                                   <<"test">>,false,[]},
                               none,noflow}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.11614.0>,
                            <0.11631.0>,<0.11614.0>,
                            <<"172.17.0.1:45828 -> 172.17.0.2:5672">>,
                            {lstate,<0.11632.0>,false},
                            none,1,
                            {[],[]},
                            {user,<<"guest">>,
                                [administrator],
                                [{rabbit_auth_backend_internal,none}]},
                            <<"/">>,<<"my_queue">>,
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 []},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                  [],[]}}},
                            {state,
                                {dict,0,16,16,8,80,48,
                                    {[],[],[],[],[],[],[],[],[],[],[],[],[],
                                     [],[],[]},
                                    {{[],[],[],[],[],[],[],[],[],[],[],[],[],
                                      [],[],[]}}},
                                erlang},
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 []},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                  [],[]}}},
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 []},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                  [],[]}}},
                            {set,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 []},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                  [],[]}}},
                            <0.11615.0>,
                            {state,fine,5000,#Ref<0.0.1.28395>},
                            false,1,
                            {{0,nil},{0,nil}},
                            [],
                            {{0,nil},{0,nil}},
                            [{<<"publisher_confirms">>,bool,true},
                             {<<"exchange_exchange_bindings">>,bool,true},
                             {<<"basic.nack">>,bool,true},
                             {<<"consumer_cancel_notify">>,bool,true},
                             {<<"connection.blocked">>,bool,true},
                             {<<"authentication_failure_close">>,bool,true}],
                            none,1,none,flow,[]}
** Reason for termination ==
** {{case_clause,{ok,{amqqueue,{resource,<<"/">>,queue,<<"my_queue">>},
                               false,true,<0.11614.0>,[],<0.11652.0>,[],[],[],
                               undefined,[],[],live}}},
    [{rabbit_exchange_type_lvc,add_binding,3,
                               [{file,"src/rabbit_exchange_type_lvc.erl"},
                                {line,57}]},
     {rabbit_binding,x_callback,4,
                     [{file,"src/rabbit_binding.erl"},{line,568}]},
     {rabbit_binding,'-add/3-fun-0-',3,
                     [{file,"src/rabbit_binding.erl"},{line,195}]},
     {rabbit_channel,binding_action,9,
                     [{file,"src/rabbit_channel.erl"},{line,1619}]},
     {rabbit_channel,handle_cast,2,
                     [{file,"src/rabbit_channel.erl"},{line,457}]},
     {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1032}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}

=CRASH REPORT==== 8-Sep-2017::07:51:59 ===
  crasher:
    initial call: rabbit_channel:init/1
    pid: <0.11633.0>
    registered_name: []
    exception exit: {{case_clause,
                         {ok,{amqqueue,
                                 {resource,<<"/">>,queue,<<"my_queue">>},
                                 false,true,<0.11614.0>,[],<0.11652.0>,[],[],
                                 [],undefined,[],[],live}}},
                     [{rabbit_exchange_type_lvc,add_binding,3,
                          [{file,"src/rabbit_exchange_type_lvc.erl"},
                           {line,57}]},
                      {rabbit_binding,x_callback,4,
                          [{file,"src/rabbit_binding.erl"},{line,568}]},
                      {rabbit_binding,'-add/3-fun-0-',3,
                          [{file,"src/rabbit_binding.erl"},{line,195}]},
                      {rabbit_channel,binding_action,9,
                          [{file,"src/rabbit_channel.erl"},{line,1619}]},
                      {rabbit_channel,handle_cast,2,
                          [{file,"src/rabbit_channel.erl"},{line,457}]},
                      {gen_server2,handle_msg,2,
                          [{file,"src/gen_server2.erl"},{line,1032}]},
                      {proc_lib,init_p_do_apply,3,
                          [{file,"proc_lib.erl"},{line,247}]}]}
      in function  gen_server2:terminate/3 (src/gen_server2.erl, line 1143)
    ancestors: [<0.11630.0>,<0.11618.0>,<0.11613.0>,<0.11612.0>,<0.375.0>,
                  <0.374.0>,<0.373.0>,rabbit_sup,<0.140.0>]
    messages: []
    links: [<0.11630.0>]
    dictionary: [{{xtype_to_module,'x-lvc'},rabbit_exchange_type_lvc},
                  {{credit_to,<0.11614.0>},49},
                  {{exchange_stats,{resource,<<"/">>,exchange,<<"lvcq">>}},
                   [{publish,1}]},
                  {process_name,
                      {rabbit_channel,
                          {<<"172.17.0.1:45828 -> 172.17.0.2:5672">>,1}}},
                  {permission_cache,
                      [{{resource,<<"/">>,exchange,<<"lvcq">>},read},
                       {{resource,<<"/">>,queue,<<"my_queue">>},write},
                       {{resource,<<"/">>,queue,<<"my_queue">>},configure},
                       {{resource,<<"/">>,exchange,<<"lvcq">>},write},
                       {{resource,<<"/">>,exchange,<<"lvcq">>},configure}]},
                  {guid,{{3170912396,4248614677,569459747,3844144684},0}},
                  {credit_flow_default_credit,{200,50}},
                  {rand_seed,
                      {#{max => 288230376151711743,
                         next => #Fun<rand.8.41921595>,
                         type => exsplus,
                         uniform => #Fun<rand.9.41921595>,
                         uniform_n => #Fun<rand.10.41921595>},
                       [12090994211920930|59977493088113200]}},
                  {channel_operation_timeout,15000},
                  {msg_size_for_gc,4}]
    trap_exit: true
    status: running
    heap_size: 2586
    stack_size: 27
    reductions: 3413
  neighbours:

=SUPERVISOR REPORT==== 8-Sep-2017::07:51:59 ===
     Supervisor: {<0.11630.0>,rabbit_channel_sup}
     Context:    child_terminated
     Reason:     {{case_clause,
                      {ok,{amqqueue,
                              {resource,<<"/">>,queue,<<"my_queue">>},
                              false,true,<0.11614.0>,[],<0.11652.0>,[],[],[],
                              undefined,[],[],live}}},
                  [{rabbit_exchange_type_lvc,add_binding,3,
                       [{file,"src/rabbit_exchange_type_lvc.erl"},{line,57}]},
                   {rabbit_binding,x_callback,4,
                       [{file,"src/rabbit_binding.erl"},{line,568}]},
                   {rabbit_binding,'-add/3-fun-0-',3,
                       [{file,"src/rabbit_binding.erl"},{line,195}]},
                   {rabbit_channel,binding_action,9,
                       [{file,"src/rabbit_channel.erl"},{line,1619}]},
                   {rabbit_channel,handle_cast,2,
                       [{file,"src/rabbit_channel.erl"},{line,457}]},
                   {gen_server2,handle_msg,2,
                       [{file,"src/gen_server2.erl"},{line,1032}]},
                   {proc_lib,init_p_do_apply,3,
                       [{file,"proc_lib.erl"},{line,247}]}]}
     Offender:   [{pid,<0.11633.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_channel,start_link,
                          [1,<0.11614.0>,<0.11631.0>,<0.11614.0>,
                           <<"172.17.0.1:45828 -> 172.17.0.2:5672">>,
                           rabbit_framing_amqp_0_9_1,
                           {user,<<"guest">>,
                               [administrator],
                               [{rabbit_auth_backend_internal,none}]},
                           <<"/">>,
                           [{<<"publisher_confirms">>,bool,true},
                            {<<"exchange_exchange_bindings">>,bool,true},
                            {<<"basic.nack">>,bool,true},
                            {<<"consumer_cancel_notify">>,bool,true},
                            {<<"connection.blocked">>,bool,true},
                            {<<"authentication_failure_close">>,bool,true}],
                           <0.11615.0>,<0.11632.0>]}},
                  {restart_type,intrinsic},
                  {shutdown,70000},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 8-Sep-2017::07:51:59 ===
     Supervisor: {<0.11630.0>,rabbit_channel_sup}
     Context:    shutdown
     Reason:     reached_max_restart_intensity
     Offender:   [{pid,<0.11633.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_channel,start_link,
                          [1,<0.11614.0>,<0.11631.0>,<0.11614.0>,
                           <<"172.17.0.1:45828 -> 172.17.0.2:5672">>,
                           rabbit_framing_amqp_0_9_1,
                           {user,<<"guest">>,
                               [administrator],
                               [{rabbit_auth_backend_internal,none}]},
                           <<"/">>,
                           [{<<"publisher_confirms">>,bool,true},
                            {<<"exchange_exchange_bindings">>,bool,true},
                            {<<"basic.nack">>,bool,true},
                            {<<"consumer_cancel_notify">>,bool,true},
                            {<<"connection.blocked">>,bool,true},
                            {<<"authentication_failure_close">>,bool,true}],
                           <0.11615.0>,<0.11632.0>]}},
                  {restart_type,intrinsic},
                  {shutdown,70000},
                  {child_type,worker}]


=ERROR REPORT==== 8-Sep-2017::07:51:59 ===
Error on AMQP connection <0.11614.0> (172.17.0.1:45828 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 1:
{{case_clause,{ok,{amqqueue,{resource,<<"/">>,queue,<<"my_queue">>},
                            false,true,<0.11614.0>,[],<0.11652.0>,[],[],[],
                            undefined,[],[],live}}},
 [{rabbit_exchange_type_lvc,add_binding,3,
                            [{file,"src/rabbit_exchange_type_lvc.erl"},
                             {line,57}]},
  {rabbit_binding,x_callback,4,[{file,"src/rabbit_binding.erl"},{line,568}]},
  {rabbit_binding,'-add/3-fun-0-',3,
                  [{file,"src/rabbit_binding.erl"},{line,195}]},
  {rabbit_channel,binding_action,9,
                  [{file,"src/rabbit_channel.erl"},{line,1619}]},
  {rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,457}]},
  {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1032}]},
  {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}

=WARNING REPORT==== 8-Sep-2017::07:51:59 ===
Non-AMQP exit reason '{{case_clause,
                           {ok,{amqqueue,
                                   {resource,<<"/">>,queue,<<"my_queue">>},
                                   false,true,<0.11614.0>,[],<0.11652.0>,[],
                                   [],[],undefined,[],[],live}}},
                       [{rabbit_exchange_type_lvc,add_binding,3,
                            [{file,"src/rabbit_exchange_type_lvc.erl"},
                             {line,57}]},
                        {rabbit_binding,x_callback,4,
                            [{file,"src/rabbit_binding.erl"},{line,568}]},
                        {rabbit_binding,'-add/3-fun-0-',3,
                            [{file,"src/rabbit_binding.erl"},{line,195}]},
                        {rabbit_channel,binding_action,9,
                            [{file,"src/rabbit_channel.erl"},{line,1619}]},
                        {rabbit_channel,handle_cast,2,
                            [{file,"src/rabbit_channel.erl"},{line,457}]},
                        {gen_server2,handle_msg,2,
                            [{file,"src/gen_server2.erl"},{line,1032}]},
                        {proc_lib,init_p_do_apply,3,
                            [{file,"proc_lib.erl"},{line,247}]}]}'

rabbitmq-lvc-plugin is not working with rabbitmq 3.6.6 version

Hi,

I am using rabbitmq 3.6.6 and later versions and trying to get last cache value using lvc plugin (rabbitmq_lvc-0.0.4.ez).

when my consumer tries to declare a new queue and consume lvc value i am getting the following error. The same consumer code is working with 3.6.5.

[root@EMSDT Rmq_Traffic]# python Consumer_lvc.py
Traceback (most recent call last):
File "Consumer_lvc.py", line 27, in
routing_key=bind_key
File "build/bdist.linux-x86_64/egg/pika/adapters/blocking_connection.py", line 2402, in queue_bind
File "build/bdist.linux-x86_64/egg/pika/adapters/blocking_connection.py", line 1174, in _flush_output
File "build/bdist.linux-x86_64/egg/pika/adapters/blocking_connection.py", line 420, in _flush_output
pika.exceptions.ConnectionClosed: (541, 'INTERNAL_ERROR')

Plugin does not start with {invalid_ez,einval} error

I am trying to use lvc plugin latest version 3.8.0 on a rabbitmq in docker.

FROM rabbitmq:3-management
COPY ./rabbitmq_lvc_exchange-3.8.0.ez /plugins/rabbitmq_lvc_exchange-3.8.0.ez

The docker starts successfuly, but logs the below error for the lvc plugin:

Problem reading some plugins: [{"/opt/rabbitmq/plugins/rabbitmq_lvc_exchange-3.8.0.ez",{invalid_ez,einval}}]

Branch v3.8.x no longer builds - can't find include lib "rabbit/include/amqqueue.hrl"

Not really sure what has changed (it must be a change of a dependency) however it is no longer possible to build v3.8.x of this repo.

The problem is very easy to reproduce - just checkout a clean copy of the branch and make dist leads to the following error message:

src/rabbit_federation_util.erl:10: can't find include lib "rabbit/include/amqqueue.hrl"

I'm pretty sure everything was working few days ago.

Does x-lvc supports federation policies.

Hi,

My scenario is, I have two clusters C1 and C2 with 3 rabbitmq nodes on each cluster. 'RMQLvc1' exchange of type x-lvc is created in both clusters. I wanted to consume messages publised to C1 from C2 and vise versa.
To achieve this, i have applied exchange federation policy to RMQLvc1 but not working.
Please confirm, does x-lvc supports federation policies.

Issue with LVC exchange - Mnesia is not replicating the contents for a LVC table

Please file an issue on the LVC repo and we'll try to port the fixes to this exchange

On Thu, Jul 16, 2015 at 11:08 AM Ojha, Ashish [email protected] wrote:
Some of the tests I did when we had this issue .

  1.   Declaring a new LVC exchange is not an issue .
    
  2.   Deleting a LVC exchange is a problem  , it gives below error when I try deleting a LVC exchange .
    

Got response code 500 with body {"error":"Internal Server Error","reason":"{exit,\n {{{error,{no_exists,lvc}},\n [{rabbit_misc,execute_mnesia_transaction,1,\n [{file,"src/rabbit_misc.erl"},{line,540}]},\n {rabbit_misc,execute_mnesia_tx_with_tail,1,\n [{file,"src/rabbit_misc.erl"},{line,562}]},\n {rabbit_channel,handle_method,3,\n [{file,"src/rabbit_channel.erl"},{line,1093}]},\n {rabbit_channel,handle_cast,2,\n [{file,"src/rabbit_channel.erl"},{line,326}]},\n {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1034}]},\n {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]},\n {gen_server,call,\n [<0.23460.100>,\n {call,{'exchange.delete',0,<<"ddd">>,false,false},none,<0.7589.100>},\n infinity]}},\n [{gen_server,call,3,[{file,"gen_server.erl"},{line,188}]},\n {rabbit_mgmt_util,'-amqp_request/5-fun-0-',4,\n [{file,"rabbitmq-management/src/rabbit_mgmt_util.erl"},{line,416}]},\n {rabbit_mgmt_util,with_channel,5,\n [{file,"rabbitmq-management/src/rabbit_mgmt_util.erl"},{line,435}]},\n {webmachine_resource,resource_call,3,\n [{file,\n "webmachine-wrapper/webmachine-git/src/webmachine_resource.erl"},\n {line,186}]},\n {webmachine_resource,do,3,\n [{file,\n "webmachine-wrapper/webmachine-git/src/webmachine_resource.erl"},\n {line,142}]},\n {webmachine_decision_core,resource_call,1,\n [{file,\n "webmachine-wrapper/webmachine-git/src/webmachine_decision_core.erl"},\n {line,48}]},\n {webmachine_decision_core,decision,1,\n [{file,\n "webmachine-wrapper/webmachine-git/src/webmachine_decision_core.erl"},\n {line,440}]},\n {webmachine_decision_core,handle_request,2,\n [{file,\n "webmachine-wrapper/webmachine-git/src/webmachine_decision_core.erl"},\n {line,33}]}]}\n"}

  1.   During this issue , publisher code always fails while consumers still can consumer from the queue which is bind to the LVC exchange .
    
  2.   In 3 node cluster if we shut down one node , then publisher fails to publish to LVC exchange . Consumers still works fine .
    

Is it possible to fix the LVC exchange for the bug you have mentioned ?
or is LVC exchange no more supported and users should move off LVC and use Recent History Exchange ?

We have LVC exchange deployed in production so wondering if the LVC can be fixed . Moving to Recent History Exchange will need time/effort/testing ..

From: Alvaro Videla [mailto:[email protected]]
Sent: Thursday, July 16, 2015 1:56 PM

To: Ojha, Ashish [Tech]
Cc: Michael Klishin; [email protected]
Subject: Re: [rabbitmq-users] Issue while publishing to LVC exchange

Ah OK, sorry, I thought it was a different error this time.

So if while the cluster was let's call it "in error state", if you declared a new LVC exchange, with a different name from the one you were using, you still get those errors?

It looks like Mnesia is not replicating the contents for that particular LVC table.

Looking at the code it seems LVC doesn't add table copies:

https://github.com/rabbitmq/rabbitmq-lvc-plugin/blob/master/src/rabbit_lvc_plugin.erl#L18

Which is a bug that we fixed for the recent history exchange plugin: https://github.com/videlalvaro/rabbitmq-recent-history-exchange/blob/master/src/rabbit_exchange_type_recent_history.erl#L119

This was the related issue rabbitmq/rabbitmq-recent-history-exchange#9

So I think using the LVC plugin you could run on the same issue as well.

If you use the recent-history-exchange with a "x-recent-history-length" value of 1, then you should get the same behaviour as with the LVC plugin.

On Thu, Jul 16, 2015 at 10:15 AM, Ojha, Ashish [email protected] wrote:
sure , I pasted the error in my first email .
adding the error again .

client logs ;

[20150710 10:32:39.842-S-OmaServerMessage-Thread-3] Unexpected exception in OmaServerMessage.process(): com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:296)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:648)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:622)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:160)
Broker logs :

=ERROR REPORT==== 10-Jul-2015::09:43:46 ===
** Generic server <0.9385.62> terminating
** Last message in was {'$gen_cast',
{method,
{'queue.bind',0,
<<"NONPROD.JETSTREAM.QA.JetStreamOma.jtdb">>,
<<"NONPROD.JETSTREAM.QA.JSQA.Lvc">>,
<<"JetStreamOma.jtdb">>,false,[]},
none,noflow}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.9381.62>,
<0.9387.62>,<0.9381.62>,
<<"10.152.132.150:55490 -> 10.152.155.201:25009">>,
{lstate,<0.9386.62>,false},

                        <<"10.152.132.150:55490 -> 10.152.155.201:25009">>,
                        {lstate,<0.9386.62>,false},
                        none,1,
                        {[],[]},
                        {user,<<"nonprod.jetstream-oma">>,[],
                            [{rabbit_auth_backend_internal,none}]},
                        <<"/">>,
                        <<"NONPROD.JETSTREAM.QA.JetStreamOma.jtdb">>,
                        {dict,0,16,16,8,80,48,
                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                             []},
                            {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                              [],[]}}},
                        {state,
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],
                                 [],[],[]},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],
                                  [],[],[]}}},
                            erlang},
                        {dict,0,16,16,8,80,48,
                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                             []},
                            {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                              [],[]}}},
                        {dict,0,16,16,8,80,48,
                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                             []},
                            {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                              [],[]}}},
                        {set,0,16,16,8,80,48,
                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                             []},
                            {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                              [],[]}}},
                        <0.9380.62>,
                        {state,fine,5000,#Ref<0.0.66.224148>},
                        false,1,
                        {{0,nil},{0,nil}},
                        [],
                        {{0,nil},{0,nil}},
                        [{<<"exchange_exchange_bindings">>,bool,true},
                         {<<"authentication_failure_close">>,bool,true},

                        [{<<"exchange_exchange_bindings">>,bool,true},
                         {<<"authentication_failure_close">>,bool,true},
                         {<<"consumer_cancel_notify">>,bool,true},
                         {<<"basic.nack">>,bool,true},
                         {<<"publisher_confirms">>,bool,true},
                         {<<"connection.blocked">>,bool,true}],
                        none,5,none}

** Reason for termination ==
** {{aborted,
{no_exists,
[lvc,
{cachekey,
{resource,<<"/">>,exchange,
<<"NONPROD.JETSTREAM.QA.JSQA.Lvc">>},
<<"JetStreamOma.jtdb">>}]}},
[{mnesia,abort,1,[{file,"mnesia.erl"},{line,309}]},
{rabbit_exchange_type_lvc,add_binding,3,
[{file,"src/rabbit_exchange_type_lvc.erl"},{line,66}]},
{rabbit_binding,x_callback,4,
[{file,"src/rabbit_binding.erl"},{line,561}]},
{rabbit_binding,'-add/3-fun-0-',3,
[{file,"src/rabbit_binding.erl"},{line,193}]},
{rabbit_channel,binding_action,9,
[{file,"src/rabbit_channel.erl"},{line,1471}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,326}]},
{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1034}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

=ERROR REPORT==== 10-Jul-2015::09:43:46 ===
Error on AMQP connection <0.9381.62> (10.152.132.150:55490 -> 10.152.155.201:25009, vhost: '/', user: 'nonprod.jetstream-oma', state: running), channel 1:
{{aborted,
{no_exists,
[lvc,
{cachekey,
{resource,<<"/">>,exchange,<<"NONPROD.JETSTREAM.QA.JSQA.Lvc">>},
<<"JetStreamOma.jtdb">>}]}},
[{mnesia,abort,1,[{file,"mnesia.erl"},{line,309}]},
{rabbit_exchange_type_lvc,add_binding,3,
[{file,"src/rabbit_exchange_type_lvc.erl"},{line,66}]},
{rabbit_binding,x_callback,4,[{file,"src/rabbit_binding.erl"},{line,561}]},
{rabbit_binding,'-add/3-fun-0-',3,
[{file,"src/rabbit_binding.erl"},{line,193}]},

{rabbit_binding,'-add/3-fun-0-',3,
[{file,"src/rabbit_binding.erl"},{line,193}]},
{rabbit_channel,binding_action,9,
[{file,"src/rabbit_channel.erl"},{line,1471}]},
{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,326}]},
{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1034}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

=WARNING REPORT==== 10-Jul-2015::09:43:46 ===
Non-AMQP exit reason '{{aborted,
{no_exists,
[lvc,
{cachekey,
{resource,<<"/">>,exchange,
<<"NONPROD.JETSTREAM.QA.JSQA.Lvc">>},
<<"JetStreamOma.jtdb">>}]}},
[{mnesia,abort,1,[{file,"mnesia.erl"},{line,309}]},
{rabbit_exchange_type_lvc,add_binding,3,
[{file,"src/rabbit_exchange_type_lvc.erl"},
{line,66}]},
{rabbit_binding,x_callback,4,
[{file,"src/rabbit_binding.erl"},{line,561}]},
{rabbit_binding,'-add/3-fun-0-',3,
[{file,"src/rabbit_binding.erl"},{line,193}]},
{rabbit_channel,binding_action,9,
[{file,"src/rabbit_channel.erl"},{line,1471}]},
{rabbit_channel,handle_cast,2,
[{file,"src/rabbit_channel.erl"},{line,326}]},
{gen_server2,handle_msg,2,
[{file,"src/gen_server2.erl"},{line,1034}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,239}]}]}'

From: Alvaro Videla [mailto:[email protected]]
Sent: Thursday, July 16, 2015 1:44 PM
To: Ojha, Ashish [Tech]
Cc: Michael Klishin; [email protected]

Subject: Re: [rabbitmq-users] Issue while publishing to LVC exchange

It would be nice to get what errors were produced in the failing scenario to see if it's something we can fix on our side.

On Thu, Jul 16, 2015 at 10:09 AM, Ojha, Ashish [email protected] wrote:
Thanks Alvaro , really appreciate your help on this one .

so my assumption and resolution looks fine to you if we hit this issue again ?

From: Alvaro Videla [mailto:[email protected]]
Sent: Thursday, July 16, 2015 1:36 PM
To: Ojha, Ashish [Tech]; Michael Klishin
Cc: [email protected]
Subject: Re: [rabbitmq-users] Issue while publishing to LVC exchange

The LVC exchange (and the recent history exchange as well) keeps a small table in Mnesia where they store the last N messages.

On Thu, Jul 16, 2015 at 9:05 AM Ojha, Ashish [email protected] wrote:
Hi Michael ,

We had this issue with LVC exchange and we figured out a way to fix it . Is it possible if we can get some understanding of the issue please or the behavior of the LVC exchange

Scenario :

3 Node cluster ( A,B,C ) with Mirroring enabled for all the queues .
On Sunday , Node B and Node C were rebuilt from scratch ( mnesia was recreated ) meaning they went down and joined the cluster as virgin nodes .
Node A was untouched .
After Node B and Node C were back up , it clustered with Node A and everything worked fine .
Later we realized that LVC exchange is not working fine .
Any connection made to LVC exchange was closed down by the broker ( when the publisher invoked basicPublish() method )
All other exchanges were working fine ( topic , direct exchange )

Question :

  • Does LVC exchange maintains any kind of state between the nodes in the cluster ?
  • In our scenario , only Node A was 'unchanged' , so is it possible that only Node A knew about LVC exchange state and when Node B and Node C joined the cluster there was a conflict between the LVC exchange state in the mnesia LVC table ?

How we resolved the issue

  • disable LVC plugin .
  • shutdown all the nodes and start them in order ( last disc to go down was the first to come back up )
  • enable the LVC plugin .

we tested LVC exchange after this resolution and everything was working fine .

-----Original Message-----
From: Michael Klishin [mailto:[email protected]]
Sent: Friday, July 10, 2015 5:33 PM
To: Ojha, Ashish [Tech]
Cc: [email protected]
Subject: RE: [rabbitmq-users] Issue while publishing to LVC exchange

On 10 July 2015 at 15:02:08, Ojha, Ashish ([email protected]) wrote:

plugin version is also 3.5.0 and we get it from github
https://github.com/rabbitmq/rabbitmq-lvc-plugin
and build it from source

Please use RabbitMQ 3.5.3 and the pre-compiled plugin version from our community plugins page.

MK

Staff Software Engineer, Pivotal/RabbitMQ

You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
For more options, visit https://groups.google.com/d/optout.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.