vcabbage / amqp
AMQP 1.0 client library for Go.
Home Page: https://godoc.org/pack.ag/amqp
License: MIT License
Lines 398 to 402 in ee6eb7e
I was hoping that properties could stay string values, but it appears strings will not suffice. This has come up due to epoch functionality described in #43. Epoch values can only be longs.
As discussed in #28 and #26, you didn't want to expose interface{} as the value of a property, so we landed on exposing only string. Now that there is a real need to set values other than string, how would you like to see LinkProperties implemented?
Lines 560 to 589 in ee6eb7e
Lines 823 to 825 in ee6eb7e
As reported in #46 (comment), errors returned in a rejected disposition are not propagated up to the sender.
I think this can be resolved by using a chan deliveryState to confirm message settlement instead of chan struct{}.
I'm not sure how this will work out with sender-settled messages. The best option in that case may be to consider it a link error.
I'm running a parallel benchmark sending to a queue similar to the following:
session, err := sb.client.NewSession() // client is *amqp.Client -- the client connection is reused
if err != nil {
	return err
}
defer session.Close()

sender, err := session.NewSender(amqp.LinkAddress(entityPath))
if err != nil {
	return err
}
defer sender.Close()

return sender.Send(ctx, msg)
Received the following:
Unexpected frame: &{*Error(nil)}
Unexpected frame: *performClose{Error: *Error{Condition: amqp:session:handle-in-use, Description: The handle '0' is already associated with object 'session55149196'., Info: map[]}}
The source of the error is:
Line 474 in a6e3307
Any thoughts on why?
The AMQPv1.0 spec section 2.7.1 indicates Offered-Capabilities is a list, however, when attempting to connect to AMQ7 Interconnect Router, the field arrives from AMQ7 router as 0xa3. The code attempts to read an ArrayHeader and fails.
Per the spec, I expected a list/array, which would mean the code is correct.
Per the AMQ7 Interconnect Router behavior I expect the Offered-Capabilities to be a symbol which the code does not handle.
I changed my local copy to expect symbol for offered-capabilities, and it works with AMQ7 Interconnect Router.
This might be a bug in AMQ7, not your code. I haven't tried this with any other AMQP 1.0 devices.
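One possible reading, offered as an assumption rather than a confirmed diagnosis: if the spec's rule for fields marked multiple permits a single bare value in place of a one-element array, a decoder could accept both forms. The sketch below handles only the sym8 (0xa3), sym32 (0xb3) and array8 (0xe0) format codes; the function names are made up for illustration and are not the library's decoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// AMQP 1.0 format codes used by this sketch.
const (
	codeSym8   = 0xa3 // symbol with a 1-byte length
	codeSym32  = 0xb3 // symbol with a 4-byte length
	codeArray8 = 0xe0 // array with 1-byte size and count
)

var errShort = errors.New("buffer too short")

// readSymbol reads one symbol body (length prefix plus bytes) for the given
// format code and returns the symbol and the remaining bytes.
func readSymbol(code byte, b []byte) (string, []byte, error) {
	var n uint64
	switch code {
	case codeSym8:
		if len(b) < 1 {
			return "", nil, errShort
		}
		n, b = uint64(b[0]), b[1:]
	case codeSym32:
		if len(b) < 4 {
			return "", nil, errShort
		}
		n, b = uint64(binary.BigEndian.Uint32(b)), b[4:]
	default:
		return "", nil, fmt.Errorf("unexpected format code %#02x", code)
	}
	if n > uint64(len(b)) {
		return "", nil, errShort
	}
	return string(b[:n]), b[n:], nil
}

// decodeMultiSymbol accepts either a bare symbol or an array8 of symbols.
func decodeMultiSymbol(b []byte) ([]string, error) {
	if len(b) == 0 {
		return nil, errShort
	}
	code, rest := b[0], b[1:]
	if code != codeArray8 {
		// A bare symbol is treated as a one-element array.
		s, _, err := readSymbol(code, rest)
		if err != nil {
			return nil, err
		}
		return []string{s}, nil
	}
	// array8 layout: size (1), count (1), element constructor (1), elements.
	if len(rest) < 2 {
		return nil, errShort
	}
	count := int(rest[1])
	if count == 0 {
		return nil, nil
	}
	if len(rest) < 3 {
		return nil, errShort
	}
	elem, rest := rest[2], rest[3:]
	out := make([]string, 0, count)
	for i := 0; i < count; i++ {
		s, r, err := readSymbol(elem, rest)
		if err != nil {
			return nil, err
		}
		out, rest = append(out, s), r
	}
	return out, nil
}

func main() {
	single := []byte{0xa3, 0x03, 'f', 'o', 'o'}
	array := []byte{0xe0, 0x0a, 0x02, 0xa3, 0x03, 'f', 'o', 'o', 0x03, 'b', 'a', 'r'}
	fmt.Println(decodeMultiSymbol(single))
	fmt.Println(decodeMultiSymbol(array))
}
```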
Hi there!
It's not possible to build the lib on a 32-bit architecture.
With GOARCH=arm or GOARCH=386, the build fails with the following errors:
vendor/pack.ag/amqp/encode.go:394:9: constant 4294967295 overflows int
vendor/pack.ag/amqp/encode.go:419:9: constant 4294967295 overflows int
vendor/pack.ag/amqp/encode.go:531:11: constant 4294967291 overflows int
vendor/pack.ag/amqp/types.go:2436:9: constant 4294967295 overflows int
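These errors are what the compiler reports when an int value is compared against an untyped constant larger than the platform's int. A hedged sketch of one portable pattern is to widen the int before comparing; the helper name fitsUint32 is hypothetical and not taken from the library.

```go
package main

import (
	"fmt"
	"math"
)

// fitsUint32 reports whether n can be encoded as a 32-bit unsigned size.
// Widening n to int64 keeps the comparison valid on 32-bit platforms, where
// `n > math.MaxUint32` does not compile because the constant overflows int.
func fitsUint32(n int) bool {
	return n >= 0 && int64(n) <= math.MaxUint32
}

func main() {
	fmt.Println(fitsUint32(len("hello")), fitsUint32(-1))
}
```

Building the snippet with GOARCH=386 go build is a quick way to check that a given comparison is portable.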
Related to: #35
After digging into send batching in Azure Event Hubs, I found the current implementation does not actually accomplish send batching as Event Hubs expects. Unfortunately, the structure that Event Hubs expects also can't be built easily via the publicly exposed structures.
Here's the Java code to implement this feature via Proton messages.
Basically, the first message is marshaled with an empty data body, then each data body is used to create an inner message, which is marshaled and added to a wrapper message. Finally, the wrapped messages are marshaled at the end of the first message.
I'll add a PR to better explain the process.
Line 274 in ccafaa7
One such example for non-default message formats is sending batch messages in Azure Service Bus. The following message format is used in the Java lib.
public static final int AMQP_BATCH_MESSAGE_FORMAT = 0x80013700; // 2147563264L;
Perhaps Send should support TransferOption variadic arguments. Thoughts?
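For context, the AMQP 1.0 transfer performative defines message-format as a 32-bit unsigned value whose upper three octets identify the format and whose lowest octet gives its version. A small sketch that splits the constant quoted above; the helper name splitMessageFormat is made up for illustration.

```go
package main

import "fmt"

// batchMessageFormat is the value quoted from the Java library. It does not
// fit in a signed 32-bit integer, so it is kept as a uint32 here.
const batchMessageFormat uint32 = 0x80013700

// splitMessageFormat separates a message-format code into the format
// identifier (upper three octets) and the version (lowest octet).
func splitMessageFormat(mf uint32) (format uint32, version uint8) {
	return mf >> 8, uint8(mf & 0xff)
}

func main() {
	format, version := splitMessageFormat(batchMessageFormat)
	fmt.Printf("decimal=%d format=%#x version=%d\n", batchMessageFormat, format, version)
}
```

Whatever shape a per-transfer option takes, it would need to carry this value as an unsigned 32-bit integer.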
Lines 335 to 336 in ee6eb7e
Randomly in my CI pipeline I get panics like:
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x64 pc=0x69cbb2]
goroutine 189 [running]:
.../vendor/pack.ag/amqp.(*Message).shouldSendDisposition(...)
vendor/pack.ag/amqp/types.go:1741
.../vendor/pack.ag/amqp.(*Message).Accept(0x0)
.../olu-advance-connector/vendor/pack.ag/amqp/types.go:1720 +0x22
Revision: 156a96cbd71de6a80dd774d60de212f31c7272c7
If needed, I can spend some time trying to narrow down the issue. Or do you already have thoughts on this?
P.S. updated to the latest version, we'll see how it goes.
Lines 1382 to 1389 in ee6eb7e
Without setting amqp.LinkMaxMessageSize(...) on a receiver, the max message size is -1, which causes an error. After setting amqp.LinkMaxMessageSize(...), I'm again able to receive messages.
Lines 863 to 869 in ee6eb7e
Is the offset checkpoint (commit) feature supported yet?
Lines 186 to 188 in ee6eb7e
These values control the session level flow control and limit the number of in-flight transfer frames.
Lines 1362 to 1368 in ee6eb7e
As mentioned in #9 (comment), we may be improperly handling the amqps scheme. Currently it enables TLS negotiation as part of AMQP negotiation. It may be that it should establish TLS before AMQP negotiation. If that's true, Dial() should automatically create the initial TLS connection.
As mentioned in #42, Event Hubs has a feature where a receiver can specify an epoch. The epoch value must be a long (can't be a string, ulong or anything else) set in the attach performative properties.
The behavior of this broker feature is as follows:
In case 2, using a single connection between r1 and r2, I see both r1 and r2 close. I would expect only r2 to fail to attach and r1 would stay open, not close.
It appears the context for r1 is closed when r2 receives the detach error.
Excerpt from debug output for e1 = 4 and e2 = 1
TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 1, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jgttgCWUNisofYoKKyeAaQcCvlqeprIhIoPOqOYt, Handle: 0, Role: Receiver, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:4]}
RX(Session): Attach{Name: jgttgCWUNisofYoKKyeAaQcCvlqeprIhIoPOqOYt, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:4]}
TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 3, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ, Handle: 0, Role: Receiver, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:1]}
RX(Session): Attach{Name: gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ, Handle: 0, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: <nil>, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[link-name:gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ client-max-frame-size:480 com.microsoft:epoch:1]}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 0, DeliveryCount: 0, LinkCredit: 100, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
ERRO[0041] entity path goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0 and epoch 4 error: context canceled
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
ERRO[0041] entity path goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0 and epoch 1 error: link detached, reason: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G26:29035311:opkxrbrNvsKWErgDdrEDiUshYNcwdYkxOzzsFHTg' is force detached by the broker due to errors occurred in publisher(link5075554). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:bf8e03460000a344004d72625a96cdba_G26_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:51 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G26:29035311:opkxrbrNvsKWErgDdrEDiUshYNcwdYkxOzzsFHTg' is force detached by the broker due to errors occurred in publisher(link5075554). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:bf8e03460000a344004d72625a96cdba_G26_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:51 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
Lines 412 to 418 in ee6eb7e
On the open performative, the properties field provides a nice way to add user-agent and other helpful information. Unfortunately, there is no way to get at this internal map at this point.
ConnOption?
I have an application that uses the Send method to send a JSON-encoded byte array, and it seems to be dropping messages when the message data is over 438 bytes. Is there some flag I need to set for longer strings?
I'm using Azure Event Hubs (which shows no messages received when I only send messages over 438 bytes)
serializedEvent, err := eh.codec.Encode(eh.index, &event.Content)
if err != nil {
	if !event.Guaranteed() {
		return false
	}
	logp.Critical("Unable to encode event: %v", err)
	return false
}

session, err := eh.client.NewSession()
if err != nil {
	logp.Critical("Creating AMQP session: %v", err)
	return false
}

sender, err := session.NewSender(amqp.LinkTargetAddress("/" + eh.queue))
if err != nil {
	logp.Critical("Creating sender link: %v", err)
	return false
}

//message := amqp.NewMessage(serializedEvent)
message := &amqp.Message{
	Data: [][]byte{serializedEvent[:438], serializedEvent[438:]},
}
logp.Info("Sending message: %v", message)

err = sender.Send(context.Background(), message)
defer sender.Close()
if err != nil {
	logp.Critical("Sending message: %v", err)
	return false
}
return true
Lines 462 to 468 in ee6eb7e
Current implementation wastes several bytes on each transfer.
Lines 1545 to 1575 in ee6eb7e
It's needed at least to implement request-response operations (reply-to):
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response
Right now it's not possible:
Lines 692 to 705 in 746c1d6
We should probably add funcs like LinkSource(s *Source) and LinkTarget(t *Target).
Any thoughts?
Currently it's possible to call accept/modify/reject/release multiple times on the same message and have a disposition sent multiple times. These are all terminal delivery states and only the first call is valid.
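One way to enforce "only the first call is valid" is an atomic flag checked before any disposition is sent. The sketch below uses a hypothetical message type that records outcomes in a slice instead of writing frames; it illustrates the guard only and is not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errAlreadySettled = errors.New("message already settled")

// message is a hypothetical stand-in for a received message.
type message struct {
	settled int32    // 0 until a terminal state has been recorded
	sent    []string // dispositions that would go on the wire
}

// settle records a terminal outcome exactly once; later calls return an
// error instead of sending a second disposition.
func (m *message) settle(outcome string) error {
	if !atomic.CompareAndSwapInt32(&m.settled, 0, 1) {
		return errAlreadySettled
	}
	m.sent = append(m.sent, outcome)
	return nil
}

func (m *message) Accept() error  { return m.settle("accepted") }
func (m *message) Reject() error  { return m.settle("rejected") }
func (m *message) Release() error { return m.settle("released") }

func main() {
	m := &message{}
	fmt.Println(m.Accept(), m.Reject(), m.sent)
}
```

Whether a repeated call should return an error or be silently ignored is a design choice this sketch does not settle.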
I think *bytes.Buffer is the only type currently passed. Changing from an interface gives up some flexibility, but it allows for better inlining and escape analysis decisions. Need to benchmark to see whether the change is justified.
Lines 34 to 35 in ee6eb7e
conn.go line 434:
_ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout))
should be changed to:
if c.idleTimeout > 0 { _ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout)) }
Integration tests frequently fail due to context or leakcheck timeouts. Should bump up the timeouts or add a scaling option to use in CI.
Unsure how widely used sequences are. Low on the priority list unless someone asks for them.
Lines 1710 to 1712 in ee6eb7e
Should write table tests for ConnOption and LinkOption combinations.
Go has no builtin decimal types, so we would need to determine a reasonable way to represent these without pulling in a 3rd party lib.
Char is UTF-32BE encoded. Ideally this would be converted into a rune.
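A sketch of that conversion, assuming the decoder has already isolated the four payload bytes: a UTF-32BE code unit is simply the code point as a big-endian uint32. The function name decodeChar is made up for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"unicode/utf8"
)

// decodeChar converts the 4-byte UTF-32BE payload of an AMQP char into a rune.
func decodeChar(b []byte) (rune, error) {
	if len(b) != 4 {
		return 0, errors.New("char payload must be exactly 4 bytes")
	}
	r := rune(binary.BigEndian.Uint32(b))
	// Reject surrogates and values outside the Unicode range.
	if !utf8.ValidRune(r) {
		return 0, fmt.Errorf("invalid code point %#x", uint32(r))
	}
	return r, nil
}

func main() {
	r, err := decodeChar([]byte{0x00, 0x01, 0xF6, 0x00})
	fmt.Printf("%U %v\n", r, err)
}
```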
Lines 658 to 666 in ee6eb7e
Right now it fails to unmarshal this kind of message. We probably need to review how encoding and decoding are done between Go and AMQP data types (and vice versa).
Line 1783 in ee6eb7e
Hello,
I'm trying to get this package to work with RabbitMQ and the AMQP 1.0 plugin. I can get it to work with Go and Azure, or with C# and RabbitMQ, but not with Go and RabbitMQ. Here is my code:
package main

import (
	"context"
	"time"

	log "github.com/sirupsen/logrus"
	"pack.ag/amqp"
)

const amqpURL = "amqp://xxxxx:xxxxx@localhost:5672/"

func main() {
	// Create client
	client, err := amqp.Dial(amqpURL)
	ctx := context.Background()
	if err != nil {
		log.Fatal(err)
	}

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("/queue-name"),
	)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

	// Send message
	msg := "Hello!"
	err = sender.Send(ctx, amqp.NewMessage([]byte(msg)))
	if err != nil {
		log.Fatal("Sending message:", err)
	}
	// Infof is needed for the format verb to be applied.
	log.Infof("sent %s", msg)

	cancel()
	sender.Close()
}
The code blocks at session.NewSender, and moreover I can't see the connection in the RabbitMQ admin.
Did I miss something?
Best regards
Line 1004 in ee6eb7e
AMQP Claims-based Security: Defines a layered protocol to authenticate AMQP clients using security tokens (working draft).
This would enable a consumer of the library to provide a set of claims for fine grained access control, a token with a TTL and a token type to negotiate a connection via SASL.
This is based on the AMQP management specification, also in working draft.
One example implementor is Azure Service Bus using either SAS or JWT tokens. A good example of this is AMQPNetLite CBS example.
When dialing a URL, we should handle the username/password segment as SASL credentials, e.g. amqp://user:[email protected]
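A sketch of the extraction step using only net/url. The helper name credentialsFromURL and the example URL are hypothetical; how the result would be passed to the SASL layer is left out because it depends on the library's internals.

```go
package main

import (
	"fmt"
	"net/url"
)

// credentialsFromURL extracts the userinfo segment of an AMQP URL so it can
// be offered as SASL credentials. hasPass is false when no password is set.
func credentialsFromURL(raw string) (user, pass string, hasPass bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", false, err
	}
	if u.User == nil {
		// No userinfo segment: the caller falls back to its default mechanism.
		return "", "", false, nil
	}
	pass, hasPass = u.User.Password()
	return u.User.Username(), pass, hasPass, nil
}

func main() {
	// Hypothetical URL, used only for illustration.
	fmt.Println(credentialsFromURL("amqp://guest:secret@localhost:5672/"))
}
```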
When sending to Event Hubs a tt.data consisting of the following:
{
	label: "1 roundtrip, large payload",
	data:  []string{strings.Repeat("H", 133793)},
},
sender.Send will time out.
Debug test output is as follows:
=== RUN TestIntegration_EventHubs_RoundTrip
=== RUN TestIntegration_EventHubs_RoundTrip/1_roundtrip,_large_payload
TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX(Session): Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
RX(Session): Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 1, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 1, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
RX(Session): Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 262144, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:tracking-id:d8cd0fd33424440498f2d30ac379211e_G5]}
RX: Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 262144, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:tracking-id:d8cd0fd33424440498f2d30ac379211e_G5]}
RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 2, DeliveryCount: 0, LinkCredit: 300, Available: 0, Drain: false, Echo: false, Properties: map[]}
RX: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 2, DeliveryCount: 0, LinkCredit: 300, Available: 0, Drain: false, Echo: false, Properties: map[]}
TX(link): Transfer{Handle: 2, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(link): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(Session): Transfer{Handle: 2, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(Session): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
...
TX(Session): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
RX(Session): Disposition{Role: Receiver, First: 1, Last: <nil>, Settled: true, State: Rejected{Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:Send:67112:636576975061600000:F0:C41795' is force detached by the broker due to errors occurred in publisher(link24215811). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:c8bb45460000a343017181035ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}, Batchable: false}
RX: Disposition{Role: Receiver, First: 1, Last: <nil>, Settled: true, State: Rejected{Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:Send:67112:636576975061600000:F0:C41795' is force detached by the broker due to errors occurred in publisher(link24215811). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:c8bb45460000a343017181035ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}, Batchable: false}
TX(Session): Detach{Handle: 2, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217517:jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj' is force detached by the broker due to errors occurred in consumer(link24215784). Detach origin: InnerMessageReceiver was closed. TrackingId:c8bb45460000a343017180e85ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX(Session): Detach{Handle: 1, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217542:jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk' is force detached by the broker due to errors occurred in consumer(link21953275). Detach origin: InnerMessageReceiver was closed. TrackingId:174685f60000a344014efafb5ab969fa_G5_B8, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~32766|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217517:jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj' is force detached by the broker due to errors occurred in consumer(link24215784). Detach origin: InnerMessageReceiver was closed. TrackingId:c8bb45460000a343017180e85ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX: Detach{Handle: 1, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217542:jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk' is force detached by the broker due to errors occurred in consumer(link21953275). Detach origin: InnerMessageReceiver was closed. TrackingId:174685f60000a344014efafb5ab969fa_G5_B8, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~32766|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
TX(Session): Detach{Handle: 1, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 2, Closed: true, Error: *Error(nil)}
--- FAIL: TestIntegration_EventHubs_RoundTrip (52.68s)
integration_test.go:764: Creating hub integration-receive-10546816273958410203
--- FAIL: TestIntegration_EventHubs_RoundTrip/1_roundtrip,_large_payload (6.30s)
integration_test.go:453: Error after 0 sends: context deadline exceeded
FAIL
exit status 1
FAIL pack.ag/amqp 52.711s
I have a service which has two instances, v1 and v2. Since it's the same service with two different instances, I would like both to load balance on the queue they are subscribed to.
For example, if the queue has messages 1, 2, 3, 4, 5, ... the instances should receive them in the following order:
v1 -> 1, 3, 5, ...
v2 -> 2, 4, ...
If the v2 instance goes down and isn't available any more, v1 should start getting all the following messages. For example, if the state of the queue is ..., 12, 13, 14, 15, 16, ... and v2 has failed by the time message 14 arrives on the queue:
v1 -> .., 13, 14, 15, 16, ...
v2 -> 12, xxx
How do I achieve this with this library? From the documentation, what I have learnt is that this can be achieved with something called message grouping.
I have tried multiple things like amqp.LinkSessionFilter, amqp.LinkProperty, amqp.ConnProperty, etc.
What is the right way to achieve this? Any help and pointers would be appreciated.
Lines 789 to 792 in ee6eb7e
Per discussion in #35, multiple data section support would be helpful for some use cases.
The implementation shouldn't be too difficult, but I'm not sure how the API should look.
Though not a firm requirement, I'd like to leave Message.Data as is. I suspect a single data payload is the most common case.
Idea A: Add an AdditionalData [][]byte field.
Message.Data and Message.AdditionalData, which isn't particularly ergonomic but I don't think it's terrible either.
Idea B: Add MultiData [][]byte, where the first element is the same as Message.Data.
Message.Data or Message.MultiData instead of both.
Message.Data and Message.MultiData where the first MultiData element differs from Data. The API would likely end up asymmetric.
Idea C: A variation on the above MultiData [][]byte approach could be to only set Data or MultiData.
After rubber-ducking in this issue, I'm favoring Idea C. Other input is welcome.
/cc @devigned
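To make Idea C concrete, here is a rough sketch with a hypothetical message type (not the library's Message). The precedence rule, where MultiData wins if both fields are set, is an assumption of the sketch; the issue does not decide it.

```go
package main

import "fmt"

// message is a hypothetical shape for Idea C.
type message struct {
	Data      []byte   // single data section (the common case)
	MultiData [][]byte // multiple data sections; set instead of Data
}

// dataSections returns the sections an encoder would write. MultiData takes
// precedence when set; otherwise Data is the only section.
func (m *message) dataSections() [][]byte {
	switch {
	case len(m.MultiData) > 0:
		return m.MultiData
	case m.Data != nil:
		return [][]byte{m.Data}
	default:
		return nil
	}
}

func main() {
	single := &message{Data: []byte("one")}
	multi := &message{MultiData: [][]byte{[]byte("a"), []byte("b")}}
	fmt.Println(len(single.dataSections()), len(multi.dataSections()))
}
```

On the receive side, a decoder following the same rule would populate Data for a single section and MultiData otherwise, so callers that only handle the common case keep working.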