Comments (9)

grs commented on September 28, 2024

Can you attach a simple reproducer, or else a protocol trace/log of the interaction? Acknowledgement should be allowed after the receiver is detached. Whether the broker handles that is another question, though: it may treat any messages that are still unacknowledged when the receiver is closed as unconsumed.

kbfirebreather commented on September 28, 2024

When you say protocol trace/log, do you mean from rhea, or my broker? (ActiveMQ in this case)

grs commented on September 28, 2024

Either should do. The key thing is to determine what the actual interaction is.

kbfirebreather commented on September 28, 2024

Okay, I turned on rhea debug logging and scaled my service to two replicas, so there were two consumers on the queue.

Here are the logs from the service that originally consumed the message. They show the message being consumed, a sleep statement being executed, and then the SIGTERM arriving in the middle of the sleep.

[vagrant@k3sdev (default) /opt/service/kubernetes ]> kubectl logs -f service-alerts-dbd9d8cc5-sgkx8 --since=10s
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] read 440 bytes
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] got frame of size 440
Mon, 16 Oct 2023 13:31:20 GMT rhea:raw [connection-1] RECV: 440 000001b802000000005314c00905520243a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:20 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:20Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Received SIGTERM event. Shutting down.
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): closing context
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Shutting down Messenger
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): queue.alerts Detaching queue to wait for consuming messages while shutting down
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Waiting for consumers to complete, or timeout to be reached
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 -> detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] SENT: 23 0000001702000000005316d00000000700000002520242
[2023-10-16T13:31:26Z] INFO  (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): server closed
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 16 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 16
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 16 0000001002000000005316c003015202
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Session got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Connection got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [cd669231-d8cf-b04c-aa6e-0bd01fbc508f] Container got event: receiver_close

Here are the logs from the other service consuming from the queue. It picks up the message as soon as the first service detaches from the queue, which happens at 13:31:26.

[vagrant@k3sdev (default) ~ ]> kubectl logs -f service-alerts-dbd9d8cc5-dhrjk --since=10s
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 13:31:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 13:31:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 441 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 441
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 441 000001b902000000005314c00a0552025201a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_id":1,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-dhrjk): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:27 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:27 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

grs commented on September 28, 2024

Does anything else happen on the first process after the receiver_close? My understanding of your issue is that you want to acknowledge the message received after that point. Is that happening here? (There is no disposition in that first log, but neither is there a session close or connection close, so from an AMQP perspective it is incomplete).

My suspicion is that the broker will consider any messages unacknowledged at the point the receiver is detached to be available for redelivery. In that case, sending an acknowledgement after the detach is not going to have any effect anyway. (I can't confirm this without knowing what if anything happens with the first process after the detach).
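
For readers following the traces: the hex strings after `SENT:` and `RECV:` in the rhea:raw lines are complete AMQP 1.0 frames, so the presence or absence of a disposition, detach, or close can be checked directly from them. The sketch below is only an illustration, not part of rhea. The helper name `describeFrame` is hypothetical, and it assumes the frame uses the one-byte (smallulong) descriptor form seen in these logs.

```javascript
// Performative descriptor codes from the AMQP 1.0 transport specification.
const PERFORMATIVES = {
  0x10: 'open', 0x11: 'begin', 0x12: 'attach', 0x13: 'flow', 0x14: 'transfer',
  0x15: 'disposition', 0x16: 'detach', 0x17: 'end', 0x18: 'close',
};

// Hypothetical helper: decode the 8-byte frame header and the performative
// name from the hex string that rhea prints after "SENT: <n>" or "RECV: <n>".
function describeFrame(hex) {
  const buf = Buffer.from(hex, 'hex');
  if (buf.length < 8) {
    throw new Error('an AMQP frame header is 8 bytes');
  }
  const size = buf.readUInt32BE(0);        // total frame size in bytes
  const bodyOffset = buf.readUInt8(4) * 4; // DOFF is counted in 4-byte words
  const type = buf.readUInt8(5);           // 0 = AMQP frame, 1 = SASL frame
  const channel = buf.readUInt16BE(6);

  // A frame with no body is the "empty" heartbeat frame seen in the logs.
  let performative = 'empty';
  if (buf.length > bodyOffset) {
    // 0x00 marks a described type, 0x53 a smallulong descriptor code.
    const described = buf[bodyOffset] === 0x00 && buf[bodyOffset + 1] === 0x53;
    performative = described
      ? PERFORMATIVES[buf[bodyOffset + 2]] || 'unknown'
      : 'unknown';
  }
  return { size, type, channel, performative };
}

// The frame the first service sent at 13:31:26 in the trace above.
console.log(describeFrame('0000001702000000005316d00000000700000002520242'));
```

The codes line up with the numbers rhea prints in its rhea:frames lines (transfer#14, disposition#15, detach#16, flow#13), since 0x14 to 0x16 and 0x13 are the same values written in hex.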

Regarding the other question, at present you can only successfully drain the credit if you have automatic credit management disabled. There is an example of that here: https://github.com/amqp/rhea/blob/main/examples/drain.js.

kbfirebreather commented on September 28, 2024

Ah yeah, I accidentally cut off some logging at the end. I reproduced it, and these are the rhea logs from the service that was shutting down.

The message is acked, then the queue is closed:

[2023-10-16T15:35:50Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-vzjn5): Closing queues
Mon, 16 Oct 2023 15:35:50 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:50 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445

These are the logs from the second service, which got the same message:

Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445
Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> flow#13 {"next_incoming_id":1,"incoming_window":2048,"outgoing_window":4294967295,"handle":2,"delivery_count":1,"link_credit":15}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 39 0000002702000000005313d00000001700000007520170000008004370ffffffff52025201520f
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 15:36:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 15:36:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000

It does look like this particular consumer is disabling automatic credit management, but I'm not sure about the rest of the services. That is good to know!

Edit: Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

grs commented on September 28, 2024

Ok, the extra logs confirm that the broker is redelivering any unacknowledged message at the point that the receiver is closed. So acknowledgements sent after that are ignored.
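
Given that ordering constraint, one way to arrange a shutdown is to keep the receiver attached until every in-flight delivery has been accepted, and only then close it. The following is a minimal sketch under assumptions, not the approach used in the service above. `InFlightTracker` and `closeWhenIdle` are hypothetical names, and the only rhea calls relied on are `delivery.accept()` and `receiver.close()`.

```javascript
// Hypothetical helper: remembers deliveries that were received but not yet
// accepted, so shutdown can wait for them before the receiver is closed.
class InFlightTracker {
  constructor() {
    this.pending = new Set();
    this.idleCallbacks = [];
  }

  // Call from the 'message' handler with context.delivery.
  add(delivery) {
    this.pending.add(delivery);
  }

  // Acknowledge the delivery, then notify waiters if nothing is left.
  complete(delivery) {
    delivery.accept();
    this.pending.delete(delivery);
    if (this.pending.size === 0) {
      this.idleCallbacks.splice(0).forEach((callback) => callback());
    }
  }

  // Run callback immediately if nothing is pending, otherwise once the
  // last pending delivery has been completed.
  onIdle(callback) {
    if (this.pending.size === 0) {
      callback();
    } else {
      this.idleCallbacks.push(callback);
    }
  }
}

// Close the receiver once all pending deliveries are accepted, or after
// timeoutMs at the latest. Whatever is still unaccepted at the timeout
// will be redelivered by the broker, as the logs above show.
function closeWhenIdle(receiver, tracker, timeoutMs) {
  let closed = false;
  const close = () => {
    if (closed) return;
    closed = true;
    clearTimeout(timer);
    receiver.close();
  };
  const timer = setTimeout(close, timeoutMs);
  tracker.onIdle(close);
}
```

While the receiver stays attached the broker can still send it new messages if it holds credit, so this ordering is only useful when the consumer also controls its own credit.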

kbfirebreather commented on September 28, 2024

Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?

grs commented on September 28, 2024

At present you can only set that on creation of the receiver.
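
Since the setting is fixed at creation, it belongs in the options passed to `open_receiver`. Here is a minimal sketch, assuming a rhea connection obtained elsewhere (for example from `container.connect()`). The wrapper name `openManualCreditReceiver`, the `onMessage` callback, and the one-message-at-a-time credit policy are illustrative choices, not something taken from this thread.

```javascript
// Hypothetical wrapper: open a receiver with rhea's automatic credit
// management turned off and issue credit one message at a time.
function openManualCreditReceiver(connection, address, onMessage) {
  const receiver = connection.open_receiver({
    source: address,
    credit_window: 0,  // 0 turns off the automatic prefetch window
    autoaccept: false, // deliveries are accepted explicitly below
  });

  // Ask the broker for the first message once the link is attached.
  receiver.on('receiver_open', () => receiver.add_credit(1));

  receiver.on('message', (context) => {
    onMessage(context.message);
    context.delivery.accept(); // acknowledge while the link is still attached
    receiver.add_credit(1);    // then ask for the next message
  });

  return receiver;
}
```

A call such as `openManualCreditReceiver(connection, 'queue.alerts', handleAlert)` would then leave the application in charge of when more messages are requested; `handleAlert` is a placeholder for the service's own synchronous handler.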
