go-amqp's Issues

Don't disconnect the link on message disposition errors

Test against Service Bus and Event Hubs

  • Don't detach on disposition errors (enabled with flag)
  • Test against Event Hubs with throttling (tested with throttling.go, increased to 100 parallel batches, 10 total retries)
  • Test against Service Bus with something like message lock expired. Service Bus avoids this behavior entirely because of its receive mode.

Idle timeout implementation too strict?

The AMQP 1.0 spec, section 2.4.5 Idle Timeout of a Connection, contains the following text:

To avoid spurious timeouts, the value in idle-time-out SHOULD be half the peer’s
actual timeout threshold.

However, this library seems to implement the opposite -- it treats the value of the idle timeout as a hard limit and heartbeats at 2x the rate. See

https://github.com/Azure/go-amqp/blob/master/conn.go#L465
https://github.com/Azure/go-amqp/blob/master/conn.go#L579

I'm seeing IBM MQ 9.2.0's AMQP implementation heartbeat at precisely the given idle timeout period, which can cause spurious disconnections. (It also has an AMQPKA setting whose value is halved before being provided as the server's idle timeout value.)

I'm also a newbie as far as AMQP 1.0 is concerned, so perhaps this library is right and IBM's implementation is wrong.
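
For illustration, here is a minimal sketch of the lenient reading of section 2.4.5, assuming both peers advertise half their real thresholds; the names are illustrative, not this library's internals:

package idlecheck

import "time"

// heartbeatInterval: when sending, emit an empty frame well within the
// peer's advertised idle-time-out; half of it leaves comfortable margin.
func heartbeatInterval(peerAdvertised time.Duration) time.Duration {
	return peerAdvertised / 2
}

// readDeadline: when receiving, tolerate up to twice our own advertised
// idle-time-out before declaring the peer dead, since a compliant peer
// (having read our advertised value as half our real threshold) may send
// traffic as late as that full period.
func readDeadline(localAdvertised time.Duration) time.Duration {
	return 2 * localAdvertised
}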

Using this package to connect to QPID

Hello,

I am trying to use this package to connect to a QPID AMQP broker (it implements AMQP 1.0) and to send some messages. I am using the sample code that can be found on the godoc page, but I get the following error after connection:

2021/05/07 17:40:17 Creating sender link:*Error{Condition: amqp:invalid-field, Description: received Attach with remote null terminus., Info: map[]}
exit status 1

Here is my code:

package main

import (
	"context"
	"fmt"
	"github.com/Azure/go-amqp"
	"log"
	"time"
)

func main() {

	// Create client
	client, err := amqp.Dial("amqp://sptest.aaaa.bb",
		amqp.ConnSASLPlain("abababa", "cdcdcdc"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	fmt.Println("dial ok")
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}
	fmt.Println("session opened")

	ctx := context.Background()

	// Send a message
	{
		// Create a sender
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("/myTestQueue"),
//			amqp.LinkTargetTimeout(10),
//			amqp.LinkTargetDurability(2),
		)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}
		fmt.Println("Sender link created")

		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Send message
		err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
		if err != nil {
			log.Fatal("Sending message:", err)
		}

		sender.Close(ctx)
		cancel()
	}
}

I have tried many different ways to enter the queue name (with a slash, without it, with TargetDurability, etc.), but no luck. Dialing is OK, the session opens, but then it exits with the error above.

From the same machine, using the same connection credentials, I can easily connect and send using Node.js with rhea. This means that the problem must be in my code. I am sure I am missing some obvious thing, probably in how to reference the queue.

Any help is appreciated.

How to make messages persistent?

Hi! I might have overlooked something, but I'm unable to make new queues/messages persistent. I'm coming from Java, where I've been able to create new queues/messages that default to persistent. I've found the 'durability configuration' link option when creating a sender/receiver, but that does not seem to make messages persistent.

How do I mark my messages as persistent?

Thanks in advance!
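
In case it helps: in AMQP 1.0, per-message persistence is signaled via the transport header's durable flag, while the link durability options only affect link state recovery. A minimal sketch with go-amqp (whether the message is actually stored durably is still up to the broker):

msg := amqp.NewMessage([]byte("Hello!"))
// Mark the message durable in its transport header.
msg.Header = &amqp.MessageHeader{
	Durable: true,
}
if err := sender.Send(ctx, msg); err != nil {
	log.Fatal("Sending message:", err)
}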

Could not decode AMQP Frame

My code

type activeMQManager struct {
	amqSender *amqp.Sender
}

func initActiveMq(ctx context.Context, logger *model.Logger) (*activeMQManager, error) {
	// Try to connect with AMQ
	client, err := amqp.Dial(logger.Conn)
	if err != nil {
		return nil, err
	}

	// Open a session
	amqSession, err := client.NewSession()
	if err != nil {
		return nil, err
	}

	// Create a sender
	sender, err := amqSession.NewSender(
		amqp.LinkTargetAddress(logger.Topic),
	)
	if err != nil {
		return nil, err
	}
	fmt.Println("Connected to active mq")

	return &activeMQManager{amqSender: sender}, nil
}

// LogRequest logs headers,URL,body of the request to AMQ
func (m *activeMQManager) LogRequest(ctx context.Context, r *http.Request) error {
	// Set a request id in header if it doesn't exist
	requestID := r.Header.Get(model.RequestIDHeader)
	if requestID == "" {
		// set a new request id header of request
		requestID = ksuid.New().String()
		r.Header.Add(model.RequestIDHeader, requestID)
	}

	// copy request body
	reqBody, _ := ioutil.ReadAll(r.Body)
	r.Body = ioutil.NopCloser(bytes.NewBuffer(reqBody))
	urlParams, _ := url.ParseQuery(r.URL.RawQuery)
	payload := utils.M{
		"type":        "request",
		"requestId":   requestID,
		"headers":     r.Header,
		"ts":          time.Now().UTC(),
		"url":         r.URL.String(),
		"urlPath":     r.URL.Path,
		"urlParams":   urlParams,
		"urlFragment": r.URL.Fragment,
		"body":        string(reqBody),
	}
	return m.sendAMQMessage(ctx, payload)
}

// LogResponse logs headers,status of the response to AMQ
func (m *activeMQManager) LogResponse(ctx context.Context, requestID string, status int, headers http.Header) error {
	payload := utils.M{
		"type":      "response",
		"ts":        time.Now().UTC(),
		"requestId": requestID,
		"headers":   headers,
		"status":    status,
	}

	return m.sendAMQMessage(ctx, payload)
}

func (m *activeMQManager) sendAMQMessage(ctx context.Context, payload utils.M) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return utils.NewError("Unable to marshal request payload", err)
	}

	// Send message
	return m.amqSender.Send(ctx, amqp.NewMessage(data))
}

Client Version: v0.13.6

(Screenshot: "Could not decode AMQP frame" error while logging a response to ActiveMQ.)

Receiver.Receive() could use a flag to read only what's in the channel and return immediately otherwise

As part of the ReceiveMessages() implementation for Service Bus (batched receiving) there's a workflow similar to this:

  1. Issue credit (via Receiver.IssueCredit)
  2. Receive messages until a certain time period is reached
  3. Drain credit (via Receiver.DrainCredit)

While (3) is happening we can still be receiving messages. Drain will ask the service to basically flush whatever it's got on hand (which can also be nothing).

To get a clean boundary between one receiver and the next I need to make sure that I can properly drain the local receiver of any messages it might still have in l.messages. This is almost possible today because there's already a non-blocking read of the channel prior to the later blocking read, but I don't think I have a way of guaranteeing that I only read from the cached messages (i.e., I'm not interested in blocking at all).
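
A minimal sketch of the semantics being asked for: drain whatever is already buffered locally and return immediately, never waiting on the wire. The channel parameter stands in for the link's internal l.messages buffer.

// drainCached returns only messages already sitting in the local buffer.
func drainCached(messages <-chan amqp.Message) []amqp.Message {
	var out []amqp.Message
	for {
		select {
		case msg := <-messages:
			out = append(out, msg)
		default:
			// buffer empty: return immediately instead of blocking
			return out
		}
	}
}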

Audit all composite types to ensure they properly implement non-mandatory fields

Per http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#doc-idp115568

mandatory: "true" iff a non null value for the field is always encoded.

What this means is that all non-mandatory fields in a composite type must be pointer-to-type. That said, we should be smart in how we fix things up. E.g. the description field in the error type isn't mandatory, but making it a *string doesn't really add any value.
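
For illustration, a made-up composite type showing the convention (not a real frame from this library): a pointer keeps "absent" distinguishable from a legitimate zero value, which is exactly what non-mandatory fields need.

type exampleFrame struct {
	Name           string  // mandatory: always encoded, zero value is fine
	MaxMessageSize *uint64 // non-mandatory: nil means "not sent", 0 means "explicitly zero"
	Description    string  // non-mandatory, but *string adds nothing: "" and "absent" act the same
}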

Message transfer with credit>1 will cause deadlock

I initialize a receiver via session.NewReceiver using amqp.LinkCredit(10), and on each received message I send a message to another queue.
The send method checks whether a sender exists and, if not, creates one with session.NewSender.
That went badly: I couldn't get the sender for a long while and the messages could not be consumed.

For now, I have to create the receiver with amqp.LinkCredit(1) to work around this.
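
A minimal sketch of the workaround implied above: attach the sender before entering the receive loop, so no link attach has to complete while transfers are already in flight. Addresses and error handling are illustrative.

// Attach both links up front, then pump messages from one to the other.
sender, err := session.NewSender(amqp.LinkTargetAddress("/outQueue"))
if err != nil {
	log.Fatal("Creating sender link:", err)
}
for {
	msg, err := receiver.Receive(ctx)
	if err != nil {
		log.Fatal("Receiving message:", err)
	}
	if err := sender.Send(ctx, amqp.NewMessage(msg.GetData())); err != nil {
		log.Fatal("Forwarding message:", err)
	}
	if err := msg.Accept(ctx); err != nil {
		log.Fatal("Accepting message:", err)
	}
}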

Long messages and "Servicebus Lock Duration"

We are experiencing weird behaviour when processing a message takes longer than half the lock duration of the Service Bus queue or topic.

Example:
Queue with a lock-duration of 30 seconds
All messages taking 20 seconds before Accept.

Expected behaviour:
All messages to be read and accepted by the servicebus

Actual behaviour:
1st message gets accepted correctly by the queue
all subsequent messages are reported to the client as accepted, but the queue handles them like "Release()",
i.e., no errors are reported from the "msg.Accept()" call

The timing is always half of the "lock-duration": with the lock duration set to the 5-minute maximum, messages taking more than 2.5 minutes behave the same as in the scenario above,
and all messages taking less than half the lock duration work as they should.

After some extended testing it seems that the occasional message gets accepted correctly, but most messages end up as "released".

Can't set settlement mode to anything but default mixed mode

Setting the settlement mode to ModeSettled errors out.

Creating sender link:amqp: sender settlement mode "settled" requested, received "mixed" from server

sender, err := session.NewSender(
	amqp.LinkTargetAddress("/queue-name"),
	amqp.LinkSenderSettle(amqp.ModeSettled),
)

Creating receiver with settle mode second fails when connected to Artemis

Hi,

I am trying to create a new client which connects to Artemis using AMQP 1.0.

I want the client to explicitly call the 'AcceptMessage' before Artemis removes the message as documented when creating the receiver with settle mode 'second'.

docker-compose.yml

version: '3'

services:
  artemis:
    image: vromero/activemq-artemis
    environment:
      - ARTEMIS_USERNAME=artemis
      - ARTEMIS_PASSWORD=artemis
      - DISABLE_SECURITY=true
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    restart: always

main.go

package main

import (
	"context"
	"github.com/Azure/go-amqp"
	"github.com/sirupsen/logrus"
)

func main() {
	addr := "127.0.0.1"

	client, err := newClient(addr)
	if err != nil {
		logrus.Fatalf("failed to create client - %s", err.Error())
	}

	session, err := client.NewSession()
	if err != nil {
		logrus.Fatalf("failed to create session - %s", err.Error())
	}

	name := "my.topic"

	receiver, err := session.NewReceiver(
		amqp.LinkName("my-queue"),
		amqp.LinkSourceAddress(name),
		amqp.LinkCredit(1),
		amqp.LinkReceiverSettle(amqp.ModeSecond),
		amqp.LinkSourceCapabilities("shared", "global", "topic"),
		amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
	)
	if err != nil {
		logrus.Fatalf("failed to create receiver - %s", err.Error())
	}

	logrus.Infof("now listening to events from %s", name)

	for {
		msg, err := receiver.Receive(context.Background())
		if err != nil {
			logrus.Fatalf("reading message from %s artemis failed", name)
		}

		logrus.Infof("received message from artemis on topic %s with payload %s", addr, msg.Value)

		// Implement handle & accept/reject message here
	}
}

func newClient(addr string) (*amqp.Client, error) {
	return amqp.Dial(addr, amqp.ConnContainerID("testing-for-github"))
}

This code results in the following output;

FATA[0000] failed to create receiver - amqp: receiver settlement mode "second" requested, received "first" from server 

It seems this is because of a newly added implementation feature that checks the settle mode received from the server, located at "[email protected]/link.go:291".

I am not sure whether this check is really required, or why the settle mode needs to be checked at all.

To check whether this is the only thing blocking my use case, I tried creating a fork with just that check removed from the library. This resulted in the following output:

INFO[0000] now listening to events from my.topic        
INFO[0015] received message from artemis on topic 127.0.0.1 with payload test 

The message is correctly received and processed, but it is also settled immediately, even though I have the receiver's settle mode set to 'second'.

Could you see what I am doing wrong here?

Hitting panic during attach link when resp.Source is nil

Currently have an issue with attach link throwing out a panic.

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x48 pc=0x846316]
goroutine 121 [running]:
github.com/Azure/go-amqp.attachLink(0xc000546200, 0xc00044e9b0, {0xc00046e620, 0xc00046e530, 0x849a0a})
	github.com/Azure/[email protected]/link.go:238 +0x6d6
github.com/Azure/go-amqp.(*Session).NewReceiver(0xc38a64, {0xc00046e620, 0x9, 0x9})
	github.com/Azure/[email protected]/session.go:95 +0x6e

Line in question:

go-amqp/link.go

Line 238 in 01b4c64

l.Source.Filter = resp.Source.Filter

l.Source should not be the issue, as it gets initialized when nil, so the panic has to be caused by resp.Source.

go-amqp/link.go

Lines 225 to 227 in d1ea582

if l.Source == nil {
	l.Source = new(frames.Source)
}

Furthermore, there is already a check that returns an error when both resp.Source and resp.Target are nil, but it seems we are hitting a case where only resp.Source is nil: we don't get the detach error, but end up in this panic instead.

go-amqp/link.go

Line 193 in d1ea582

if resp.Source == nil && resp.Target == nil {

Also, when using dynamic address and assigning the respSource.Address, there is an explicit check for whether the resp.Source is nil.

go-amqp/link.go

Lines 229 to 231 in d1ea582

if l.dynamicAddr && resp.Source != nil {
	l.Source.Address = resp.Source.Address
}

Should the l.Source.Filter = resp.Source.Filter assignment have a nil check similar to the address assignment above, or should it result in an error?
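
For reference, a minimal sketch of the guarded assignment, mirroring the dynamic-address check quoted above:

if resp.Source != nil {
	l.Source.Filter = resp.Source.Filter
}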

Validate Receive() prioritizes flushing internal messages channel before returning an error

Currently, if a caller is .Receive()'ing from a link and the link fails (detached/closed) it will return an error.

As part of this we want to make sure that this logical sequence of events is respected:

  • Some messages are received, and placed into the link's internal channel
  • A detach happens

So if we are calling .Receive() in a loop, for instance, we should receive all the messages cached in the internal channel first before we receive the detach error. This makes it simple for clients to know when they can properly abandon a link instance, leaving it with no vital state.

This might already be the state of affairs, just want to validate before a potential GA of this lib.

Validate if we need to auto-accept messages explicitly in modeFirst

Currently go-amqp calls .Accept() on messages received in ModeFirst.

From our reading of the spec it's possible we don't need to do that, which can be beneficial to callers because it removes a bit of I/O in that path (and removes a potential point of failure for no gain).

This is mostly just a "check that our behavior matches other reference implementations" issue.

Message remains unsettled

Hi,

we've been using this library for a while without any issues to connect to a cpp qpid Broker.

The changes introduced with v0.13.6 however break our client.

If I understand correctly, any incoming message with a non-empty DeliveryTag is added to unsettledMessages via addUnsettled. It should get removed from there by a call to deleteUnsettled.
The only place deleteUnsettled is called is within the trackCompletion handler (we're using HandleMessage). This, however, was recently changed to only run with a receiverSettleMode of ModeSecond.

As can be seen from the following debug log, my link has receiverSettleMode "first", while my message does have a non-empty DeliveryTag "\x00\x00\x00\x00". If I understand correctly, my message gets added to the unsettledMessages map because it has a non-empty DeliveryTag, but because I'm in ModeFirst it never gets removed.

19:07:48.210557 FLOW Link Mux half: source: somewhere, inflight: 0, credit: 0, deliveryCount: 0, messages: 0, unsettled: 0, maxCredit : 1, settleMode: first
19:07:48.210628 TX(Session) - tx: Flow{NextIncomingID: 0, IncomingWindow: 100, NextOutgoingID: 0, OutgoingWindow: 100, Handle: 0, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
19:07:51.064771 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 1746}
19:07:51.064870 deliveryID 0 before push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
19:07:51.064908 deliveryID 0 after push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
19:07:51.064928 deliveryID 0 before exit - deliveryCount : 1 - linkCredit: 0, len(messages): 0
19:07:51.064933 PAUSE Link Mux pause: inflight: 0, credit: 0, deliveryCount: 1, messages: 0, unsettled: 1, maxCredit : 1, settleMode: first
19:07:51.065318 TX: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}

I don't see any way to resolve this situation starting with v0.13.6. Any idea how this can be resolved?

If I'm reading the previous commits correctly, the empty DeliveryTag is used as a hint for settleMode == ModeFirst in addUnsettled. Couldn't this instead use the same if r.link.receiverSettleMode.value() == ModeSecond check as in HandleMessage, to be consistent?

Any help would be greatly appreciated.

Receivers not getting messages from a full queue

It seems like we are running into a flow-control issue with the library. We have a working theory about what might solve the issue, although we currently can't see how it could happen. We have also been able to reproduce the problem with the attached sample code, in just a couple hundred lines or so.

We create a queue that we simultaneously produce to and consume from within the application. We expect the consumer to receive messages as they become available on the queue, process them, and mark them as accepted.

We set up a session and pass it to the Producer and Consumer, which in turn create their respective Sender and Receiver links on that same session.

On the Consumer we set LinkCredit to 27 (more on that later). We are running 8 consumers in this example.


What we are seeing is that the consumers only manage to dequeue the first 100 messages or so before the AMQP server stops providing any more messages for retrieval and we are stuck. Bear in mind we are still able to send on that same session.

We enabled debugging, and in the attached logs.txt you will see that

The OutgoingWindow reporting is kind of weird and sometimes shows max values in the logs. Is this a known issue? Can you shed some light on it? I spent quite some time trying to track the back and forth to figure out what is happening on the wire.

...
11:23:26.098572 TX: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 100, OutgoingWindow: 100, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:26.098954 RX: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 16383, OutgoingWindow: 2147483647, HandleMax: 65535, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:26.099082 TX: Attach{Name: nSbobxLLsp2GbySYOLWUFYDnf17WzICmn3gc3r5pmf9Pfuq46fbv7g, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: /queue-name, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
....

Now back to the numbers. Our working theory is that the SessionIncomingWindow, which defaults to 100, controls flow at the session level, while the LinkCredits control flow at the link level. The part we are unsure about is the relationship between SessionIncomingWindow and LinkCredits.

So in this example we have 8 consumers with 27 link credits each, i.e. 216 in total, a little over double the SessionIncomingWindow default of 100. Since they all share the same session, we assume this is what is tripping the flow control and causing the behavior we are seeing.

If we lower just the link credits per consumer to 24, for 192 in total, we never see the lock-up and all the messages are processed smoothly. We were able to verify this with various other combinations as well.


We are looking to better understand this relationship, since it doesn't make sense to us currently. We are aware that these limits are calculated on the fly from other values until server and client sync them. Could there be a race condition that ends up locking up the processing?

I hope this is adequate information to help debug the issue we are facing. Let me know if you need any more information.

amqp.zip
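
A minimal sketch of one mitigation under this working theory: size the session's incoming window to cover the aggregate link credit. The SessionIncomingWindow option name is an assumption on our part (session window config options exist in this library per another issue in this tracker, but check your version for the actual API):

// Assumed option name: size the incoming window above the 216 credits
// of aggregate link credit (8 consumers x 27 credits).
session, err := client.NewSession(
	amqp.SessionIncomingWindow(300),
)
if err != nil {
	log.Fatal("Creating AMQP session:", err)
}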

Add in type annotations to all fields

As part of #93 we added type aliases for some of the AMQP types. These are primarily for readability on our end, to ensure that we've properly matched each type with the AMQP spec.

As these are cosmetic we haven't rigorously gone and applied them to all fields, but it'd be useful to do so.

TestIntegrationReceiverModeSecond uncovers races

==================
WARNING: DATA RACE
Write at 0x00c000002458 by goroutine 20:
  github.com/Azure/go-amqp.(*link).muxReceive()
      C:/git/Azure/go-amqp/link.go:542 +0x1334
  github.com/Azure/go-amqp.(*link).muxHandleFrame()
      C:/git/Azure/go-amqp/link.go:567 +0x384
  github.com/Azure/go-amqp.(*link).mux()
      C:/git/Azure/go-amqp/link.go:321 +0xab6
  github.com/Azure/go-amqp.attachLink·dwrap·12()
      C:/git/Azure/go-amqp/link.go:237 +0x39

Previous read at 0x00c000002458 by goroutine 16:
  github.com/Azure/go-amqp.(*Session).mux()
      C:/git/Azure/go-amqp/session.go:362 +0xd84
  github.com/Azure/go-amqp.(*Client).NewSession·dwrap·1()
      C:/git/Azure/go-amqp/client.go:155 +0x47

Goroutine 20 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      C:/git/Azure/go-amqp/link.go:237 +0x174c
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      C:/git/Azure/go-amqp/session.go:90 +0x11e
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:371 +0x164

Goroutine 16 (running) created at:
  github.com/Azure/go-amqp.(*Client).NewSession()
      C:/git/Azure/go-amqp/client.go:155 +0x6f7
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
      C:/git/Azure/go-amqp/integration_test.go:330 +0x1f5
  testing.tRunner()
      C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      C:/Program Files/Go/src/testing/testing.go:1306 +0x47
==================
==================
WARNING: DATA RACE
Write at 0x00c0000560f0 by goroutine 19:
  github.com/Azure/go-amqp.(*inFlight).add()
      C:/git/Azure/go-amqp/receiver.go:296 +0x135
  github.com/Azure/go-amqp.(*Receiver).messageDisposition()
      C:/git/Azure/go-amqp/receiver.go:259 +0x124
  github.com/Azure/go-amqp.(*Message).Accept()
      C:/git/Azure/go-amqp/types.go:1815 +0x137
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
      C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2

Previous read at 0x00c0000560f0 by goroutine 20:
  github.com/Azure/go-amqp.(*link).mux()
      C:/git/Azure/go-amqp/link.go:314 +0x224
  github.com/Azure/go-amqp.attachLink·dwrap·12()
      C:/git/Azure/go-amqp/link.go:237 +0x39

Goroutine 19 (running) created at:
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
      C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
  testing.tRunner()
      C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      C:/Program Files/Go/src/testing/testing.go:1306 +0x47

Goroutine 20 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      C:/git/Azure/go-amqp/link.go:237 +0x174c
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      C:/git/Azure/go-amqp/session.go:90 +0x11e
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:371 +0x164
==================
==================
WARNING: DATA RACE
Read at 0x00c0001173f0 by goroutine 19:
  github.com/Azure/go-amqp.(*Message).done()
      C:/git/Azure/go-amqp/types.go:1786 +0x3c
  github.com/Azure/go-amqp.(*Message).Accept·dwrap·20()
      C:/git/Azure/go-amqp/types.go:1814 +0x39
  github.com/Azure/go-amqp.(*Message).Accept()
      C:/git/Azure/go-amqp/types.go:1815 +0x148
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
      C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2

Previous write at 0x00c0001173f0 by goroutine 21:
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func1()
      C:/git/Azure/go-amqp/receiver.go:35 +0x73
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2·dwrap·14()
      C:/git/Azure/go-amqp/receiver.go:54 +0x47

Goroutine 19 (running) created at:
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
      C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
  testing.tRunner()
      C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      C:/Program Files/Go/src/testing/testing.go:1306 +0x47

Goroutine 21 (running) created at:
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      C:/git/Azure/go-amqp/receiver.go:54 +0x1a5
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2
==================
==================
WARNING: DATA RACE
Write at 0x00c00013b650 by goroutine 19:
  runtime.mapassign_fast32()
      C:/Program Files/Go/src/runtime/map_fast32.go:92 +0x0
  github.com/Azure/go-amqp.(*inFlight).add()
      C:/git/Azure/go-amqp/receiver.go:298 +0x93
  github.com/Azure/go-amqp.(*Receiver).messageDisposition()
      C:/git/Azure/go-amqp/receiver.go:259 +0x124
  github.com/Azure/go-amqp.(*Message).Accept()
      C:/git/Azure/go-amqp/types.go:1815 +0x137
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
      C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2

Previous read at 0x00c00013b650 by goroutine 20:
  github.com/Azure/go-amqp.(*link).mux()
      C:/git/Azure/go-amqp/link.go:314 +0x244
  github.com/Azure/go-amqp.attachLink·dwrap·12()
      C:/git/Azure/go-amqp/link.go:237 +0x39

Goroutine 19 (running) created at:
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
      C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
  testing.tRunner()
      C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      C:/Program Files/Go/src/testing/testing.go:1306 +0x47

Goroutine 20 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      C:/git/Azure/go-amqp/link.go:237 +0x174c
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      C:/git/Azure/go-amqp/session.go:90 +0x11e
  github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
      C:/git/Azure/go-amqp/integration_test.go:371 +0x164
==================

Session mux unwind can deadlock with conn mux

When the session mux unwinds, it tells the conn mux to delete the session.

select {
case s.conn.DelSession <- s:
case <-s.conn.Done:
	s.err = s.conn.Err()
}

This can deadlock with conn mux if it's trying to mux a frame to the same session.

// taken from conn.mux()
select {
case session.rx <- fr:
case <-c.closeMux:
	return
}

Once a session (or any) mux has exited its "channels pump", we shouldn't attempt to write any data to its channels.

Add SEQUENCE section encoding to amqp.Message

There are three sections you can use to encode data into an AMQP message:

  • Value (typed as interface{})
  • Data (typed as [][]byte)
  • Sequence (squashed into Data)

Sequence can actually be a sequence of several types of things, not just []bytes, and (like .Value) should be its own separate field, with its own distinct type.

My proposal is to create a .Sequence property and type it as []interface{}.

(The 'squashed' part is based on this comment in the code:)

// in Message

Data [][]byte
// A data section contains opaque binary data.
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
//  sections, one or more amqp-sequence sections, or a single amqp-value section."

Enable logging to be pluggable

Today, the only way to enable go-amqp logging is to recompile your binary with a debug tag. However, that output just goes to stdout/stderr and isn't capturable by the caller.

This means that libraries like azservicebus, which have their own pluggable logging systems (via azcore/log), aren't able to redirect those log messages to the output source the customer chose. It also means we can't activate the logging automatically at runtime, since it's compile-time only.

As part of this it's a good chance to look at our logging and see which parts might be optional: errors that bubble up to the customer with enough tracing information don't need to be logged. However, FLOW frames, which "flow" in the background, and channels blocking on writes/reads (indicating potential performance issues) should be logged, as these are typically not visible.
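
One possible shape for this, as a minimal sketch; the LoggerFn type and SetLogger entry point are hypothetical, not this library's API. azservicebus could then install an adapter that forwards into azcore/log.

package amqp

// LoggerFn is a hypothetical caller-provided sink for diagnostic output.
type LoggerFn func(level int, format string, args ...interface{})

var logFn LoggerFn

// SetLogger installs the sink at runtime; nil disables logging entirely.
func SetLogger(fn LoggerFn) {
	logFn = fn
}

func debug(level int, format string, args ...interface{}) {
	if logFn != nil {
		logFn(level, format, args...)
	}
}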

servicebus.Queue#ReceiveOne causes data races

It occurs in v0.13.3 or later.
I thought this might be a go-amqp issue.

package servicebusrace_test

import (
	"context"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

func random() string {
	rand.Seed(time.Now().UnixNano())
	return strconv.Itoa(rand.Int())
}

func TestServicebus(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}
	ctx := context.Background()
	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		t.Skip()
	}
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		t.Fatal(err)
	}
	qm := ns.NewQueueManager()

	name := random()
	_, err = qm.Put(ctx, name)
	if err != nil {
		t.Fatal(err)
	}

	t.Cleanup(func() {
		qm.Delete(ctx, name)
	})

	q, err := ns.NewQueue(name)
	if err != nil {
		t.Fatal(err)
	}

	const n = 10

	t.Run("send to servicebus", func(t *testing.T) {
		var wg sync.WaitGroup
		for i := 0; i < n; i++ {
			i := i
			wg.Add(1)
			go func() {
				defer wg.Done()
				data := strconv.Itoa(i)
				if err := q.Send(ctx, &servicebus.Message{Data: []byte(data)}); err != nil {
					panic(err)
				}
			}()
		}
		wg.Wait()
	})

	t.Run("receive from servicebus", func(t *testing.T) {
		result := make([]string, n)
		var wg sync.WaitGroup
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := q.ReceiveOne(ctx, servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
					idx, err := strconv.Atoi(string(m.Data))
					if err != nil {
						return err
					}
					result[idx] = string(m.Data)
					return m.Complete(ctx)
				})); err != nil {
					panic(err)
				}
			}()
		}
		wg.Wait()
		for i, r := range result {
			if strconv.Itoa(i) != r {
				t.Errorf("mismatch result %s", r)
			}
		}
	})
}
=== RUN   TestServicebus
=== RUN   TestServicebus/send_to_servicebus
=== RUN   TestServicebus/receive_from_servicebus
==================
WARNING: DATA RACE
Write at 0x00c0002520f0 by goroutine 59:
  github.com/Azure/go-amqp.(*inFlight).add()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:280 +0x18d
  github.com/Azure/go-amqp.(*Receiver).messageDisposition()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:243 +0x3d3
  github.com/Azure/go-amqp.(*Message).Accept()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/types.go:1805 +0x16c
  github.com/Azure/azure-service-bus-go.(*Message).Complete()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/message.go:185 +0x206
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:81 +0x1bd
  github.com/Azure/azure-service-bus-go.HandlerFunc.Handle()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/handler.go:33 +0x51
  github.com/Azure/azure-service-bus-go.(*amqpAdapterHandler).Handle()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/amqphandler.go:71 +0x43b
  github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage.func1()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:346 +0x84
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:53 +0xf8
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:71 +0x66d
  github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:345 +0x210
  github.com/Azure/azure-service-bus-go.(*Receiver).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:200 +0x1a4
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:217 +0x1f7
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e

Previous read at 0x00c0002520f0 by goroutine 78:
  github.com/Azure/go-amqp.(*link).mux()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:297 +0xd0d

Goroutine 59 (running) created at:
  github.com/johejo/servicebusrace_test.TestServicebus.func3()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:73 +0x14f
  testing.tRunner()
      /home/heijo/ghq/go.googlesource.com/go/src/testing/testing.go:1194 +0x202

Goroutine 78 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
  github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
  github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
  github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
  github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
==================
WARNING: DATA RACE
Write at 0x00c00053e094 by goroutine 78:
  github.com/Azure/go-amqp.(*link).muxReceive()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:523 +0xaaf
  github.com/Azure/go-amqp.(*link).muxHandleFrame()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:548 +0x357
  github.com/Azure/go-amqp.(*link).mux()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:304 +0xa99

Previous read at 0x00c00053e094 by goroutine 77:
  github.com/Azure/go-amqp.(*Session).mux()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:354 +0x108b

Goroutine 78 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
  github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
  github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
  github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
  github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e

Goroutine 77 (running) created at:
  github.com/Azure/go-amqp.(*Client).NewSession()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/client.go:153 +0x684
  github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:384 +0x2ce
  github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
  github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
  github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
==================
WARNING: DATA RACE
Write at 0x00c0000b50e0 by goroutine 60:
  runtime.mapassign_fast32()
      /home/heijo/ghq/go.googlesource.com/go/src/runtime/map_fast32.go:92 +0x0
  github.com/Azure/go-amqp.(*inFlight).add()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:282 +0xc4
  github.com/Azure/go-amqp.(*Receiver).messageDisposition()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:243 +0x3d3
  github.com/Azure/go-amqp.(*Message).Accept()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/types.go:1805 +0x16c
  github.com/Azure/azure-service-bus-go.(*Message).Complete()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/message.go:185 +0x206
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:81 +0x1bd
  github.com/Azure/azure-service-bus-go.HandlerFunc.Handle()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/handler.go:33 +0x51
  github.com/Azure/azure-service-bus-go.(*amqpAdapterHandler).Handle()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/amqphandler.go:71 +0x43b
  github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage.func1()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:346 +0x84
  github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:53 +0xf8
  github.com/Azure/go-amqp.(*Receiver).HandleMessage()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:71 +0x66d
  github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:345 +0x210
  github.com/Azure/azure-service-bus-go.(*Receiver).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:200 +0x1a4
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:217 +0x1f7
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e

Previous read at 0x00c0000b50e0 by goroutine 78:
  github.com/Azure/go-amqp.(*link).mux()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:297 +0xd39

Goroutine 60 (running) created at:
  github.com/johejo/servicebusrace_test.TestServicebus.func3()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:73 +0x14f
  testing.tRunner()
      /home/heijo/ghq/go.googlesource.com/go/src/testing/testing.go:1194 +0x202

Goroutine 78 (running) created at:
  github.com/Azure/go-amqp.attachLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
  github.com/Azure/go-amqp.(*Session).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
  github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
  github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
  github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
  github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
  github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
      /home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
  github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
      /home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
    testing.go:1093: race detected during execution of test
=== CONT  TestServicebus
    testing.go:1093: race detected during execution of test
--- FAIL: TestServicebus (5.22s)
    --- PASS: TestServicebus/send_to_servicebus (0.76s)
    --- FAIL: TestServicebus/receive_from_servicebus (2.59s)
=== CONT
    testing.go:1093: race detected during execution of test
FAIL
FAIL    github.com/johejo/servicebusrace        5.231s
FAIL

Could not create new client using IP address

Hi,

I'm facing an issue when trying to create a new client using the New(conn net.Conn, opts ...ConnOption) function, where the host I'm using is an IP address. Here is some code with dummy values:

conn, err := net.Dial("tcp", "10.10.10.10:61616")
client, err := amqp.New(conn, amqp.ConnServerHostname("10.10.10.10"), amqp.ConnSASLPlain("username", "password"), amqp.ConnIdleTimeout(0))
if err != nil {
	log.Fatal("Dialing AMQP server: ", err)
}

I got this in the response
"Dialing AMQP server: invalid frame body header"

Any advice on this?
Thank you

Dig deeper into mux handling of unexpected frames

Session.mux and link.muxHandleFrame both will swallow unexpected frames. While this seems reasonable, I wonder if it could be hiding bugs.

Maybe as a first step, add a build constraint that will panic() in these cases, so we can catch this happening in CI/stress runs etc.
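
A minimal sketch of that build-constraint idea; the helper name is hypothetical, and a sibling file with //go:build !debug would provide a no-op version so release builds are unaffected.

//go:build debug

package amqp

import "fmt"

// assertNoUnexpectedFrame panics when the mux is about to swallow a
// frame, so CI/stress runs built with -tags debug surface these cases.
func assertNoUnexpectedFrame(fr interface{}) {
	panic(fmt.Sprintf("unexpected frame: %#v", fr))
}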

Receiver reading message from AMQP:EOF error

When running the example code included in the README.md, the receiver throws the following error approximately 1 min after receiving the last message from the broker queue:

2021/09/15 16:15:49 Reading message from AMQP:EOF
exit status 1 

The issue occurs while executing the following line of the example:

for {
	// Receive next message
	msg, err := receiver.Receive(ctx)
	if err != nil {
		log.Fatal("Reading message from AMQP:", err)
	}

Since it is executing in an infinite for-loop, it was expected to continue receiving messages until explicitly closed.
Using the HandleMessage method in place of Receive did not help. I have also tried changing the ConnIdleTimeout to both zero and non-zero values, and using a plain Background context without timeouts, but it did not have any effect.

I am using Solace PubSub+ event broker for the message queue. I have not used any SASL authentication.

Any help would be greatly appreciated.

The initial deliveryID for a msg is 1 not 0

deliveryID = atomic.AddUint32(&s.link.session.nextDeliveryID, 1)

When starting a link, the receiver sends a flow frame expecting the next delivery-id to be 0,
but the send code starts with deliveryID = 1.

This causes the receiving client to not acknowledge the second message that is sent.
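
A minimal sketch of one possible fix: atomic.AddUint32 returns the incremented value, so subtracting one gives post-increment semantics and the first transfer goes out with delivery-id 0, matching the receiver's expectation.

// next value minus one == value before the increment, so IDs start at 0
deliveryID = atomic.AddUint32(&s.link.session.nextDeliveryID, 1) - 1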

Consider wrapping the error returned when the remote service is at fault

Currently, if the remote sender does something that's incorrect for the protocol, we send them an ErrorNotAllowed, which makes sense.

However, we also return this error to the local caller. This seems like one of those cases where we should consider marking it as an internal error instead, so it's clear that it's not the client side that did something incorrect.

We can still preserve the context of the error; this would merely wrap the original error in a more sensible wrapper.

link:*Error{Condition: amqp:connection:forced, Description: , Info: map[]}

Hi, I'm trying to create a sender and a receiver to connect to an IBM MQ queue manager. I know very little about how IBM MQ works. This is how I'm connecting to the MQ and how I'm trying to create the sender:

client, err := amqp.Dial("amqp://localhost:5672",
	amqp.ConnSASLPlain("app", "passw0rd"),
)
if err != nil {
	log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()

// Open a session
session, err := client.NewSession()
if err != nil {
	log.Fatal("Creating AMQP session:", err)
}

ctx := context.Background()

{
	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("DEV.QUEUE.1"),
	)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

	// Send message
	err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
	if err != nil {
		log.Fatal("Sending message:", err)
	}

	sender.Close(ctx)
	cancel()
}

In amqp.LinkTargetAddress("DEV.QUEUE.1") I tried putting the name of the queue, the name of the queue manager, and even the name of the AMQP channel that I created following IBM's instructions, but nothing works.

With the mq-light library for Node.js it works just fine. Could you help me figure out what I'm missing?

Implementation issues?

Hello,

while experimenting with this library I noticed the following two points and would like to ask whether I missed something or whether these are issues with the implementation:

  • remote-outgoing-window update condition
  • protocol might stop

remote-outgoing-window update condition
According to the spec (http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-session-flow-control), the remote-outgoing-window should be updated only if:

  • a transfer frame is received
  • or a flow frame is received

Is there any reason why the remote-outgoing-window gets assigned the session's incoming window, for example here:

remoteOutgoingWindow = s.incomingWindow

protocol might stop

Assumption:

  • server outgoing-window: 100
  • server incoming-window: 100
  • client outgoing-window: 1000
  • client incoming-window: 1000
  • server sends one flow frame right before sending the transfer frames

Am I right with the assumption that the client will never reach this code

go-amqp/client.go

Lines 706 to 708 in 6a20a07

// Update peer's outgoing window if half has been consumed.
if remoteOutgoingWindow < s.incomingWindow/2 {
nID := nextIncomingID

because the remote-outgoing-window is updated here (as the protocol spec defines):
remoteOutgoingWindow = body.OutgoingWindow

Thank you in advance for your help in exploring this lib.

Cannot decode a valid AMQP message

Hello and thank you for this great library!
I'm using a Java library to publish a message to ActiveMQ via AMQP 1.0: SwiftMQ.

I cannot decode the following message; it seems that the ApplicationProperties are decoded wrongly (they are not read as a composite type).

\x00\x80\x00\x00\x00\x00\x00\x00\x00p\xc0\x07\x05AP\x04@BC\x00\x80\x00\x00\x00\x00\x00\x00\x00s\xc0*\x03\xa1\x187735812932138480283/1/12@\xa1\x0cActiveMQ.DLQ\x00\x80\x00\x00\x00\x00\x00\x00\x00t\xc1H\x08\xa1\x06prop01\xa1\x06val001\xa1\x07prop002\xa1\x02v2\xa1\rprop000000003\x81\x00\x00\x00\x00\x00\x01\x86\xa0\xa1\x05prop4\xa1\tval000004\x00\x80\x00\x00\x00\x00\x00\x00\x00w\xa1\x99{"id":"000000000","prop4":"val000004","prop002Code":"v2","___prop000000003":10.0,"_______prop000000003":"10.0","prop0005":100,"_________prop01":"val001"}

Or, in an int representation:

0, 128, 0, 0, 0, 0, 0, 0, 0, 112, 192, 7, 5, 65, 80, 4, 64, 66, 67, 0, 128, 0, 0, 0, 0, 0, 0, 0, 115, 192, 42, 3, 161, 24, 55, 55, 51, 53, 56, 49, 50, 57, 51, 50, 49, 51, 56, 52, 56, 48, 50, 56, 51, 47, 49, 47, 49, 50, 64, 161, 12, 65, 99, 116, 105, 118, 101, 77, 81, 46, 68, 76, 81, 0, 128, 0, 0, 0, 0, 0, 0, 0, 116, 193, 72, 8, 161, 6, 112, 114, 111, 112, 48, 49, 161, 6, 118, 97, 108, 48, 48, 49, 161, 7, 112, 114, 111, 112, 48, 48, 50, 161, 2, 118, 50, 161, 13, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 129, 0, 0, 0, 0, 0, 1, 134, 160, 161, 5, 112, 114, 111, 112, 52, 161, 9, 118, 97, 108, 48, 48, 48, 48, 48, 52, 0, 128, 0, 0, 0, 0, 0, 0, 0, 119, 161, 153, 123, 34, 105, 100, 34, 58, 34, 48, 48, 48, 48, 48, 48, 48, 48, 48, 34, 44, 34, 112, 114, 111, 112, 52, 34, 58, 34, 118, 97, 108, 48, 48, 48, 48, 48, 52, 34, 44, 34, 112, 114, 111, 112, 48, 48, 50, 67, 111, 100, 101, 34, 58, 34, 118, 50, 34, 44, 34, 95, 95, 95, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 34, 58, 49, 48, 46, 48, 44, 34, 95, 95, 95, 95, 95, 95, 95, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 34, 58, 34, 49, 48, 46, 48, 34, 44, 34, 112, 114, 111, 112, 48, 48, 48, 53, 34, 58, 49, 48, 48, 44, 34, 95, 95, 95, 95, 95, 95, 95, 95, 95, 112, 114, 111, 112, 48, 49, 34, 58, 34, 118, 97, 108, 48, 48, 49, 34, 125

I'm looking forward to being able to share messages across the 2 platforms!
Thank you for your help and precious time.

EDIT:
ActiveMQ shows the message's application properties correctly, so the AMQP message should be valid, at least up to the application properties that fail to decode with go-amqp.

Investigate fuzz tests

The idea is sound, but it's not clear whether invalid input actually triggers any test failures.
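
For reference, a minimal sketch of a native Go fuzz target (go1.18+); decodeFrame is a hypothetical stand-in for whichever decode entry point the existing fuzz corpus exercises, and the only invariant asserted is that arbitrary input never panics the decoder.

package amqp_test

import "testing"

func FuzzDecode(f *testing.F) {
	f.Add([]byte{0x00, 0x53, 0x70}) // seed: an AMQP descriptor prefix
	f.Fuzz(func(t *testing.T, data []byte) {
		// Invalid input must return an error, never panic.
		_, _ = decodeFrame(data)
	})
}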

Receiver Close() times out due to block in receive

I am using this package through the https://github.com/Azure/azure-event-hubs-go SDK to continuously receive messages from an event hub.

When the system experiences extreme load and/or occasional network instability, calls to Close() on an individual Event Hubs receiver (and by extension, its underlying receiver/link from this package) have a tendency to hang until the provided context expires. My typical context timeout is 10 seconds, but I've observed the same with much longer timeouts (it doesn't appear to be related to time).

I recently captured a goroutine dump of a process that was encountering this problem and observed that each timed-out Close() was leaving an extra goroutine dangling indefinitely (along with the full prefetch buffer for the associated link). These are the relevant groups of leaked goroutines:

1522 @ 0x43ab25 0x4059fa 0x405755 0xa70197 0xa70df8 0xa6f47f 0x4717c1
#	0xa70196	github.com/Azure/go-amqp.(*link).muxReceive+0x276	/go/pkg/mod/github.com/!azure/[email protected]/link.go:513
#	0xa70df7	github.com/Azure/go-amqp.(*link).muxHandleFrame+0x117	/go/pkg/mod/github.com/!azure/[email protected]/link.go:548
#	0xa6f47e	github.com/Azure/go-amqp.(*link).mux+0x37e		/go/pkg/mod/github.com/!azure/[email protected]/link.go:304

386 @ 0x43ab25 0x4059fa 0x405755 0xa70197 0xa70df8 0xa6fe35 0xa6f2d6 0x4717c1
#	0xa70196	github.com/Azure/go-amqp.(*link).muxReceive+0x276	/go/pkg/mod/github.com/!azure/[email protected]/link.go:513
#	0xa70df7	github.com/Azure/go-amqp.(*link).muxHandleFrame+0x117	/go/pkg/mod/github.com/!azure/[email protected]/link.go:548
#	0xa6fe34	github.com/Azure/go-amqp.(*link).muxFlow+0x234		/go/pkg/mod/github.com/!azure/[email protected]/link.go:380
#	0xa6f2d5	github.com/Azure/go-amqp.(*link).mux+0x1d5		/go/pkg/mod/github.com/!azure/[email protected]/link.go:290

When inspecting the source for the version I'm using (0.13.4), the line where those goroutines are hung is the last line of this block:

go-amqp/link.go

Lines 510 to 513 in 2957005

// send to receiver, this should never block due to buffering
// and flow control.
l.addUnsettled(&l.msg)
l.messages <- l.msg

As the comment mentions, the code is designed never to block here (due to the channel being buffered for the exact number of messages in the link credit), but something is clearly violating that assumption.

I'm not sure whether the issue is a protocol violation by Event Hubs w.r.t. link credit or a bug causing extra messages to be buffered in the channel. Either way, it seems dangerous to assume that everything will be perfectly well-behaved all the time. It seems like a simple select to handle the shutdown case would solve the problem here. Happy to send a PR if it seems like a reasonable change.
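
For concreteness, a minimal sketch of that select, assuming the link mux exposes a shutdown channel (the detached field name is illustrative):

l.addUnsettled(&l.msg)
select {
case l.messages <- l.msg:
	// delivered to the receiver, as before
case <-l.detached:
	// link is shutting down; stop instead of blocking forever
	return
}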

Link errors need to be wrapped in an amqp.DetachError

Currently network errors will leave the link in a detached state (which is fine) but will not indicate that detached state via the returned error. In particular, this code in Receive() can be problematic:

func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
	msg, err := r.Prefetched(ctx)

	if err != nil || msg != nil {
		return msg, err
	}

	// wait for the next message
	select {
	case msg := <-r.link.Messages:
		debug(3, "Receive() blocking %d", msg.deliveryID)
		msg.link = r.link
		return acceptIfModeFirst(ctx, r, &msg)
	case <-r.link.Detached:
                // NOTE: 
                // NOTE: r.link.err is not guaranteed to indicate the link is detached (ie, it can just be a network error)
                // NOTE: 
		return nil, r.link.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

The bit NOTE'd above will cause problems if you're using the error to determine whether the link needs to be recovered. In general, a caller should be able to see an amqp.DetachError{}, for instance, and know they can do something useful here, like recreate just this link.
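
With consistent wrapping in place, callers could then branch on the sentinel with the standard errors package; a minimal sketch:

var detachErr *amqp.DetachError
if errors.As(err, &detachErr) {
	// only the link died: recreate the link, keep the session and conn
} else {
	// likely a connection-level failure: rebuild from the connection down
}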

Messages not settled in ModeFirst

Hello,

I am connecting in ModeFirst with a LinkCredit of 100 as follows:

err := q.receiver.HandleMessage(ctx, q.onPublish)
if err != nil {
	q.log.Error(err, "Error with queue")
}

func (q *Queue) onPublish(msg *amqp.Message) error {
	message := new(core.Message)
	if err := json.Unmarshal(msg.GetData(), message); err != nil {
		q.log.Error(err, "could not get data")
		msg.Reject(context.Background(), &amqp.Error{
			Condition:   "",
			Description: err.Error(),
			Info:        nil,
		})
		return err
	}
	q.messages <- *message
	err := msg.Accept(context.Background())
	if err != nil {
		return err
	}
	return nil
}

After hitting the credit of 100 messages, my service does not receive messages anymore, although the broker says that 100 messages were acknowledged by the subscriber.

I assume that go-amqp with the new HandleMessage doesn't differ that much from the introduction example, so what am I doing wrong?

Best regards

Remove Session window config options

When the incoming window reaches zero, no more messages should be accepted. Unfortunately the code doesn't actually honor this.

Have not looked at outgoing window.

Investigate tests that still require sleeps

There are some tests that require a time.Sleep() between actions. Some of these are due to legitimate races; e.g. in conn_test.go's TestClose, starting and immediately stopping the connection never gives the connReader, connWriter, and mux goroutines a chance to start. This is a corner case, but the race is real.

This is further surfaced in NetConn, having to use a buffered channel for the reader (see code comment for full explanation).

IssueCredit() doesn't always trigger a flow frame

Currently IssueCredit doesn't directly issue the FLOW frame, preferring to let the normal mux() loop in link handle it.

In testing it looks like this occasionally does not happen (or at the very least it's not timely) and this results in a link with no credits added.
