Git Product home page Git Product logo

Comments (6)

amarzavery avatar amarzavery commented on June 18, 2024

Okay, an update: This is what I found to work for me.

  • The receiver is created with credit_window set to 0. Thus no messages are received as soon as the receiver link is establishes (i.e. receiver_open event is emitted).
  • When receiver.add_credit(n) is executed as expected n messages are received.
  • Later I wanted rhea to manage credits for me on the same established receiver link, so I did the following 2 things:
    • receiver.set_credit_window(1000);
    • receiver.add_credit(1000) // setting the credit_window alone did not help. However as soon as I added credit after setting the credit window, then rhea started auto managing credits for me.

@grs - Am I doing this correctly for what I want to achieve? Do you foresee any side effects?

The reason for doing the above thing: I expose two methods on the receiver

  • receive(maxMessageCount, maxWaitTime): Promise<Messages[]> // This will receive specified amount of messages in the given time. I want rhea to receive only the specified number of messages in this scenario.
  • receiveWithHandlers(onMessage, onError): void // This will receive messages as long as the link is active or an error occurs while receiving messages. I need rhea to manage credits for me in this scenario.

I want to avoid creating receiver links if the user calls receive() multiple times and then calls receiveWithHandlers(). Playing with credits is much easier. Hence want to make sure if I am doing the right thing.

from rhea.

grs avatar grs commented on June 18, 2024

@amarzavery that looks ok, I can't see anything that would cause a problem.

Once the window is set you cannot at present disable it very easily however (I think that is ok in your use case though, right?).

The reason you need to add the credit is that the flow control sets credit (a) when a message arrives or (b) when the link is opened. So if you set the window after the link is opened it won't trigger until a message arrives and if there is no credit, that won't happen.

from rhea.

amarzavery avatar amarzavery commented on June 18, 2024

The reason you need to add the credit is that the flow control sets credit (a) when a message arrives or (b) when the link is opened. So if you set the window after the link is opened it won't trigger until a message arrives and if there is no credit, that won't happen.

I see thanks for the explanation.

Once the window is set you cannot at present disable it very easily however (I think that is ok in your use case though, right?).

Yes that is fine. In that case, I ask the customer to close the receiver and internally I close the receiver link.

Now I am seeing something weird while closing the receiver.

Scenario:

The receiver is busy receiving messages and some time later it decides that it wants to close itself.
With debug logs on, what i noticed was: Between closing the receiver and the close actually happening, close to ~1000 messages were received by rhea but were never sent to my app.

Here are the debug logs..

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closing receiver.
  rhea:frames [connection-1] PENDING: '{"channel":1,"type":0,"performative":[0,true]}' +12ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 16 02 00 00 01 00 53 16 d0 00 00 00 06 00 00 00 02 43 41> +12ms
  rhea:io [connection-1] read 16355 bytes +94ms
  rhea:io [connection-1] got frame of size 562 +0ms
  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":2022,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,195,236,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,63,153,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,54,49,51,57,56,52,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,229,157,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +82ms
  rhea:io [connection-1] got frame of size 562 +0ms
  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":2023,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,195,236,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,63,154,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,54,49,52,51,50,48,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,229,157,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +1ms
  rhea:io [connection-1] got frame of size 562 +1ms

. . . . and many more like that...

  rhea:frames [connection-1] RECV: '{"size":562,"type":0,"channel":1,"performative":{"type":"transfer#14","delivery_id":3014,"delivery_tag":{"type":"Buffer","data":[]},"settled":true,"batchable":true},"payload":{"type":"Buffer","data":[0,83,113,193,150,8,163,29,108,97,115,116,95,101,110,113,117,101,117,101,100,95,115,101,113,117,101,110,99,101,95,110,117,109,98,101,114,129,0,0,0,0,0,4,138,110,163,20,108,97,115,116,95,101,110,113,117,101,117,101,100,95,111,102,102,115,101,116,161,10,56,54,50,50,54,52,57,51,50,48,163,22,108,97,115,116,95,101,110,113,117,101,117,101,100,95,116,105,109,101,95,117,116,99,131,0,0,1,98,213,174,174,184,163,31,114,117,110,116,105,109,101,95,105,110,102,111,95,114,101,116,114,105,101,118,97,108,95,116,105,109,101,95,117,116,99,131,0,0,1,98,218,46,205,145,0,83,114,193,89,6,163,21,120,45,111,112,116,45,115,101,113,117,101,110,99,101,45,110,117,109,98,101,114,129,0,0,0,0,0,3,67,121,163,12,120,45,111,112,116,45,111,102,102,115,101,116,161,10,56,53,57,48,57,52,55,50,57,54,163,19,120,45,111,112,116,45,101,110,113,117,101,117,101,100,45,116,105,109,101,131,0,0,1,98,110,206,230,26,0,83,115,69,0,83,119,209,0,0,1,15,0,0,0,2,161,4,98,111,100,121,176,0,0,1,0,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90,90]}}' +0ms
  rhea:io [connection-1] got frame of size 17 +0ms
