amqp's People

Contributors

alanconway, amenzhinsky, carlosmaersk, devigned, hanvanderveen, k-wall, lawrencegripper, marstr, spacepille, vcabbage

amqp's Issues

LinkProperties other than string

I was hoping that properties could stay string values, but it appears strings will not suffice. This has come up due to epoch functionality described in #43. Epoch values can only be longs.

As discussed in #28 and #26, you didn't want to expose interface{} as the value of a property, so we landed on exposing only string. Now that there is real need to set values other than string, how would you like to see LinkProperties implemented?

Cleanup deliveryState or remove TODO

amqp/types.go

Lines 560 to 589 in ee6eb7e

type deliveryState interface{} // TODO: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#type-declared

type unsettled map[string]deliveryState

func (u unsettled) marshal(wr *buffer) error {
	return writeMap(wr, u)
}

func (u *unsettled) unmarshal(r *buffer) error {
	count, err := readMapHeader(r)
	if err != nil {
		return err
	}

	m := make(unsettled, count/2)
	for i := uint32(0); i < count; i += 2 {
		key, err := readString(r)
		if err != nil {
			return err
		}
		var value deliveryState
		err = unmarshal(r, &value)
		if err != nil {
			return err
		}
		m[key] = value
	}
	*u = m
	return nil
}

Replace randBytes or remove TODO comment

amqp/client.go

Lines 223 to 230 in ee6eb7e

func randBytes(n int) []byte { // TODO: random string gen off SO, replace
	var letterBytes = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Int63()%int64(len(letterBytes))]
	}
	return b
}

Used at:

amqp/client.go

Lines 827 to 829 in ee6eb7e

func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
	l := &link{
		name: string(randBytes(40)),

amqp/conn.go

Lines 754 to 760 in ee6eb7e

// openAMQP round trips the AMQP open performative
func (c *conn) openAMQP() stateFunc {
	// send open frame
	c.err = c.writeFrame(frame{
		type_: frameTypeAMQP,
		body: &performOpen{
			ContainerID: string(randBytes(40)),

Errors returned in rejected dispositions are silently ignored

As reported in #46 (comment), errors returned in a rejected disposition are not propagated up to the sender.

I think this can be resolved by using a chan deliveryState to confirm message settlement instead of chan struct{}.

I'm not sure how this will work out with sender-settled messages. The best option in that case may be to consider it a link error.

"Unexpected Frame" send during parallel benchmark

I'm running a parallel benchmark sending to a queue similar to the following:

session, err := sb.client.NewSession() // client is *amqp.Client -- the client connection is reused
if err != nil {
	return err
}
defer session.Close()

sender, err := session.NewSender(amqp.LinkAddress(entityPath))
if err != nil {
	return err
}
defer sender.Close()

return sender.Send(ctx, msg)

Received the following:

Unexpected frame: &{*Error(nil)}
Unexpected frame: *performClose{Error: *Error{Condition: amqp:session:handle-in-use, Description: The handle '0' is already associated with object 'session55149196'., Info: map[]}}

The source of the error is:

amqp/client.go

Line 474 in a6e3307

fmt.Printf("Unexpected frame: %s\n", body)

Any thoughts on why?

Does not handle Offered-Capabilities field in Open performative from AMQ7

AMQP 1.0 spec section 2.7.1 indicates that Offered-Capabilities is a list. However, when connecting to an AMQ7 Interconnect Router, the field arrives as 0xa3 (a single symbol). The code attempts to read an ArrayHeader and fails.

Per the spec, I expected a list/array, which would mean the code is correct.
Per the AMQ7 Interconnect Router behavior, I expect Offered-Capabilities to be a symbol, which the code does not handle.

I changed my local copy to expect a symbol for offered-capabilities, and it works with the AMQ7 Interconnect Router.

This might be a bug in AMQ7, not your code. I haven't tried this with any other AMQP 1.0 devices.
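
As far as I understand the spec, a field declared with multiple="true" may carry a single value encoded directly instead of wrapped in an array, which would explain the 0xa3 (sym8) byte. The sketch below is a simplified, standalone decoder that accepts null, a bare symbol, or an array8 of symbols. The function names are hypothetical and it does not use the library's buffer type; array32 is left out for brevity.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// AMQP 1.0 type constructor codes used by this sketch.
const (
	typeCodeNull   byte = 0x40
	typeCodeSym8   byte = 0xa3
	typeCodeSym32  byte = 0xb3
	typeCodeArray8 byte = 0xe0
)

var errShort = errors.New("buffer too short")

// readSymbolBody reads a symbol whose constructor byte was already consumed.
func readSymbolBody(code byte, b []byte) (string, []byte, error) {
	var n uint64
	switch code {
	case typeCodeSym8:
		if len(b) < 1 {
			return "", nil, errShort
		}
		n, b = uint64(b[0]), b[1:]
	case typeCodeSym32:
		if len(b) < 4 {
			return "", nil, errShort
		}
		n, b = uint64(binary.BigEndian.Uint32(b)), b[4:]
	default:
		return "", nil, fmt.Errorf("type code %#02x is not a symbol", code)
	}
	if uint64(len(b)) < n {
		return "", nil, errShort
	}
	return string(b[:n]), b[n:], nil
}

// readMultiSymbol accepts null, a single symbol, or an array8 of symbols.
func readMultiSymbol(b []byte) ([]string, error) {
	if len(b) == 0 {
		return nil, errShort
	}
	code, rest := b[0], b[1:]
	switch code {
	case typeCodeNull:
		return nil, nil
	case typeCodeSym8, typeCodeSym32:
		sym, _, err := readSymbolBody(code, rest)
		if err != nil {
			return nil, err
		}
		return []string{sym}, nil
	case typeCodeArray8:
		// Layout: size (1 byte), count (1 byte), element constructor, elements.
		if len(rest) < 2 {
			return nil, errShort
		}
		count := int(rest[1]) // rest[0] is the byte size, unused here
		if count == 0 {
			return nil, nil
		}
		if len(rest) < 3 {
			return nil, errShort
		}
		elemCode := rest[2]
		rest = rest[3:]
		syms := make([]string, 0, count)
		for i := 0; i < count; i++ {
			sym, remaining, err := readSymbolBody(elemCode, rest)
			if err != nil {
				return nil, err
			}
			syms = append(syms, sym)
			rest = remaining
		}
		return syms, nil
	default:
		return nil, fmt.Errorf("unexpected type code %#02x", code)
	}
}

func main() {
	single := []byte{0xa3, 0x03, 'f', 'o', 'o'}
	array := []byte{0xe0, 0x06, 0x02, 0xa3, 0x01, 'a', 0x01, 'b'}
	for _, buf := range [][]byte{single, array} {
		caps, err := readMultiSymbol(buf)
		fmt.Println(caps, err)
	}
}
```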

Fix 32bit systems support

Hi there!
It's not possible to build the lib on a 32-bit architecture.

With GOARCH=arm or GOARCH=386, the build fails with the following errors:

vendor/pack.ag/amqp/encode.go:394:9: constant 4294967295 overflows int
vendor/pack.ag/amqp/encode.go:419:9: constant 4294967295 overflows int
vendor/pack.ag/amqp/encode.go:531:11: constant 4294967291 overflows int
vendor/pack.ag/amqp/types.go:2436:9: constant 4294967295 overflows int
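
I have not checked the exact lines, so the following is a guess at the pattern: this compiler error typically appears when a value of type int is compared against or assigned a constant such as math.MaxUint32, which does not fit in a 32-bit int. A sketch of a portable comparison, with the hypothetical helper `fitsUint32`, converts to uint64 before comparing:

```go
package main

import (
	"fmt"
	"math"
)

// fitsUint32 reports whether a length can be encoded in a 32-bit size field.
// Comparing as uint64 compiles on both 32-bit and 64-bit platforms, whereas
// `n > math.MaxUint32` with n of type int fails to compile when int is 32 bits.
func fitsUint32(n int) bool {
	return n >= 0 && uint64(n) <= math.MaxUint32
}

func main() {
	payload := []byte("hello")
	fmt.Println(fitsUint32(len(payload)))
}
```

Building with `GOARCH=386 go build ./...` in CI would catch regressions of this kind.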

Send Message Batching

Related to: #35

After digging into send batching in Azure Event Hubs, I found the current implementation does not actually accomplish send batching as Event Hubs expects. Unfortunately, the structure that Event Hubs expects also can't be built easily via the publicly exposed structures.

Here's the Java code to implement this feature via Proton messages.

Basically, the first message is marshaled with an empty data body, then each data body is used to create an inner message, which is marshaled and added to a wrapper message. Finally, the wrapped messages are marshaled at the end of the first message.

I'll add a PR to better explain the process.

Support for message formats other than the default uint32(0)

amqp/client.go

Line 274 in ccafaa7

messageFormat = uint32(0) // Only message-format "0" is defined in spec.

One such example for non-default message formats is sending batch messages in Azure Service Bus. The following message format is used in the Java lib.

public static final int AMQP_BATCH_MESSAGE_FORMAT = 0x80013700; // 2147563264L;

via here

Perhaps Send should accept variadic TransferOption arguments. Thoughts?
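
To make the proposal concrete, here is a rough sketch of what such options could look like, using the functional-options pattern. Everything here is hypothetical: `TransferOption`, `TransferMessageFormat`, `transferConfig`, and `newTransferConfig` do not exist in the library; only the 0x80013700 constant comes from the Java code quoted above.

```go
package main

import "fmt"

// transferConfig holds per-send settings (hypothetical, not a library type).
type transferConfig struct {
	messageFormat uint32
}

// TransferOption mutates a transferConfig (hypothetical name from this issue).
type TransferOption func(*transferConfig)

// TransferMessageFormat overrides the default message-format of 0.
func TransferMessageFormat(format uint32) TransferOption {
	return func(c *transferConfig) { c.messageFormat = format }
}

// newTransferConfig starts from the spec default and applies each option.
func newTransferConfig(opts ...TransferOption) transferConfig {
	cfg := transferConfig{messageFormat: 0}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// batchMessageFormat mirrors AMQP_BATCH_MESSAGE_FORMAT from the Java library.
const batchMessageFormat uint32 = 0x80013700

func main() {
	fmt.Printf("%#x\n", newTransferConfig().messageFormat)
	fmt.Printf("%#x\n", newTransferConfig(TransferMessageFormat(batchMessageFormat)).messageFormat)
}
```

Because the options are variadic, existing callers that pass no options would keep the current behavior.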

nil pointer dereference

Randomly in my CI pipeline I get panics like:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x64 pc=0x69cbb2]

goroutine 189 [running]:
.../vendor/pack.ag/amqp.(*Message).shouldSendDisposition(...)
	vendor/pack.ag/amqp/types.go:1741
.../vendor/pack.ag/amqp.(*Message).Accept(0x0)
	.../olu-advance-connector/vendor/pack.ag/amqp/types.go:1720 +0x22

Revision: 156a96cbd71de6a80dd774d60de212f31c7272c7
If needed, I can spend some time trying to narrow down the issue. Or do you already have thoughts on this?

P.S. Updated to the latest version; we'll see how it goes.

Block on message disposition when rcv-settle-mode is second

amqp/client.go

Lines 1382 to 1389 in ee6eb7e

// TODO:
// When rcv-settle-mode == second, don't consider the transfer complete
// until caller accepts/reject/etc and a confirm from sender.
//
// At first glance, this appears to be at odds with batching. A batch can't
// be built up if the caller is blocked on confirmation. However, while receives
// must happen synchronously, confirmations do not. While the use is probably
// limited it may be worth exploring.

received message larger than max size of -1

Without setting the amqp.LinkMaxMessageSize(...) on a receiver, the max message size is -1, which causes an error. After setting amqp.LinkMaxMessageSize(...) I'm again able to receive messages.

Investigate proper handling of `amqps` scheme

As mentioned in #9 (comment), we may be improperly handling the amqps scheme. Currently it enables TLS negotiation as part of AMQP negotiation. It may be that it should establish TLS before AMQP negotiation. If that's true, Dial() should automatically create the initial TLS connection.
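
If TLS does need to come first, the scheme handling could look roughly like the sketch below. `dialPlan` is a hypothetical helper, not part of the library; it only decides the address and whether TLS is required, using the registered default ports (5672 for amqp, 5671 for amqps).

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// dialPlan parses an AMQP URL and reports the host:port to dial and whether
// a TLS connection should be established before any AMQP negotiation.
func dialPlan(addr string) (hostport string, useTLS bool, err error) {
	u, err := url.Parse(addr)
	if err != nil {
		return "", false, err
	}
	port := u.Port()
	switch u.Scheme {
	case "amqp":
		if port == "" {
			port = "5672"
		}
	case "amqps":
		useTLS = true
		if port == "" {
			port = "5671"
		}
	default:
		return "", false, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return net.JoinHostPort(u.Hostname(), port), useTLS, nil
}

func main() {
	for _, addr := range []string{"amqp://localhost", "amqps://example.com"} {
		hostport, useTLS, err := dialPlan(addr)
		fmt.Println(hostport, useTLS, err)
	}
}
```

When useTLS is true, the caller would open the socket with tls.Dial from crypto/tls instead of net.Dial and hand the resulting connection to the AMQP layer.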

Epoch receivers -- amqp:link:stolen detach error causing close of other link

As mentioned in #42, Event Hubs has a feature where a receiver can specify an epoch. The epoch value must be a long (can't be a string, ulong or anything else) set in the attach performative properties.

The behavior of this broker feature is as follows:

  1. If a receiver r1 is attached with epoch e1, and a receiver r2 is subsequently attached with epoch e2 where e2 > e1, r1 will be detached by the server.
  2. If a receiver r1 is attached with epoch e1, and a receiver r2 is subsequently attached with epoch e2 where e2 < e1, r2 will not be allowed to attach.

In case 2, using a single connection between r1 and r2, I see both r1 and r2 close. I would expect only r2 to fail to attach and r1 would stay open, not close.

It appears the context for r1 is closed when r2 receives the detach error.

Excerpt from debug output for e1 = 4 and e2 = 1

TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 1, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jgttgCWUNisofYoKKyeAaQcCvlqeprIhIoPOqOYt, Handle: 0, Role: Receiver, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:4]}
RX(Session): Attach{Name: jgttgCWUNisofYoKKyeAaQcCvlqeprIhIoPOqOYt, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:4]}

TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 3, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ, Handle: 0, Role: Receiver, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: source{Address: goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset >= '-1'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:epoch:1]}
RX(Session): Attach{Name: gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ, Handle: 0, Role: Sender, SenderSettleMode: unsettled, ReceiverSettleMode: second, Source: <nil>, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[link-name:gCodtOUPdHLKkVwARWZfXPvOfWmzGHPolHezofUZ client-max-frame-size:480 com.microsoft:epoch:1]}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 0, DeliveryCount: 0, LinkCredit: 100, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
ERRO[0041] entity path goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0 and epoch 4 error: context canceled

RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
ERRO[0041] entity path goehtest-pjkavlk4cq/ConsumerGroups/$Default/Partitions/0 and epoch 1 error: link detached, reason: *Error{Condition: amqp:link:stolen, Description: A receiver with a higher epoch '4' already exists. A new receiver with epoch 1 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. TrackingId:8a804a40-52f5-48b3-ada9-08eb219f01b8_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:45 PM Reference:5569318e-5e9b-441a-b780-88b73dcc55b0, TrackingId:2078bf14-76e7-437c-a7fa-4e15e478e0cd_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191|$default, Timestamp:2/28/2018 3:41:45 PM TrackingId:a23d3b1f30694314ba5b70b64ce53de3_G26, SystemTracker:gateway2, Timestamp:2/28/2018 3:41:45 PM, Info: map[]}

RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G26:29035311:opkxrbrNvsKWErgDdrEDiUshYNcwdYkxOzzsFHTg' is force detached by the broker due to errors occurred in publisher(link5075554). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:bf8e03460000a344004d72625a96cdba_G26_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:51 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G26:29035311:opkxrbrNvsKWErgDdrEDiUshYNcwdYkxOzzsFHTg' is force detached by the broker due to errors occurred in publisher(link5075554). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:bf8e03460000a344004d72625a96cdba_G26_B25, SystemTracker:ehdjtest:eventhub:goehtest-pjkavlk4cq~8191, Timestamp:2/28/2018 3:41:51 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}

Connection Open Properties

On the open performative, the properties field provides a nice way to add a user agent and other helpful information. Unfortunately, there is currently no way to get at this internal map.

  1. Would you like to expose configuring this map?
  2. If so, would the best way to handle this be a new ConnOption?

Send dropping messages over 438 characters

I have an application that uses the Send method to send a JSON-encoded byte array, and it seems to drop messages when the message data is over 438 bytes. Is there some flag I need to set for longer strings?

I'm using Azure Event Hubs, which shows no messages received when I send only messages over 438 bytes.

serializedEvent, err := eh.codec.Encode(eh.index, &event.Content)
if err != nil {
	if !event.Guaranteed() {
		return false
	}

	logp.Critical("Unable to encode event: %v", err)
	return false
}

session, err := eh.client.NewSession()
if err != nil {
	logp.Critical("Creating AMQP session: %v", err)
	return false
}

sender, err := session.NewSender(amqp.LinkTargetAddress("/" + eh.queue))
if err != nil {
	logp.Critical("Creating sender link: %v", err)
	return false
}

//message := amqp.NewMessage(serializedEvent)
message := &amqp.Message{
	Data: [][]byte{serializedEvent[:438], serializedEvent[438:]},
}
logp.Info("Sending message: %v", message)
err = sender.Send(context.Background(), message)
defer sender.Close()
if err != nil {
	logp.Critical("Sending message: %v", err)
	return false
}
return true

Properly handle max sessions/wrapping

amqp/client.go

Lines 462 to 468 in ee6eb7e

// handle allocation request
case l := <-s.allocateHandle:
	// TODO: handle max session/wrapping
	l.handle = nextHandle   // allocate handle to the link
	linksByName[l.name] = l // add to mapping
	nextHandle++            // increment the next handle
	l.rx <- nil             // send nil on channel to indicate allocation complete
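
One possible shape for the fix, sketched outside the library: instead of an ever-increasing nextHandle, track which handles are in use, hand out the lowest free one, and fail once the peer's handle-max is reached. The `handleAllocator` type and its methods are hypothetical names for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

var errHandlesExhausted = errors.New("no free link handles")

// handleAllocator hands out link handles in the range [0, handleMax].
type handleAllocator struct {
	handleMax uint32 // highest handle the peer allows (handle-max from Begin)
	inUse     map[uint32]struct{}
}

func newHandleAllocator(handleMax uint32) *handleAllocator {
	return &handleAllocator{handleMax: handleMax, inUse: make(map[uint32]struct{})}
}

// allocate returns the lowest free handle, so handles released by detached
// links are reused and the counter never wraps.
func (a *handleAllocator) allocate() (uint32, error) {
	for h := uint32(0); ; h++ {
		if _, taken := a.inUse[h]; !taken {
			a.inUse[h] = struct{}{}
			return h, nil
		}
		if h == a.handleMax {
			return 0, errHandlesExhausted
		}
	}
}

// release marks a handle as free again, for example after a detach.
func (a *handleAllocator) release(h uint32) {
	delete(a.inUse, h)
}

func main() {
	a := newHandleAllocator(1)
	h0, _ := a.allocate()
	h1, _ := a.allocate()
	_, err := a.allocate()
	fmt.Println(h0, h1, err)

	a.release(h0)
	h2, _ := a.allocate()
	fmt.Println(h2)
}
```

The linear scan is fine for small handle-max values such as the 255 seen in the debug output in other issues; a free list would be a better fit if many links are expected.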

Accurately determine transfer header size

Current implementation wastes several bytes on each transfer.

amqp/client.go

Lines 1545 to 1575 in ee6eb7e

const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader

func calcMaxTransferFrameHeader() int {
	var buf buffer
	maxUint32 := uint32(math.MaxUint32)
	receiverSettleMode := ReceiverSettleMode(0)
	err := writeFrame(&buf, frame{
		type_: frameTypeAMQP,
		channel: math.MaxUint16,
		body: &performTransfer{
			Handle: maxUint32,
			DeliveryID: &maxUint32,
			DeliveryTag: bytes.Repeat([]byte{'a'}, 32),
			MessageFormat: &maxUint32,
			Settled: true,
			More: true,
			ReceiverSettleMode: &receiverSettleMode,
			State: nil, // TODO: determine whether state should be included in size
			Resume: true,
			Aborted: true,
			Batchable: true,
			// Payload omitted as it is appended directly without any header
		},
	})
	if err != nil {
		panic(err)
	}
	return buf.len()
}

Add ability to configure receiver's target and sender's source

It's needed at least to implement request-response operations (reply-to):
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response

Right now it's not possible:

amqp/client.go

Lines 692 to 705 in 746c1d6

if isReceiver {
	attach.Role = roleReceiver
	attach.Source = &source{
		Address: l.address,
		Dynamic: l.dynamicAddr,
		Filter: l.filters,
	}
} else {
	attach.Role = roleSender
	attach.Target = &target{
		Address: l.address,
		Dynamic: l.dynamicAddr,
	}
}

Probably we should add funcs like LinkSource(s *Source) and LinkTarget(t *Target)
Any thoughts?

Prevent sending multiple delivery states for the same message.

Currently it's possible to call accept/modify/reject/release multiple times on the same message and have a disposition sent multiple times. These are all terminal delivery states and only the first call is valid.

  • Ensure that only the first call sends a disposition.
  • Document the behavior.
  • Possibly change the signatures to return errors so that this can be reported to the user.
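
A small sketch of how the first two points could fit together, assuming a hypothetical `message` type rather than the library's Message: an atomic compare-and-swap lets exactly one terminal state through, and later calls get an error. The `dispositions` slice stands in for frames written to the network.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errAlreadySettled = errors.New("message already settled")

// message is a hypothetical stand-in for the library's Message type.
type message struct {
	settled      uint32   // 0 = unsettled, 1 = terminal state already sent
	dispositions []string // records what would be sent on the wire
}

// settle sends a disposition only for the first caller; the compare-and-swap
// makes the check safe even if two goroutines race to settle.
func (m *message) settle(state string) error {
	if !atomic.CompareAndSwapUint32(&m.settled, 0, 1) {
		return errAlreadySettled
	}
	m.dispositions = append(m.dispositions, state)
	return nil
}

func (m *message) Accept() error  { return m.settle("accepted") }
func (m *message) Reject() error  { return m.settle("rejected") }
func (m *message) Release() error { return m.settle("released") }

func main() {
	m := &message{}
	fmt.Println(m.Accept())
	fmt.Println(m.Reject())
	fmt.Println(m.dispositions)
}
```

Returning an error from the second call is the signature change mentioned in the last bullet; silently ignoring it would also satisfy the first bullet without breaking callers.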

Evaluate removing reader/writer interfaces

I think *bytes.Buffer is the only type currently passed. Changing from an interface gives up some flexibility, but it allows for better inlining and escape analysis decisions. Need to benchmark to see whether the change is justified.

Pick a more reasonable max message size

amqp/client.go

Lines 29 to 30 in ee6eb7e

// maxSliceLen is equal to math.MaxInt32 or math.MaxInt64, depending on platform
const maxSliceLen = uint64(^uint(0) >> 1)

amqp/client.go

Lines 791 to 794 in ee6eb7e

// TODO: this is excessive, especially on 64-bit platforms
// default to a more reasonable max and allow users to
// change via LinkOption
l.peerMaxMessageSize = maxSliceLen

amqp/client.go

Lines 834 to 837 in ee6eb7e

// TODO: this is excessive, especially on 64-bit platforms
// default to a more reasonable max and allow users to
// change via LinkOption
maxMessageSize: maxSliceLen,

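Zero idle timeout does not work as expected

conn.go line 434 sets the read deadline unconditionally:

_ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout))

With a zero idleTimeout the deadline is the current time, so reads time out immediately. It should be changed to:

if c.idleTimeout > 0 {
	_ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout))
}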

Support for sequences

Unsure how widely used sequences are. Low on the priority list unless someone asks for them.

amqp/types.go

Lines 1710 to 1712 in ee6eb7e

// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
// sections, one or more amqp-sequence sections, or a single amqp-value section."

Implement decimal and char types

Go has no built-in decimal types, so we would need to determine a reasonable way to represent these without pulling in a third-party library.

Char is UTF-32BE encoded. Ideally this would be converted into a rune.

amqp/decode.go

Lines 658 to 666 in ee6eb7e

// TODO: implement
case typeCodeDecimal32:
	return nil, errorNew("decimal32 not implemented")
case typeCodeDecimal64:
	return nil, errorNew("decimal64 not implemented")
case typeCodeDecimal128:
	return nil, errorNew("decimal128 not implemented")
case typeCodeChar:
	return nil, errorNew("char not implemented")
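
For the char case, the conversion to a rune is small. A sketch, assuming a hypothetical standalone `readChar` that takes the four payload bytes following the type code (it does not use the library's buffer type):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"unicode/utf8"
)

// readChar decodes a 4-byte UTF-32BE code point into a rune, rejecting
// values outside the Unicode range and surrogate halves.
func readChar(b []byte) (rune, error) {
	if len(b) < 4 {
		return 0, errors.New("char requires 4 bytes")
	}
	cp := binary.BigEndian.Uint32(b)
	if cp > utf8.MaxRune || !utf8.ValidRune(rune(cp)) {
		return 0, fmt.Errorf("invalid code point %#x", cp)
	}
	return rune(cp), nil
}

func main() {
	r, err := readChar([]byte{0x00, 0x00, 0x00, 0x41})
	fmt.Printf("%c %v\n", r, err)
}
```

Encoding is the mirror image: binary.BigEndian.PutUint32 on uint32(r) after the same validity check.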

Does not work with RabbitMQ

Hello,

I'm trying to get this package to work with RabbitMQ and the AMQP 1.0 plugin. I can get it to work with Go and Azure, or with C# and RabbitMQ, but not with Go and RabbitMQ. Here is my code:

package main

import (
	"context"
	"time"

	log "github.com/sirupsen/logrus"
	"pack.ag/amqp"
)

const amqpURL = "amqp://xxxxx:xxxxx@localhost:5672/"

func main() {

	// Create client
	client, err := amqp.Dial(amqpURL)

	ctx := context.Background()

	if err != nil {
		log.Fatal(err)
	}

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("/queue-name"),
	)

	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

	// Send message
	msg := "Hello!"
	err = sender.Send(ctx, amqp.NewMessage([]byte(msg)))
	if err != nil {
		log.Fatal("Sending message:", err)
	}

	log.Infof("sent %s", msg)

	cancel()
	sender.Close()
}

The code blocks at session.NewSender; moreover, I can't see the connection in RabbitMQ admin.

Did I miss something?

Best regards

Claims-based Security Support

AMQP Claims-based Security: Defines a layered protocol to authenticate AMQP clients using security tokens (working draft).

This would enable a consumer of the library to provide a set of claims for fine grained access control, a token with a TTL and a token type to negotiate a connection via SASL.

This is based on the AMQP management specification, also in working draft.

One example implementor is Azure Service Bus using either SAS or JWT tokens. A good example of this is AMQPNetLite CBS example.

Send fails when sending large messages

When sending the following tt.data to Event Hubs:

{
	label: "1 roundtrip, large payload",
	data: []string{strings.Repeat("H", 133793)},
},

sender.Send will time out.

Debug test output is as follows:

=== RUN   TestIntegration_EventHubs_RoundTrip
=== RUN   TestIntegration_EventHubs_RoundTrip/1_roundtrip,_large_payload
TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 5000, OutgoingWindow: 4294967295, HandleMax: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 5000, OutgoingWindow: 5000, HandleMax: 255, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX(Session): Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Attach{Name: jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/0, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 0, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
RX(Session): Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
RX: Attach{Name: jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk, Handle: 1, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: <nil>, Source: source{Address: integration-receive-10546816273958410203/ConsumerGroups/$default/Partitions/1, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[apache.org:selector-filter:string:describedType{descriptor: 77567109365764, value: amqp.annotation.x-opt-offset > '@latest'}], DefaultOutcome: <nil>Outcomes: [], Capabilities: []}, Target: <nil>, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 266240, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 9223372036854775807, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
TX: Flow{NextIncomingID: <nil>, IncomingWindow: 0, NextOutgoingID: 0, OutgoingWindow: 0, Handle: 1, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
TX(Session): Flow{NextIncomingID: 1, IncomingWindow: 5000, NextOutgoingID: 0, OutgoingWindow: 4294967295, Handle: 1, DeliveryCount: 0, LinkCredit: 10, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
RX(Session): Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 262144, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:tracking-id:d8cd0fd33424440498f2d30ac379211e_G5]}
RX: Attach{Name: KeVnbUGZSTHtZfxPrtGMUaCzXFAdqOAYgmXfsdBa, Handle: 2, Role: Receiver, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: integration-receive-10546816273958410203, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 262144, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[com.microsoft:tracking-id:d8cd0fd33424440498f2d30ac379211e_G5]}
RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 2, DeliveryCount: 0, LinkCredit: 300, Available: 0, Drain: false, Echo: false, Properties: map[]}
RX: Flow{NextIncomingID: 0, IncomingWindow: 5000, NextOutgoingID: 1, OutgoingWindow: 5000, Handle: 2, DeliveryCount: 0, LinkCredit: 300, Available: 0, Drain: false, Echo: false, Properties: map[]}
TX(link): Transfer{Handle: 2, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(link): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(Session): Transfer{Handle: 2, DeliveryID: 1, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
TX(Session): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}

...
TX(Session): Transfer{Handle: 2, DeliveryID: <nil>, DeliveryTag: "<nil>", MessageFormat: <nil>, Settled: false, More: true, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 446}
RX(Session): Disposition{Role: Receiver, First: 1, Last: <nil>, Settled: true, State: Rejected{Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:Send:67112:636576975061600000:F0:C41795' is force detached by the broker due to errors occurred in publisher(link24215811). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:c8bb45460000a343017181035ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}, Batchable: false}
RX: Disposition{Role: Receiver, First: 1, Last: <nil>, Settled: true, State: Rejected{Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:Send:67112:636576975061600000:F0:C41795' is force detached by the broker due to errors occurred in publisher(link24215811). Detach origin: Message sender was closed because entity has been deleted.. TrackingId:c8bb45460000a343017181035ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}, Batchable: false}
TX(Session): Detach{Handle: 2, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217517:jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj' is force detached by the broker due to errors occurred in consumer(link24215784). Detach origin: InnerMessageReceiver was closed. TrackingId:c8bb45460000a343017180e85ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX(Session): Detach{Handle: 1, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217542:jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk' is force detached by the broker due to errors occurred in consumer(link21953275). Detach origin: InnerMessageReceiver was closed. TrackingId:174685f60000a344014efafb5ab969fa_G5_B8, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~32766|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX: Detach{Handle: 0, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217517:jdZNvYUpEdoRxgCkhrpuvhvFmEckUNzOGxwWFUMj' is force detached by the broker due to errors occurred in consumer(link24215784). Detach origin: InnerMessageReceiver was closed. TrackingId:c8bb45460000a343017180e85ab969fa_G5_B29, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~16383|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
RX: Detach{Handle: 1, Closed: true, Error: *Error{Condition: amqp:link:detach-forced, Description: The link 'G5:54217542:jIjqTJyvSVpFHCuxjShrflViQiaPDXWHngvyyaUk' is force detached by the broker due to errors occurred in consumer(link21953275). Detach origin: InnerMessageReceiver was closed. TrackingId:174685f60000a344014efafb5ab969fa_G5_B8, SystemTracker:ehdjtest:eventhub:integration-receive-10546816273958410203~32766|$default, Timestamp:3/26/2018 9:45:55 PM, Info: map[]}}
TX(Session): Detach{Handle: 0, Closed: true, Error: *Error(nil)}
TX(Session): Detach{Handle: 1, Closed: true, Error: *Error(nil)}
RX(Session): Detach{Handle: 2, Closed: true, Error: *Error(nil)}
--- FAIL: TestIntegration_EventHubs_RoundTrip (52.68s)
	integration_test.go:764: Creating hub integration-receive-10546816273958410203
    --- FAIL: TestIntegration_EventHubs_RoundTrip/1_roundtrip,_large_payload (6.30s)
    	integration_test.go:453: Error after 0 sends: context deadline exceeded
FAIL
exit status 1
FAIL	pack.ag/amqp	52.711s

Multiple consumers where "some of them are same"

I have a service with two instances, v1 and v2. Since they are two instances of the same service, I would like them to load-balance the messages on the queue they are both subscribed to.

For example, if the queue has messages 1, 2, 3, 4, 5, ...

the instances should receive the following, in order:

v1 -> 1, 3, 5, ...
v2 -> 2, 4, ...

If the v2 instance goes down and is no longer available, v1 should start receiving all subsequent messages.

For example, if the queue holds ..., 12, 13, 14, 15, 16, ... and v2 has failed by the time message 14 arrives:

v1 -> .., 13, 14, 15, 16, ...
v2 -> 12, xxx
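
To make the desired behaviour concrete, here is a small standalone simulation of round-robin delivery with failover. It does not use this library or a broker at all; `dispatch` and `downFrom` are made-up names for illustration only, and the starting message (11) is chosen so that message 12 lands on v2 as in the example above.

```go
package main

import "fmt"

// dispatch hands each message to the next available consumer in
// round-robin order. downFrom maps a consumer name to the first message
// ID at which that consumer is no longer available (hypothetical helper).
func dispatch(consumers []string, msgs []int, downFrom map[string]int) map[string][]int {
	out := make(map[string][]int)
	next := 0
	for _, id := range msgs {
		// Try each consumer at most once per message.
		for tries := 0; tries < len(consumers); tries++ {
			c := consumers[next%len(consumers)]
			next++
			if from, ok := downFrom[c]; ok && id >= from {
				continue // consumer has failed; skip to the next one
			}
			out[c] = append(out[c], id)
			break
		}
	}
	return out
}

func main() {
	consumers := []string{"v1", "v2"}

	// Both instances healthy: messages alternate between v1 and v2.
	fmt.Println(dispatch(consumers, []int{1, 2, 3, 4, 5}, nil))

	// v2 has failed by the time message 14 arrives: v1 takes over.
	fmt.Println(dispatch(consumers, []int{11, 12, 13, 14, 15, 16}, map[string]int{"v2": 14}))
}
```

In a real deployment the broker, not the client, would make this decision; the sketch only pins down what outcome I am after.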

How do I achieve this with this library? From the documentation, I understand that this can be done with something called message grouping.

I have tried several things, such as amqp.LinkSessionFilter, amqp.LinkProperty, amqp.ConnProperty, etc.

What is the right way to achieve this? Any help or pointers would be appreciated.

Support for multiple data sections

Per discussion in #35, multiple data section support would be helpful for some use cases.

The implementation shouldn't be too difficult, but I'm not sure how the API should look.

Though not a firm requirement, I'd like to leave Message.Data as is. I suspect a single data payload is the most common case.


Idea A: Add an AdditionalData [][]byte field.

  • This would require the consumer to process data from Message.Data and Message.AdditionalData, which isn't particularly ergonomic but I don't think it's terrible either.

Idea B: Add MultiData [][]byte, where the first element is the same as Message.Data.

  • I see this as a bit more appealing to a receiver, as they would only have to deal with Message.Data or Message.MultiData instead of both.
  • It's not clear how a send operation would deal with Message.Data and Message.MultiData where the first MultiData element differs from Data. The API would likely end up asymmetric.

Idea C: A variation on the above MultiData [][]byte approach could be to only set Data or MultiData.

  • Attempting to send with both fields set would result in an error.

After rubber-ducking in this issue, I'm favoring Idea C. Other input is welcome.
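
A rough sketch of how Idea C might behave on the send path. Everything here is hypothetical: the trimmed-down `Message` struct, the `MultiData` field, the `dataSections` helper, and the error text are placeholders for discussion, not the library's current API.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed-down, hypothetical stand-in for the real message type.
type Message struct {
	Data      []byte   // single data section (the common case)
	MultiData [][]byte // hypothetical: multiple data sections
}

// errBothSet is a placeholder error for the "both fields set" case.
var errBothSet = errors.New("amqp: only one of Data or MultiData may be set")

// dataSections returns the data sections that would be encoded on send.
// Setting both Data and MultiData is rejected, as described in Idea C.
func (m *Message) dataSections() ([][]byte, error) {
	switch {
	case m.Data != nil && m.MultiData != nil:
		return nil, errBothSet
	case m.MultiData != nil:
		return m.MultiData, nil
	case m.Data != nil:
		return [][]byte{m.Data}, nil
	default:
		return nil, nil // no data sections at all
	}
}

func main() {
	single := &Message{Data: []byte("one")}
	sections, err := single.dataSections()
	fmt.Println(len(sections), err)

	both := &Message{Data: []byte("one"), MultiData: [][]byte{[]byte("two")}}
	_, err = both.dataSections()
	fmt.Println(err)
}
```

On the receive side, the symmetric choice would presumably be to populate Data when exactly one data section arrives and MultiData otherwise, but that is an open question rather than a decision.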

/cc @devigned
