Comments (9)
Can you attach a simple reproducer? Or else a protocol trace/log of the interaction? Acknowledgement should be allowed after the detaching of the receiver. Whether the broker handles that is another question though. It may consider unacked messages at the point the receiver is closed as unconsumed.
from rhea.
When you say protocol trace/log, do you mean from rhea, or my broker? (ActiveMQ in this case)
from rhea.
Either should do. The key thing is to determine what the actual interaction is.
from rhea.
Okay, turned on rhea debug logging, and scaled my service to two so there were two consumers on the queue.
here are the logs from the service that consumed the message originally, showing the message being consumed, a sleep statement being executed, and then the SIGTERM
being sent in the middle of the sleep.
[vagrant@k3sdev (default) /opt/service/kubernetes ]> kubectl logs -f service-alerts-dbd9d8cc5-sgkx8 --since=10s
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] read 440 bytes
Mon, 16 Oct 2023 13:31:20 GMT rhea:io [connection-1] got frame of size 440
Mon, 16 Oct 2023 13:31:20 GMT rhea:raw [connection-1] RECV: 440 000001b802000000005314c00905520243a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:20 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:20 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:20Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
[2023-10-16T13:31:26Z] INFO (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Received SIGTERM event. Shutting down.
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): closing context
[2023-10-16T13:31:26Z] INFO (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Shutting down Messenger
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): queue.alerts Detaching queue to wait for consuming messages while shutting down
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): Waiting for consumers to complete, or timeout to be reached
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 -> detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] SENT: 23 0000001702000000005316d00000000700000002520242
[2023-10-16T13:31:26Z] INFO (alerts/1 on service-alerts-dbd9d8cc5-sgkx8): server closed
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 16 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 16
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 16 0000001002000000005316c003015202
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- detach#16 {"handle":2}
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Session got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Connection got event: receiver_close
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [cd669231-d8cf-b04c-aa6e-0bd01fbc508f] Container got event: receiver_close
Here is the other service that is consuming on the queue. Ends up picking up the message as soon as the other service detaches on the queue, which occurs at 13:31:26
[vagrant@k3sdev (default) ~ ]> kubectl logs -f service-alerts-dbd9d8cc5-dhrjk --since=10s
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 13:31:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 13:31:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 13:31:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] read 441 bytes
Mon, 16 Oct 2023 13:31:26 GMT rhea:io [connection-1] got frame of size 441
Mon, 16 Oct 2023 13:31:26 GMT rhea:raw [connection-1] RECV: 441 000001b902000000005314c00a0552025201a001004342005370c00402415000005372c12904a30e782d6f70742d6a6d732d646573745100a312782d6f70742d6a6d732d6d73672d747970655105005373d0000000280000000a40a00475736572a10c71756575652e616c65727473404040404040830000018b38af1647005374c11502a1114a4d535844656c6976657279436f756e7440005377b1000001197b2274797065223a22616c657274222c227365766572697479223a227761726e696e67222c2274696d657374616d70223a22323031312d31302d30355431343a34383a30302e3030305a222c227469746c65223a225465737420416c657274222c22636f6e74656e74223a2254686520636f6e74656e742e2e2e222c226461746154797065223a22747970654f6644617461222c2264617461223a7b227761726e696e67223a22736f6d657468696e67222c22696e666f223a22736f6d657468696e6720656c7365227d2c2265787465726e616c223a66616c73652c2275726c223a2268747470733a2f2f6d61737465722e62656173742e75732e6c6d636f2e636f6d2f222c2270726f6475636572223a226265617374227d
Mon, 16 Oct 2023 13:31:26 GMT rhea:frames [connection-1]:0 <- transfer#14 {"handle":2,"delivery_id":1,"delivery_tag":{"type":"Buffer","data":[0]}} <Buffer 00 53 70 c0 04 02 41 50 00 00 53 72 c1 29 04 a3 0e 78 2d 6f 70 74 2d 6a 6d 73 2d 64 65 73 74 51 00 a3 12 78 2d 6f 70 74 2d 6a 6d 73 2d 6d 73 67 2d 74 ... 368 more bytes>
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List8', typecode: 192, width: 1, category: 3, create: [Function (anonymous)] { typecode: 192 } }, value: [ Typed { type: [TypeDesc], value: true }, Typed { type: [TypeDesc], value: 0 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'x-opt-jms-dest' }, Typed { type: [TypeDesc], value: 0 }, Typed { type: [TypeDesc], value: 'x-opt-jms-msg-type' }, Typed { type: [TypeDesc], value: 5 } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 114 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 114 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: <Buffer 75 73 65 72> }, Typed { type: [TypeDesc], value: 'queue.alerts' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 2023-10-16T13:31:20.519Z } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Map8', typecode: 193, width: 1, category: 3, create: [Function (anonymous)] { typecode: 193 } }, value: [ Typed { type: [TypeDesc], value: 'JMSXDeliveryCount' }, Typed { type: [TypeDesc], value: null } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"type":"alert","severity":"warning","timestamp":"2011-10-05T14:48:00.000Z","title":"Test Alert","content":"The content...","dataType":"typeOfData","data":{"warning":"something","info":"something else"},"external":false,"producer":"service"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 }
Mon, 16 Oct 2023 13:31:26 GMT rhea:events [connection-1] Link got event: message
[2023-10-16T13:31:26Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-dhrjk): Sleeping for 30 seconds to simulator long running amq consumer
Mon, 16 Oct 2023 13:31:27 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 13:31:27 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
from rhea.
Does anything else happen on the first process after the receiver_close? My understanding of your issue is that you want to acknowledge the message received after that point. Is that happening here? (There is no disposition in that first log, but neither is there a session close or connection close, so from an AMQP perspective it is incomplete).
My suspicion is that the broker will consider any messages unacknowledged at the point the receiver is detached to be available for redelivery. In that case, sending an acknowledgement after the detach is not going to have any effect anyway. (I can't confirm this without knowing what if anything happens with the first process after the detach).
Regarding the other question, at present you can only successfully drain the credit if you have automatic credit management disabled. There is an example of that here: https://github.com/amqp/rhea/blob/main/examples/drain.js.
from rhea.
Ah yeah, I accidentally cut off some logging at the end. I recreated it, and these were the rhea logs in the service that was shutting down
The message is acked, then the queue is closed
[2023-10-16T15:35:50Z] DEBUG (alerts/1 on service-alerts-dbd9d8cc5-vzjn5): Closing queues
Mon, 16 Oct 2023 15:35:50 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:50 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445
These are the logs from the second service that got the same message
Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> disposition#15 {"role":true,"settled":true,"state":[]}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 28 0000001c02000000005315d00000000c000000054143434100532445
Mon, 16 Oct 2023 15:35:59 GMT rhea:frames [connection-1]:0 -> flow#13 {"next_incoming_id":1,"incoming_window":2048,"outgoing_window":4294967295,"handle":2,"delivery_count":1,"link_credit":15}
Mon, 16 Oct 2023 15:35:59 GMT rhea:raw [connection-1] SENT: 39 0000002702000000005313d00000001700000007520170000008004370ffffffff52025201520f
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] read 8 bytes
Mon, 16 Oct 2023 15:36:15 GMT rhea:io [connection-1] got frame of size 8
Mon, 16 Oct 2023 15:36:15 GMT rhea:raw [connection-1] RECV: 8 0000000802000000
Mon, 16 Oct 2023 15:36:15 GMT rhea:frames [connection-1]:0 <- empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:frames [connection-1]:0 -> empty
Mon, 16 Oct 2023 15:36:21 GMT rhea:raw [connection-1] SENT: 8 0000000802000000
It does look like this particular consumer is disabling automatic credit management, but not sure about the rest of the services. That is good to know!
Edit: Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?
from rhea.
Ok, the extra logs confirm that the broker is redelivering any unacknowledged message at the point that the receiver is closed. So acknowledgements sent after that are ignored.
from rhea.
Is it possible to mutate a consumer to disable automatic credit management, or is that only possible during consumer instantiation?
from rhea.
At present you can only set that on creation of the receiver.
from rhea.
Related Issues (20)
- Regression in PR #382: Handle transfer with no payload HOT 1
- Receiver (Consumer) does not receive message that are larger than ~4.15kb HOT 3
- How can I access virtual host using rhea with TLS? HOT 15
- Does it supports OAUTH2 mechanism? HOT 2
- ulong correlation_id incorrecly serialized as uuid HOT 3
- Acknowledgments doesnt work HOT 6
- idle_time_out fires which event? HOT 15
- Publish Window Size Property HOT 2
- `NetClientConnectionOptions` and `TlsClientConnectionOptions` should be exported
- Wildcard Topics HOT 15
- loosing messages intermittently while consuming from Azure service bus HOT 1
- Is `absolute_expiry_time` in milliseconds? HOT 7
- amqp:resource-limit-exceeded ERROR HOT 1
- `sendable` event not fired after certain amount of messages HOT 1
- How to use filters dynamically ? HOT 12
- Regenerate or remove ssl certificates for examples
- Connection RabbitMQ through websocket HOT 2
- Sender appears to be ignoring max_frame_size HOT 4
- connection_detail does not support hostname HOT 5
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rhea.