>>  rhea:frames [connection-1] RECV: '{"size":17,"type":0,"channel":1,"performative":{"type":"detach#16","closed":true}}' +0ms
>>  rhea:events Link got event: receiver_close +3s
  rhea-promise Resolving the promise as the amqp receiver has been closed. +13s
  azure:event-hubs:receiver Deleted the receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7" from the client cache. +12s
  azure:event-hubs:receiver [connection-1] Receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7" has been closed. +0ms
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closing client.
  rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":[]}' +2ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 0c 02 00 00 00 00 53 17 45> +3s
  rhea:io [connection-1] read 15 bytes +96ms
  rhea:io [connection-1] got frame of size 15 +0ms
  rhea:frames [connection-1] RECV: '{"size":15,"type":0,"channel":0,"performative":{"type":"end#17"}}' +94ms
  rhea:events Session got event: session_close +96ms
  rhea-promise Resolving the promise as the amqp session has been closed. +96ms
  azure:event-hubs:cbs Successfully closed the cbs session. +14s
  rhea:frames [connection-1] PENDING: '{"channel":0,"type":0,"performative":[]}' +2ms
  rhea:raw [connection-1] SENT: <Buffer 00 00 00 0c 02 00 00 00 00 53 18 45> +96ms
  rhea:io [connection-1] read 15 bytes +96ms
  rhea:io [connection-1] got frame of size 15 +1ms
  rhea:frames [connection-1] RECV: '{"size":15,"type":0,"channel":0,"performative":{"type":"close#18"}}' +95ms
  rhea:events Connection got event: connection_close +96ms
  rhea-promise Resolving the promise as the connection has been successfully closed. +97ms
  azure:event-hubs:client Closed the amqp connection "connection-1" on the client. +0ms
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Closed receiver and client...

For reference these are the options that I provided while creating the receiver:

rhea:events Link got event: receiver_open +639ms
  rhea-promise Resolving the promise with amqp receiver "5334b21b-b629-4e18-ae52-4c21c6de49e7". +638ms
  azure:event-hubs:receiver Promise to create the receiver resolved. Created receiver with name:  5334b21b-b629-4e18-ae52-4c21c6de49e7 +638ms
  azure:event-hubs:receiver [connection-1] Receiver '5334b21b-b629-4e18-ae52-4c21c6de49e7' created with receiver options: { name: '5334b21b-b629-4e18-ae52-4c21c6de49e7',
  azure:event-hubs:receiver   autoaccept: true,
  azure:event-hubs:receiver   source: { address: 'xxxxx/ConsumerGroups/$default/Partitions/0' },
  azure:event-hubs:receiver   credit_window: 0,
  azure:event-hubs:receiver   desired_capabilities: 'com.microsoft:enable-receiver-runtime-metric',
  azure:event-hubs:receiver   target: undefined } +0ms

from rhea.

grs avatar grs commented on June 18, 2024

At present messages are only dispatched if the receiver is open (meaning both local and remote). That should perhaps be changed to always deliver while remote is open. You can raise an issue for that and I'll get a fix in.

from rhea.

amarzavery avatar amarzavery commented on June 18, 2024

Cool. Here is the new issue: #56

from rhea.

amarzavery avatar amarzavery commented on June 18, 2024

closing this one as my question has been answered.

from rhea.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.