azure / go-amqp
This project forked from vcabbage/amqp
AMQP 1.0 client library for Go.
Home Page: https://github.com/Azure/go-amqp
License: MIT License
I got this when trying to run two or more AMQP receivers in separate processes (executables) against the Apache Qpid Dispatch Router. It works fine with the Qpid Electron Go package, and there are no issues with only one AMQP receiver, but as it stands this is not usable.
*Error{Condition: amqp:session:invalid-field, Description: sequencing error, expected delivery-id 3, got 4, Info: map[]}
I need a small snippet please.
Build the auto-credit management on top of the manual system to consolidate the two.
I initialize the receiver with session.NewReceiver using amqp.LinkCredit(10), and forward each received message to another queue.
The send method checks whether a sender already exists and, if not, creates one with session.NewSender......
That went badly: I could not get the sender for a long while, and the messages could not be consumed.
So for now I have to create the receiver with amqp.LinkCredit(1) to work around it.
When the session mux unwinds, it tells the conn mux to delete the session.
select {
case s.conn.DelSession <- s:
case <-s.conn.Done:
	s.err = s.conn.Err()
}
This can deadlock with conn mux if it's trying to mux a frame to the same session.
// taken from conn.mux()
select {
case session.rx <- fr:
case <-c.closeMux:
	return
}
Once a session (or any) mux has exited its "channels pump", we shouldn't attempt to write any data to its channels.
I am using this package through the https://github.com/Azure/azure-event-hubs-go SDK to continuously receive messages from an event hub.
When the system experiences extreme load and/or occasional network instability, calls to Close() on an individual Event Hubs receiver (and by extension, its underlying receiver/link from this package) have a tendency to hang until the provided context expires. My typical context timeout is 10 seconds, but I've observed the same with much longer timeouts (it doesn't appear to be related to time).
I recently captured a goroutine dump of a process that was encountering this problem and observed that each timed out Close() was causing an extra goroutine to be left dangling indefinitely (along with the full prefetch buffer for the associated link). These are the relevant groups of leaky goroutines:
1522 @ 0x43ab25 0x4059fa 0x405755 0xa70197 0xa70df8 0xa6f47f 0x4717c1
# 0xa70196 github.com/Azure/go-amqp.(*link).muxReceive+0x276 /go/pkg/mod/github.com/!azure/[email protected]/link.go:513
# 0xa70df7 github.com/Azure/go-amqp.(*link).muxHandleFrame+0x117 /go/pkg/mod/github.com/!azure/[email protected]/link.go:548
# 0xa6f47e github.com/Azure/go-amqp.(*link).mux+0x37e /go/pkg/mod/github.com/!azure/[email protected]/link.go:304
386 @ 0x43ab25 0x4059fa 0x405755 0xa70197 0xa70df8 0xa6fe35 0xa6f2d6 0x4717c1
# 0xa70196 github.com/Azure/go-amqp.(*link).muxReceive+0x276 /go/pkg/mod/github.com/!azure/[email protected]/link.go:513
# 0xa70df7 github.com/Azure/go-amqp.(*link).muxHandleFrame+0x117 /go/pkg/mod/github.com/!azure/[email protected]/link.go:548
# 0xa6fe34 github.com/Azure/go-amqp.(*link).muxFlow+0x234 /go/pkg/mod/github.com/!azure/[email protected]/link.go:380
# 0xa6f2d5 github.com/Azure/go-amqp.(*link).mux+0x1d5 /go/pkg/mod/github.com/!azure/[email protected]/link.go:290
When inspecting the source for the version I'm using (0.13.4), the line where those goroutines are hung is the last line of this block:
Lines 510 to 513 in 2957005
As the comment mentions, the code is designed never to block here (due to the channel being buffered for the exact number of messages in the link credit), but something is clearly violating that assumption.
I'm not sure whether the issue is a protocol violation by Event Hubs w.r.t. link credit or a bug causing extra messages to be buffered in the channel. Either way, it seems dangerous to assume that everything will be perfectly well-behaved all the time. It seems like a simple select to handle the shutdown case would solve the problem here. Happy to send a PR if it seems like a reasonable change.
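The select proposed above could look roughly like the following. This is a minimal sketch, not the library's code: `push`, `messages`, `closed`, and `errLinkClosed` are hypothetical names, and the buffer of size 1 stands in for a buffer sized to the link credit.

```go
package main

import (
	"errors"
	"fmt"
)

// errLinkClosed is a hypothetical sentinel for a link that is shutting down.
var errLinkClosed = errors.New("link closed")

// push hands a message to the receiver's buffer, but gives up when the
// link is shutting down instead of assuming the buffer always has room.
func push(messages chan<- int, closed <-chan struct{}, msg int) error {
	select {
	case messages <- msg:
		return nil
	case <-closed:
		return errLinkClosed
	}
}

func main() {
	messages := make(chan int, 1) // stands in for a credit-sized buffer
	closed := make(chan struct{})

	fmt.Println(push(messages, closed, 1)) // <nil>

	close(closed)
	// The buffer is full and the link is closed, so push returns
	// instead of blocking forever.
	fmt.Println(push(messages, closed, 2)) // link closed
}
```

With this shape the mux goroutine can always exit on shutdown, even if the peer sends more messages than the credit allows.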
Hello and thank you for this great library!
I'm using a Java library to publish a message to ActiveMQ via AMQP 1.0: Swift MQ.
I cannot decode the following message; it seems that the ApplicationProperties are decoded wrongly (it is not read as a composite type).
\x00\x80\x00\x00\x00\x00\x00\x00\x00p\xc0\x07\x05AP\x04@BC\x00\x80\x00\x00\x00\x00\x00\x00\x00s\xc0*\x03\xa1\x187735812932138480283/1/12@\xa1\x0cActiveMQ.DLQ\x00\x80\x00\x00\x00\x00\x00\x00\x00t\xc1H\x08\xa1\x06prop01\xa1\x06val001\xa1\x07prop002\xa1\x02v2\xa1\rprop000000003\x81\x00\x00\x00\x00\x00\x01\x86\xa0\xa1\x05prop4\xa1\tval000004\x00\x80\x00\x00\x00\x00\x00\x00\x00w\xa1\x99{"id":"000000000","prop4":"val000004","prop002Code":"v2","___prop000000003":10.0,"_______prop000000003":"10.0","prop0005":100,"_________prop01":"val001"}
Or, in an int representation:
0, 128, 0, 0, 0, 0, 0, 0, 0, 112, 192, 7, 5, 65, 80, 4, 64, 66, 67, 0, 128, 0, 0, 0, 0, 0, 0, 0, 115, 192, 42, 3, 161, 24, 55, 55, 51, 53, 56, 49, 50, 57, 51, 50, 49, 51, 56, 52, 56, 48, 50, 56, 51, 47, 49, 47, 49, 50, 64, 161, 12, 65, 99, 116, 105, 118, 101, 77, 81, 46, 68, 76, 81, 0, 128, 0, 0, 0, 0, 0, 0, 0, 116, 193, 72, 8, 161, 6, 112, 114, 111, 112, 48, 49, 161, 6, 118, 97, 108, 48, 48, 49, 161, 7, 112, 114, 111, 112, 48, 48, 50, 161, 2, 118, 50, 161, 13, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 129, 0, 0, 0, 0, 0, 1, 134, 160, 161, 5, 112, 114, 111, 112, 52, 161, 9, 118, 97, 108, 48, 48, 48, 48, 48, 52, 0, 128, 0, 0, 0, 0, 0, 0, 0, 119, 161, 153, 123, 34, 105, 100, 34, 58, 34, 48, 48, 48, 48, 48, 48, 48, 48, 48, 34, 44, 34, 112, 114, 111, 112, 52, 34, 58, 34, 118, 97, 108, 48, 48, 48, 48, 48, 52, 34, 44, 34, 112, 114, 111, 112, 48, 48, 50, 67, 111, 100, 101, 34, 58, 34, 118, 50, 34, 44, 34, 95, 95, 95, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 34, 58, 49, 48, 46, 48, 44, 34, 95, 95, 95, 95, 95, 95, 95, 112, 114, 111, 112, 48, 48, 48, 48, 48, 48, 48, 48, 51, 34, 58, 34, 49, 48, 46, 48, 34, 44, 34, 112, 114, 111, 112, 48, 48, 48, 53, 34, 58, 49, 48, 48, 44, 34, 95, 95, 95, 95, 95, 95, 95, 95, 95, 112, 114, 111, 112, 48, 49, 34, 58, 34, 118, 97, 108, 48, 48, 49, 34, 125
I'm looking forward to being able to share messages across the 2 platforms!
Thank you for your help and precious time.
EDIT:
ActiveMQ shows the message application properties correctly, hence the AMQP message shall be a valid one, at least up to the Application properties that fail to decode with go-amqp.
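One thing visible in the dump is that every section descriptor is written as a full 8-byte ulong (format code 0x80) rather than the 1-byte smallulong form (0x53). Whether that is what trips the decoder is an assumption; the report does not confirm it. As a minimal sketch, a descriptor reader that accepts all three ulong encodings could look like this (`readDescriptor` is a hypothetical helper, not part of go-amqp, and symbol descriptors are not handled):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readDescriptor decodes the numeric descriptor at the start of a described
// type and returns the descriptor code and the number of bytes consumed.
func readDescriptor(b []byte) (uint64, int, error) {
	if len(b) < 2 || b[0] != 0x00 {
		return 0, 0, errors.New("not a described type")
	}
	switch b[1] {
	case 0x44: // ulong0: the value zero, no payload bytes
		return 0, 2, nil
	case 0x53: // smallulong: one payload byte
		if len(b) < 3 {
			return 0, 0, errors.New("short buffer")
		}
		return uint64(b[2]), 3, nil
	case 0x80: // ulong: eight payload bytes, big-endian
		if len(b) < 10 {
			return 0, 0, errors.New("short buffer")
		}
		return binary.BigEndian.Uint64(b[2:10]), 10, nil
	}
	return 0, 0, fmt.Errorf("unsupported descriptor format code %#02x", b[1])
}

func main() {
	// Start of the application-properties section from the dump above:
	// 0x00, ulong descriptor 0x74, then a map8 (0xc1) of size 0x48 with 8 items.
	section := []byte{0x00, 0x80, 0, 0, 0, 0, 0, 0, 0, 0x74, 0xc1, 0x48, 0x08}
	code, n, err := readDescriptor(section)
	fmt.Printf("%#x %d %v\n", code, n, err) // prints "0x74 10 <nil>"
}
```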
Test against Service Bus and Event Hubs
Per http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#doc-idp115568
mandatory: "true" iff a non null value for the field is always encoded.
What this means is that all non-mandatory fields in a composite type must be pointer-to-type. That said, we should be smart in how we fix things up. E.g. the description field in the error type isn't mandatory, however making it be *string doesn't really add any value.
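To illustrate the trade-off, here is a small sketch under assumed names: `exampleFields` is a hypothetical struct, not one of the library's types. A pointer is only worth it where null and the zero value mean different things.

```go
package main

import "fmt"

// exampleFields is a hypothetical composite type with two optional fields.
type exampleFields struct {
	// Available is optional, and null ("not stated") differs from an
	// explicit 0, so a pointer keeps the two apart.
	Available *uint32
	// Description is optional too, but null and "" carry the same meaning
	// for callers, so a plain string is enough.
	Description string
}

// availableString renders the field as null or as its numeric value.
func availableString(f exampleFields) string {
	if f.Available == nil {
		return "null"
	}
	return fmt.Sprint(*f.Available)
}

func main() {
	zero := uint32(0)
	fmt.Println(availableString(exampleFields{}))                 // null
	fmt.Println(availableString(exampleFields{Available: &zero})) // 0
}
```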
Session.mux and link.muxHandleFrame both will swallow unexpected frames. While this seems reasonable, I wonder if it could be hiding bugs.
Maybe as a first step, add a build constraint that will panic() in these cases so we can catch this happening in CI/stress etc.
Currently, if a caller is .Receive()'ing from a link and the link fails (detached/closed) it will return an error.
As part of this we want to make sure that this logical sequence of events is respected:
So if we are calling .Receive() in a loop, for instance, we should receive all the messages cached in the internal channel first before we receive the detach error. This makes it simple for clients to know when they can properly abandon a link instance, leaving it with no vital state.
This might already be the state of affairs, just want to validate before a potential GA of this lib.
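The ordering described here can be sketched as follows. This is not the library's Receive(); `receive`, `messages`, `detached`, and `errDetached` are hypothetical names, and the sketch assumes the producer stops writing to the channel before signalling the detach.

```go
package main

import (
	"errors"
	"fmt"
)

// errDetached is a hypothetical stand-in for the link's detach error.
var errDetached = errors.New("link detached")

// receive returns buffered messages first and reports the detach error
// only once the buffer is empty.
func receive(messages <-chan string, detached <-chan struct{}) (string, error) {
	// Fast path: anything already buffered wins, even on a dead link.
	select {
	case m := <-messages:
		return m, nil
	default:
	}
	select {
	case m := <-messages:
		return m, nil
	case <-detached:
		// A message may have been buffered just before the detach.
		select {
		case m := <-messages:
			return m, nil
		default:
			return "", errDetached
		}
	}
}

func main() {
	messages := make(chan string, 2)
	detached := make(chan struct{})
	messages <- "first"
	messages <- "second"
	close(detached) // the link fails with two messages still cached

	for {
		m, err := receive(messages, detached)
		if err != nil {
			fmt.Println("error:", err) // reached only after both messages
			return
		}
		fmt.Println("message:", m)
	}
}
```

A caller looping on such a function sees every cached message before the error, so it can drop the link instance once the error arrives.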
It occurs in v0.13.3 or later.
I thought this might be a go-amqp issue.
package servicebusrace_test

import (
	"context"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

func random() string {
	rand.Seed(time.Now().UnixNano())
	return strconv.Itoa(rand.Int())
}

func TestServicebus(t *testing.T) {
	if testing.Short() {
		t.Skip()
	}
	ctx := context.Background()
	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		t.Skip()
	}
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		t.Fatal(err)
	}
	qm := ns.NewQueueManager()
	name := random()
	_, err = qm.Put(ctx, name)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		qm.Delete(ctx, name)
	})
	q, err := ns.NewQueue(name)
	if err != nil {
		t.Fatal(err)
	}
	const n = 10
	t.Run("send to servicebus", func(t *testing.T) {
		var wg sync.WaitGroup
		for i := 0; i < n; i++ {
			i := i
			wg.Add(1)
			go func() {
				defer wg.Done()
				data := strconv.Itoa(i)
				if err := q.Send(ctx, &servicebus.Message{Data: []byte(data)}); err != nil {
					panic(err)
				}
			}()
		}
		wg.Wait()
	})
	t.Run("receive from servicebus", func(t *testing.T) {
		result := make([]string, n)
		var wg sync.WaitGroup
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := q.ReceiveOne(ctx, servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
					idx, err := strconv.Atoi(string(m.Data))
					if err != nil {
						return err
					}
					result[idx] = string(m.Data)
					return m.Complete(ctx)
				})); err != nil {
					panic(err)
				}
			}()
		}
		wg.Wait()
		for i, r := range result {
			if strconv.Itoa(i) != r {
				t.Errorf("mismatch result %s", r)
			}
		}
	})
}
=== RUN TestServicebus
=== RUN TestServicebus/send_to_servicebus
=== RUN TestServicebus/receive_from_servicebus
==================
WARNING: DATA RACE
Write at 0x00c0002520f0 by goroutine 59:
github.com/Azure/go-amqp.(*inFlight).add()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:280 +0x18d
github.com/Azure/go-amqp.(*Receiver).messageDisposition()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:243 +0x3d3
github.com/Azure/go-amqp.(*Message).Accept()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/types.go:1805 +0x16c
github.com/Azure/azure-service-bus-go.(*Message).Complete()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/message.go:185 +0x206
github.com/johejo/servicebusrace_test.TestServicebus.func3.1.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:81 +0x1bd
github.com/Azure/azure-service-bus-go.HandlerFunc.Handle()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/handler.go:33 +0x51
github.com/Azure/azure-service-bus-go.(*amqpAdapterHandler).Handle()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/amqphandler.go:71 +0x43b
github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage.func1()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:346 +0x84
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:53 +0xf8
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:71 +0x66d
github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:345 +0x210
github.com/Azure/azure-service-bus-go.(*Receiver).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:200 +0x1a4
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:217 +0x1f7
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
Previous read at 0x00c0002520f0 by goroutine 78:
github.com/Azure/go-amqp.(*link).mux()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:297 +0xd0d
Goroutine 59 (running) created at:
github.com/johejo/servicebusrace_test.TestServicebus.func3()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:73 +0x14f
testing.tRunner()
/home/heijo/ghq/go.googlesource.com/go/src/testing/testing.go:1194 +0x202
Goroutine 78 (running) created at:
github.com/Azure/go-amqp.attachLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
github.com/Azure/go-amqp.(*Session).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
==================
WARNING: DATA RACE
Write at 0x00c00053e094 by goroutine 78:
github.com/Azure/go-amqp.(*link).muxReceive()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:523 +0xaaf
github.com/Azure/go-amqp.(*link).muxHandleFrame()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:548 +0x357
github.com/Azure/go-amqp.(*link).mux()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:304 +0xa99
Previous read at 0x00c00053e094 by goroutine 77:
github.com/Azure/go-amqp.(*Session).mux()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:354 +0x108b
Goroutine 78 (running) created at:
github.com/Azure/go-amqp.attachLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
github.com/Azure/go-amqp.(*Session).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
Goroutine 77 (running) created at:
github.com/Azure/go-amqp.(*Client).NewSession()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/client.go:153 +0x684
github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:384 +0x2ce
github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
==================
WARNING: DATA RACE
Write at 0x00c0000b50e0 by goroutine 60:
runtime.mapassign_fast32()
/home/heijo/ghq/go.googlesource.com/go/src/runtime/map_fast32.go:92 +0x0
github.com/Azure/go-amqp.(*inFlight).add()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:282 +0xc4
github.com/Azure/go-amqp.(*Receiver).messageDisposition()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:243 +0x3d3
github.com/Azure/go-amqp.(*Message).Accept()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/types.go:1805 +0x16c
github.com/Azure/azure-service-bus-go.(*Message).Complete()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/message.go:185 +0x206
github.com/johejo/servicebusrace_test.TestServicebus.func3.1.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:81 +0x1bd
github.com/Azure/azure-service-bus-go.HandlerFunc.Handle()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/handler.go:33 +0x51
github.com/Azure/azure-service-bus-go.(*amqpAdapterHandler).Handle()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/amqphandler.go:71 +0x43b
github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage.func1()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:346 +0x84
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:53 +0xf8
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:71 +0x66d
github.com/Azure/azure-service-bus-go.(*Receiver).listenForMessage()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:345 +0x210
github.com/Azure/azure-service-bus-go.(*Receiver).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:200 +0x1a4
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:217 +0x1f7
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
Previous read at 0x00c0000b50e0 by goroutine 78:
github.com/Azure/go-amqp.(*link).mux()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:297 +0xd39
Goroutine 60 (running) created at:
github.com/johejo/servicebusrace_test.TestServicebus.func3()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:73 +0x14f
testing.tRunner()
/home/heijo/ghq/go.googlesource.com/go/src/testing/testing.go:1194 +0x202
Goroutine 78 (running) created at:
github.com/Azure/go-amqp.attachLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/link.go:220 +0x13a5
github.com/Azure/go-amqp.(*Session).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/session.go:90 +0x14d
github.com/Azure/azure-service-bus-go.(*Receiver).newSessionAndLink()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:415 +0x5f4
github.com/Azure/azure-service-bus-go.(*Namespace).NewReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/receiver.go:118 +0x2dc
github.com/Azure/azure-service-bus-go.(*Queue).newReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:371 +0x3a4
github.com/Azure/azure-service-bus-go.(*Queue).ensureReceiver()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:385 +0x1f8
github.com/Azure/azure-service-bus-go.(*Queue).ReceiveOne()
/home/heijo/go/pkg/mod/github.com/!azure/[email protected]/queue.go:213 +0x15a
github.com/johejo/servicebusrace_test.TestServicebus.func3.1()
/home/heijo/ghq/github.com/johejo/servicebusrace/servicebus_test.go:75 +0x16e
==================
testing.go:1093: race detected during execution of test
=== CONT TestServicebus
testing.go:1093: race detected during execution of test
--- FAIL: TestServicebus (5.22s)
--- PASS: TestServicebus/send_to_servicebus (0.76s)
--- FAIL: TestServicebus/receive_from_servicebus (2.59s)
=== CONT
testing.go:1093: race detected during execution of test
FAIL
FAIL github.com/johejo/servicebusrace 5.231s
FAIL
Today, the only method for enabling go-amqp logging is to recompile your binary with a debug tag. However, this output just goes to stdout/stderr and isn't capturable by the caller.
This means that libraries like azservicebus, which have their own pluggable logging systems (via azcore/log), aren't able to redirect those log messages to the output source the customer chose. It also means we can't automatically activate the logging at runtime, since it's compile-time only.
As part of this, it's a good chance to look at our logging and see which parts might be optional: errors that do bubble up to the customer with enough tracing information don't need to be logged. However, FLOW frames, which "flow" in the background, and channels blocking on writes/reads (indicating potential performance issues) should be logged, as these are typically not visible.
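One possible shape for runtime-switchable logging is a callback the caller installs. This is only a sketch of the idea; `SetListener` and `logf` are hypothetical names and do not describe an existing go-amqp or azcore/log API.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu       sync.RWMutex
	listener func(level int, msg string) // nil means logging is disabled
)

// SetListener installs (or, with nil, removes) the log callback at runtime.
func SetListener(fn func(level int, msg string)) {
	mu.Lock()
	defer mu.Unlock()
	listener = fn
}

// logf formats and forwards a message only when a listener is installed,
// so disabled logging costs one read lock and a nil check.
func logf(level int, format string, args ...interface{}) {
	mu.RLock()
	fn := listener
	mu.RUnlock()
	if fn == nil {
		return
	}
	fn(level, fmt.Sprintf(format, args...))
}

func main() {
	logf(1, "dropped: no listener installed yet")

	SetListener(func(level int, msg string) {
		fmt.Printf("[amqp %d] %s\n", level, msg)
	})
	logf(3, "TX FLOW credit=%d", 10) // prints "[amqp 3] TX FLOW credit=10"
}
```

A wrapping library could install a listener that forwards into its own logging system, and remove it again with `SetListener(nil)`.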
Hi, I'm trying to create a sender and a receiver to connect to an amq MQ. I know very little about how IBM MQ works. This is how I'm connecting to the MQ and how I'm trying to create the sender:
client, err := amqp.Dial("amqp://localhost:5672",
	amqp.ConnSASLPlain("app", "passw0rd"),
)
if err != nil {
	log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
	log.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
{
	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress("DEV.QUEUE.1"),
	)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	// Send message
	err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
	if err != nil {
		log.Fatal("Sending message:", err)
	}
	sender.Close(ctx)
	cancel()
}
In amqp.LinkTargetAddress("DEV.QUEUE.1") I tried the name of the queue, the name of the queue manager, and even the name of the AMQ channel that I created following the IBM instructions, but nothing works.
With the mq-light library for Node.js it works just fine. Could you help by telling me what I'm missing?
Currently IssueCredit doesn't directly issue the FLOW frame, preferring to let the normal mux() loop in link handle it.
In testing it looks like this occasionally does not happen (or at the very least it's not timely) and this results in a link with no credits added.
Hi,
I am trying to create a new client which connects to Artemis using AMQP 1.0.
I want the client to explicitly call the 'AcceptMessage' before Artemis removes the message as documented when creating the receiver with settle mode 'second'.
docker-compose.yml
version: '3'
services:
  artemis:
    image: vromero/activemq-artemis
    environment:
      - ARTEMIS_USERNAME=artemis
      - ARTEMIS_PASSWORD=artemis
      - DISABLE_SECURITY=true
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    restart: always
main.go
package main

import (
	"context"

	"github.com/Azure/go-amqp"
	"github.com/sirupsen/logrus"
)

func main() {
	addr := "127.0.0.1"
	client, err := newClient(addr)
	if err != nil {
		logrus.Fatalf("failed to create client - %s", err.Error())
	}
	session, err := client.NewSession()
	if err != nil {
		logrus.Fatalf("failed to create session - %s", err.Error())
	}
	name := "my.topic"
	receiver, err := session.NewReceiver(
		amqp.LinkName("my-queue"),
		amqp.LinkSourceAddress(name),
		amqp.LinkCredit(1),
		amqp.LinkReceiverSettle(amqp.ModeSecond),
		amqp.LinkSourceCapabilities("shared", "global", "topic"),
		amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
	)
	if err != nil {
		logrus.Fatalf("failed to create receiver - %s", err.Error())
	}
	logrus.Infof("now listening to events from %s", name)
	for {
		msg, err := receiver.Receive(context.Background())
		if err != nil {
			logrus.Fatalf("reading message from %s artemis failed", name)
		}
		logrus.Infof("received message from artemis on topic %s with payload %s", addr, msg.Value)
		// Implement handle & accept/reject message here
	}
}

func newClient(addr string) (*amqp.Client, error) {
	return amqp.Dial(addr, amqp.ConnContainerID("testing-for-github"))
}
This code results in the following output:
FATA[0000] failed to create receiver - amqp: receiver settlement mode "second" requested, received "first" from server
It seems this is because of a new implementation feature that checks the settle mode received from the server located at "[email protected]/link.go:291".
I am not sure if this check is really required and why it needs to be checked?
To check if this is the only thing that blocks my purpose, I tried creating a fork that only has the check removed from the library. This resulted in the following output:
INFO[0000] now listening to events from my.topic
INFO[0015] received message from artemis on topic 127.0.0.1 with payload test
The message is correctly received and processed, but it is also directly confirmed, even when I have the receiver's settle mode set to 'second'.
Could you see what I am doing wrong here?
Request to add the amqp+ssl scheme. This appears to be the same as the amqps scheme that is already supported. If you want a PR for this I can submit one. This is so that it will be compatible with ActiveMQ AMQP over SSL.
Currently network errors will leave the link in a detached state (which is fine) but will not indicate that detached state via the returned error. In particular, this code in Receive() can be problematic:
func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
	msg, err := r.Prefetched(ctx)
	if err != nil || msg != nil {
		return msg, err
	}
	// wait for the next message
	select {
	case msg := <-r.link.Messages:
		debug(3, "Receive() blocking %d", msg.deliveryID)
		msg.link = r.link
		return acceptIfModeFirst(ctx, r, &msg)
	case <-r.link.Detached:
		// NOTE:
		// NOTE: r.link.err is not guaranteed to indicate the link is detached (ie, it can just be a network error)
		// NOTE:
		return nil, r.link.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
The bit NOTE'd above will cause errors if you're using the error to determine if the link needs to be recovered. In general a caller should be able to see amqp.DetachError{}, for instance, and know they can do something useful here, like recreate just this link.
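What the caller side could look like if the detach were always surfaced as a dedicated error type is sketched below. `detachError` here is a local, hypothetical type for illustration and not the library's amqp.DetachError; the cause stays reachable through Unwrap.

```go
package main

import (
	"errors"
	"fmt"
)

// detachError is a hypothetical error type marking the link as detached
// while keeping the underlying cause (for example a network error).
type detachError struct{ cause error }

func (e *detachError) Error() string { return "link detached: " + e.cause.Error() }
func (e *detachError) Unwrap() error { return e.cause }

// errNetwork is a stand-in for a raw network error.
var errNetwork = errors.New("connection reset")

// needsLinkRecovery reports whether err says the link itself is detached,
// which is the signal a caller would use to recreate just this link.
func needsLinkRecovery(err error) bool {
	var de *detachError
	return errors.As(err, &de)
}

func main() {
	fmt.Println(needsLinkRecovery(errNetwork))                      // false
	fmt.Println(needsLinkRecovery(&detachError{cause: errNetwork})) // true
}
```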
There are some tests that require a time.Sleep() between actions. Some of these are due to legitimate races, e.g. in conn_test.go TestClose, starting and immediately stopping the connection never gives the connReader, connWriter, and mux goroutines a chance to start. This is a corner case, but the race is real.
This is further surfaced in NetConn, having to use a buffered channel for the reader (see code comment for full explanation).
The idea is sound, but it's not clear if invalid input actually triggers any test failures.
When the incoming window reaches zero, no more messages should be accepted. Unfortunately the code doesn't actually honor this.
Have not looked at outgoing window.
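A minimal sketch of the bookkeeping, assuming the window is decremented once per incoming transfer; `sessionWindow` and `admit` are hypothetical names, not the session's actual fields or methods.

```go
package main

import "fmt"

// sessionWindow is a hypothetical holder for the session's incoming window.
type sessionWindow struct {
	incoming uint32 // transfers we are still willing to accept
}

// admit consumes one unit of the incoming window, or reports false
// when the window is exhausted and the transfer must not be accepted.
func (w *sessionWindow) admit() bool {
	if w.incoming == 0 {
		return false
	}
	w.incoming--
	return true
}

func main() {
	w := &sessionWindow{incoming: 2}
	fmt.Println(w.admit(), w.admit(), w.admit()) // true true false
}
```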
Currently go-amqp calls .Accept() on messages received in ModeFirst.
From our reading of the spec it's possible we don't need to do that, which can be beneficial to callers because it removes a bit of I/O in that path (and removes a potential point of failure for no gain).
This is mostly just a "check that our behavior matches other reference implementations" issue.
This would allow responding to the event that the link has been detached.
Currently if the remote sender does something that's incorrect for the protocol we send them an ErrorNotAllowed, which makes sense.
However, we also return this error back to the local caller. This seems like one of those cases where we should consider marking this as an internal error instead, so it's clear that nothing is incorrect on the client side.
We can still preserve the context of the error, this would merely wrap the original error in a more sensible casing.
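As a rough sketch of such wrapping, with hypothetical names throughout (`internalError`, `errNotAllowed`, and `causedByNotAllowed` are not library identifiers): the wrapper changes what the caller sees first, while Unwrap keeps the original error reachable.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotAllowed is a stand-in for the protocol error sent to the remote peer.
var errNotAllowed = errors.New("amqp:not-allowed")

// internalError is a hypothetical wrapper telling the local caller that
// the fault lies with the remote peer, not with the caller's own usage.
type internalError struct{ err error }

func (e *internalError) Error() string {
	return "internal error (remote peer violated the protocol): " + e.err.Error()
}

func (e *internalError) Unwrap() error { return e.err }

// causedByNotAllowed shows that the original error is still reachable.
func causedByNotAllowed(err error) bool {
	return errors.Is(err, errNotAllowed)
}

func main() {
	err := &internalError{err: errNotAllowed}
	fmt.Println(err)
	fmt.Println(causedByNotAllowed(err)) // true
}
```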
Will happen week of March 7th.
Setting the settlement mode to ModeSettled errors out.
Creating sender link:amqp: sender settlement mode "settled" requested, received "mixed" from server
sender, err := session.NewSender(
	amqp.LinkTargetAddress("/queue-name"),
	amqp.LinkSenderSettle(amqp.ModeSettled),
)
Hello,
while experimenting with this library I noticed the following two points and would like to ask if I missed something or if this is an issue with the implementation:
remote-outgoing-window update condition
According to the spec (http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-session-flow-control) the remote-outgoing-window should be updated only if:
Is there any reason why the remote-outgoing-window gets assigned to the incoming window of the session, for example like here:
Line 717 in 6a20a07
protocol might stop
Assumption:
outgoing-window: 100
incoming-window: 100
outgoing-window: 1000
incoming-window: 1000
Am I right with the assumption that the client will never reach this code
Lines 706 to 708 in 6a20a07
remote-outgoing-window is updated here (as defined in the protocol spec):
Line 632 in 6a20a07
Thank you in advance for your help in exploring this lib.
Hi,
we've been using this library for a while without any issues to connect to a cpp qpid Broker.
The changes introduced with v0.13.6 however break our client.
If I understand correctly, any incoming message is added to unsettledMessages via addUnsettled if it has a non-empty DeliveryTag. It should get removed from there by a call to deleteUnsettled.
The only place where deleteUnsettled is called is within the trackCompletion handler (we're using HandleMessage). This, however, was recently changed to only be run with a receiverSettleMode of ModeSecond.
As can be seen from the following debug log, my link has receiverSettleMode "first", while my message does have a non-empty DeliveryTag "\x00\x00\x00\x00". If I understand correctly, my message gets added to the unsettledMessages map because it has a non-empty DeliveryTag, but because the link is not in ModeSecond it never gets removed.
19:07:48.210557 FLOW Link Mux half: source: somewhere, inflight: 0, credit: 0, deliveryCount: 0, messages: 0, unsettled: 0, maxCredit : 1, settleMode: first
19:07:48.210628 TX(Session) - tx: Flow{NextIncomingID: 0, IncomingWindow: 100, NextOutgoingID: 0, OutgoingWindow: 100, Handle: 0, DeliveryCount: 0, LinkCredit: 1, Available: <nil>, Drain: false, Echo: false, Properties: map[]}
19:07:51.064771 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00", MessageFormat: 0, Settled: false, More: false, ReceiverSettleMode: <nil>, State: <nil>, Resume: false, Aborted: false, Batchable: false, Payload [size]: 1746}
19:07:51.064870 deliveryID 0 before push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
19:07:51.064908 deliveryID 0 after push to receiver - deliveryCount : 0 - linkCredit: 1, len(messages): 0, len(inflight): 0
19:07:51.064928 deliveryID 0 before exit - deliveryCount : 1 - linkCredit: 0, len(messages): 0
19:07:51.064933 PAUSE Link Mux pause: inflight: 0, credit: 0, deliveryCount: 1, messages: 0, unsettled: 1, maxCredit : 1, settleMode: first
19:07:51.065318 TX: Disposition{Role: Receiver, First: 0, Last: <nil>, Settled: true, State: Accepted, Batchable: false}
I don't see any way to resolve this situation starting with v0.13.6. Any idea how this can be resolved?
If I'm reading the previous commits correctly, the empty DeliveryTag is used as a hint for settleMode == ModeFirst in addUnsettled. Couldn't this instead just use the same if r.link.receiverSettleMode.value() == ModeSecond as in HandleMessage to be consistent?
Any help would be greatly appreciated.
While testing azservicebus it looks like if the link is closed calling 'accept()' will hang, rather than returning a more relevant error (ex: ErrLinkClosed).
This is partly investigation, as @jhendrixMSFT did have code to handle this case (code)
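For reference, the behaviour I would expect looks roughly like the sketch below. Everything in it is an assumption for illustration: the link struct, its done and dispositions channels, and errLinkClosed are hypothetical and do not mirror the package's internals. It only shows a settlement call selecting on a "closed" signal so it can return instead of blocking.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errLinkClosed is a hypothetical sentinel standing in for ErrLinkClosed.
var errLinkClosed = errors.New("link closed")

// link is a hypothetical, simplified link; done is closed when the link shuts down.
type link struct {
	done         chan struct{}
	dispositions chan uint32
}

// accept hands a disposition to the link, but gives up as soon as the link
// is closed or the context ends, instead of blocking forever.
func (l *link) accept(ctx context.Context, deliveryID uint32) error {
	select {
	case l.dispositions <- deliveryID:
		return nil
	case <-l.done:
		return errLinkClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// acceptOnClosedLink simulates calling accept after the link has closed
// and nobody is reading dispositions any more.
func acceptOnClosedLink() error {
	l := &link{done: make(chan struct{}), dispositions: make(chan uint32)}
	close(l.done)
	return l.accept(context.Background(), 0)
}

func main() {
	fmt.Println(acceptOnClosedLink())
}
```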
When running the example code included in the README.md, the receiver throws the following error approximately 1 min after receiving the last message from the broker queue:
2021/09/15 16:15:49 Reading message from AMQP:EOF
exit status 1
The issue occurs while executing the following line of the example:
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
log.Fatal("Reading message from AMQP:", err)
}
}
Since it is executing in an infinite for-loop, it was expected to continue receiving messages until explicitly closed.
Using the HandleMessage method in place of Receive did not help. I have also tried changing the ConnIdleTimeout to both zero and non-zero values, and using a plain Background context without timeouts, but it did not have any effect.
I am using Solace PubSub+ event broker for the message queue. I have not used any SASL authentication.
Any help would be greatly appreciated.
How can we connect to an ActiveMQ cluster using this library?
Hello,
I am trying to use this package to connect to a QPID AMQP broker (it implements AMQP 1.0) and to send some messages. I am using the sample code that can be found on the godoc page, but I get the following error after connection:
2021/05/07 17:40:17 Creating sender link:*Error{Condition: amqp:invalid-field, Description: received Attach with remote null terminus., Info: map[]}
exit status 1
Here is my code:
package main
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"log"
"time"
)
func main() {
// Create client
client, err := amqp.Dial("amqp://sptest.aaaa.bb",
amqp.ConnSASLPlain("abababa", "cdcdcdc"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
fmt.Println("dial ok")
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}
fmt.Println("session opened")
ctx := context.Background()
// Send a message
{
// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress("/myTestQueue"),
// amqp.LinkTargetTimeout(10),
// amqp.LinkTargetDurability(2),
)
if err != nil {
log.Fatal("Creating sender link:", err)
}
fmt.Println("Sender link created")
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// Send message
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
if err != nil {
log.Fatal("Sending message:", err)
}
sender.Close(ctx)
cancel()
}
}
I have tried many different ways to enter the queue name: with a slash, without it, with TargetDurability, etc., but no luck. Dialing is OK, the session is opened, but then it exits with the error.
From the same machine, using the same connection credentials, I can easily connect and send using node.js with rhea. This means that the problem must be in my code. I am sure I am missing some obvious thing, probably on how to reference the queue.
Any help is appreciated.
As part of #93 we added in some type aliases for some of the AMQP types. These are primarily for readability on our end to ensure that we've properly matched the type with the AMQP spec.
As these are cosmetic we haven't rigorously gone and applied them to all fields, but it'd be useful to do so.
There are three sections you can use to encode data into an AMQP message:
Sequence can actually be a sequence of several types of things, not just []byte, and (like .Value) should be its own separate field, with its own distinct type.
My proposal is to create a .Sequence property and type it as []interface{}.
(The 'squashed' part is based on this comment in the code:)
// in Message
Data [][]byte
// A data section contains opaque binary data.
// TODO: this could be data(s), amqp-sequence(s), amqp-value rather than single data:
// "The body consists of one of the following three choices: one or more data
// sections, one or more amqp-sequence sections, or a single amqp-value section."
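A rough sketch of what the proposal could look like follows. The message struct, the validateBody helper, and errBodyKinds are hypothetical names used only for illustration; the field types follow the proposal above ([]interface{} for Sequence) and are not the library's current definition.

```go
package main

import (
	"errors"
	"fmt"
)

// errBodyKinds is a hypothetical error for a body that mixes section kinds.
var errBodyKinds = errors.New("body must use exactly one kind of section")

// message is a hypothetical, simplified message with one field per body kind.
type message struct {
	Data     [][]byte      // data sections: opaque binary
	Sequence []interface{} // amqp-sequence: a list of arbitrary values
	Value    interface{}   // amqp-value: a single value
}

// validateBody enforces the "one of the following three choices" rule
// quoted in the code comment above.
func (m *message) validateBody() error {
	kinds := 0
	if len(m.Data) > 0 {
		kinds++
	}
	if len(m.Sequence) > 0 {
		kinds++
	}
	if m.Value != nil {
		kinds++
	}
	if kinds != 1 {
		return errBodyKinds
	}
	return nil
}

func main() {
	ok := &message{Sequence: []interface{}{"a", int64(1), true}}
	mixed := &message{Data: [][]byte{[]byte("x")}, Value: "y"}
	fmt.Println(ok.validateBody(), mixed.validateBody())
}
```

Keeping the three kinds in separate fields means a sequence never has to be squashed into Data, and mixing kinds can be rejected before encoding.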
Currently have an issue with attach link throwing out a panic.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x48 pc=0x846316]
goroutine 121 [running]:
github.com/Azure/go-amqp.attachLink(0xc000546200, 0xc00044e9b0, {0xc00046e620, 0xc00046e530, 0x849a0a})
github.com/Azure/[email protected]/link.go:238 +0x6d6
github.com/Azure/go-amqp.(*Session).NewReceiver(0xc38a64, {0xc00046e620, 0x9, 0x9})
github.com/Azure/[email protected]/session.go:95 +0x6e
Line in question:
Line 238 in 01b4c64
l.Source should not be the issue, as it gets initialized in case of nil, so it has to be caused by resp.Source.
Lines 225 to 227 in d1ea582
Furthermore, there is already a check that returns an error when both the resp.Source and resp.Target are nil, but it seems we are hitting an issue where only the resp.Source is nil, as we don't get the detach error, but end up in this panic instead.
Line 193 in d1ea582
Also, when using a dynamic address and assigning the resp.Source.Address, there is an explicit check for whether the resp.Source is nil.
Lines 229 to 231 in d1ea582
Should the l.Source.Filter = resp.Source.Filter assignment have a similar check as the address assignment above, or should it result in an error?
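To illustrate the "result in an error" option, here is a small sketch. The source and attachResponse types, applyRemoteSource, and errNilSource are hypothetical simplifications, not the library's types; the sketch only shows guarding the filter assignment so a nil remote source becomes an error rather than a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// source is a hypothetical, simplified source terminus.
type source struct {
	Address string
	Filter  map[string]interface{}
}

// attachResponse is a hypothetical, simplified attach frame from the peer.
type attachResponse struct {
	Source *source
}

// errNilSource is a hypothetical error for an attach without a source.
var errNilSource = errors.New("received attach with nil source")

// applyRemoteSource copies the remote filter only when the peer actually
// sent a source; otherwise it reports an error instead of dereferencing nil.
func applyRemoteSource(local *source, resp *attachResponse) error {
	if resp.Source == nil {
		return errNilSource
	}
	local.Filter = resp.Source.Filter
	return nil
}

func main() {
	local := &source{}
	fmt.Println(applyRemoteSource(local, &attachResponse{}))

	remote := &source{Filter: map[string]interface{}{"f": 1}}
	fmt.Println(applyRemoteSource(local, &attachResponse{Source: remote}), len(local.Filter))
}
```

The alternative, silently skipping the assignment when resp.Source is nil, would match the existing address check; which one is right depends on whether a receiver attach without a source should be considered valid.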
We removed all the blocking and I/O in it so there's nothing to cancel.
sender, err := session.NewSender(
amqp.LinkSenderSettle(amqp.ModeUnsettled),
amqp.LinkReceiverSettle(amqp.ModeSecond),
amqp.LinkTargetAddress("helloworld"))
Presumably because fr.Done isn't closed.
In addition, the messages aren't tracked in the sender's unsettled map.
Hi,
I'm facing an issue when I try to create a new client using the New(conn net.Conn, opts ...ConnOption) function and the host that I'm using is an IP address. Here is some code with dummy values:
conn, err := net.Dial("tcp", "10.10.10.10:61616")
client, err := amqp.New(conn, amqp.ConnServerHostname("10.10.10.10"), amqp.ConnSASLPlain("username", "password"), amqp.ConnIdleTimeout(0))
if err != nil {
log.Fatal("Dialing AMQP server: ", err)
}
I got this in the response
"Dialing AMQP server: invalid frame body header"
Any advice on this?
Thank you
My code
type activeMQManager struct {
amqSender *amqp.Sender
}
func initActiveMq(ctx context.Context, logger *model.Logger) (*activeMQManager, error) {
// Try to connect with AMQ
client, err := amqp.Dial(logger.Conn)
if err != nil {
return nil, err
}
// Open a session
amqSession, err := client.NewSession()
if err != nil {
return nil, err
}
// Create a sender
sender, err := amqSession.NewSender(
amqp.LinkTargetAddress(logger.Topic),
)
if err != nil {
return nil, err
}
fmt.Println("Connected to active mq")
return &activeMQManager{amqSender: sender}, nil
}
// LogRequest logs headers,URL,body of the request to AMQ
func (m *activeMQManager) LogRequest(ctx context.Context, r *http.Request) error {
// Set a request id in header if it doesn't exist
requestID := r.Header.Get(model.RequestIDHeader)
if requestID == "" {
// set a new request id header of request
requestID = ksuid.New().String()
r.Header.Add(model.RequestIDHeader, requestID)
}
// copy request body
reqBody, _ := ioutil.ReadAll(r.Body)
r.Body = ioutil.NopCloser(bytes.NewBuffer(reqBody))
urlParams, _ := url.ParseQuery(r.URL.RawQuery)
payload := utils.M{
"type": "request",
"requestId": requestID,
"headers": r.Header,
"ts": time.Now().UTC(),
"url": r.URL.String(),
"urlPath": r.URL.Path,
"urlParams": urlParams,
"urlFragment": r.URL.Fragment,
"body": string(reqBody),
}
return m.sendAMQMessage(ctx, payload)
}
// LogResponse logs headers,status of the response to AMQ
func (m *activeMQManager) LogResponse(ctx context.Context, requestID string, status int, headers http.Header) error {
payload := utils.M{
"type": "response",
"ts": time.Now().UTC(),
"requestId": requestID,
"headers": headers,
"status": status,
}
return m.sendAMQMessage(ctx, payload)
}
func (m *activeMQManager) sendAMQMessage(ctx context.Context, payload utils.M) error {
data, err := json.Marshal(payload)
if err != nil {
return utils.NewError("Unable to marshal request payload", err)
}
// Send message
return m.amqSender.Send(ctx, amqp.NewMessage(data))
}
Client Version: v0.13.6
Seems like we are running into a flow control issue with the library. We have a theory about what might solve the issue, although we currently can't see how this could happen. We have also been able to reproduce the problem with the attached sample code below, in just a couple hundred lines or so.
We create a queue on which we simultaneously produce and consume messages within the application. We expect the consumer to receive messages as they become available on the queue, process them, and mark them as accepted.
We set up a session and pass it to the Producer and Consumer, which in turn create their respective Sender and Receiver on that same session.
On the Consumer we set LinkCredit to 27 (more on that later). We are running 8 consumers in this example, as you can see below.
What we are seeing is that the Consumers only manage to dequeue the first 100 messages or so before the AMQP server stops providing any more messages for retrieval and we are stuck. Bear in mind we are able to continue sending on that same session.
We enabled debugging, and in the attached logs.txt you will see that the OutgoingWindow reporting is kind of weird and sometimes returns max values in the logs. Is this a known issue? Can you shed some light on this? I spent quite some time trying to track the back and forth to illuminate what is happening on the wire.
...
11:23:26.098572 TX: Begin{RemoteChannel: <nil>, NextOutgoingID: 0, IncomingWindow: 100, OutgoingWindow: 100, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:26.098954 RX: Begin{RemoteChannel: 0, NextOutgoingID: 1, IncomingWindow: 16383, OutgoingWindow: 2147483647, HandleMax: 65535, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
11:23:26.099082 TX: Attach{Name: nSbobxLLsp2GbySYOLWUFYDnf17WzICmn3gc3r5pmf9Pfuq46fbv7g, Handle: 0, Role: Sender, SenderSettleMode: <nil>, ReceiverSettleMode: <nil>, Source: <nil>, Target: source{Address: /queue-name, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
....
Now back to the numbers. Our working theory is that the SessionIncomingWindow, which defaults to 100, is used to control flow at the session level, and the LinkCredits are used at the link level. The part we are unsure about is the relationship between the SessionIncomingWindow and LinkCredits.
So in the example case here we have 8 consumers with 27 link credits each, which is 216. That is a little over double the SessionIncomingWindow, which defaults to 100. Since they are all sharing the same session, we assume this is what is tripping the flow control and causing the error we are seeing.
If we lower just the link credits per consumer down to 24, which results in 192, we never see the lock-up and all the messages are processed smoothly. We were able to verify this with various combinations as well.
We are looking to better understand this relationship since it doesn't make sense currently. We are aware that these limits are being calculated on the fly based on other values until server and client can sync them. Could there be a race condition which ends up locking up the processing?
I hope this is adequate information to help us debug the issue we are facing. Let me know if you need anymore information.
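For anyone who wants to try other combinations, the arithmetic above can be reproduced with the small sketch below. totalLinkCredit is a hypothetical helper, and the comparison against the window of 100 reflects only our working theory, not confirmed library behaviour.

```go
package main

import "fmt"

// totalLinkCredit returns the credit outstanding across all receivers
// that share one session.
func totalLinkCredit(receivers, creditPerLink uint32) uint32 {
	return receivers * creditPerLink
}

func main() {
	const incomingWindow = 100 // default SessionIncomingWindow mentioned above

	for _, credit := range []uint32{27, 24} {
		total := totalLinkCredit(8, credit)
		fmt.Printf("credit=%d total=%d overDoubleWindow=%v\n",
			credit, total, total > 2*incomingWindow)
	}
}
```

With 8 consumers, 27 credits each gives 216 (above twice the window) and 24 credits each gives 192 (below it), which matches the two cases we observed.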
Line 395 in 6a20a07
When starting a link, the receiver sends a flow message expecting the next delivery-id to be 0, but the send code starts with deliveryID = 1.
This causes the receiving client to not acknowledge the second message that is sent.
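The mismatch can be shown with a toy model. This is only a sketch under the assumption that the receiving side expects delivery IDs to start at 0 and increase by one; deliveryTracker is a hypothetical type, not part of the library, and a real peer may react differently (for example by not acknowledging, as described above) rather than returning an error.

```go
package main

import "fmt"

// deliveryTracker is a hypothetical receiver-side check that delivery IDs
// arrive in sequence, starting from 0.
type deliveryTracker struct {
	next uint32
}

// check accepts the expected ID and advances, or reports a sequencing error.
func (t *deliveryTracker) check(id uint32) error {
	if id != t.next {
		return fmt.Errorf("sequencing error, expected delivery-id %d, got %d", t.next, id)
	}
	t.next++
	return nil
}

func main() {
	var t deliveryTracker
	// A sender that starts numbering at 1 is out of step from the first transfer.
	fmt.Println(t.check(1))
}
```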
As part of the ReceiveMessages() implementation for Service Bus (batched receiving) there's a workflow similar to this:
While (3) is happening we can still be receiving messages. Drain will ask the service to basically flush whatever it's got on hand (which can also be nothing).
To get a clean boundary between one receiver and the next I need to make sure that I can properly drain the local receiver of any messages it might still have in l.messages. This is almost possible today because there's already a non-blocking read of the channel prior to the later blocking read of the channel link but I don't think I have a way of guaranteeing that I only read from the cached messages (i.e. I'm not interested in blocking at all).
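What I have in mind is roughly the following. It is a sketch over a plain buffered channel of strings, with a hypothetical drainBuffered helper; it does not use the receiver's real l.messages channel or message type, and only demonstrates reading what is already cached without ever blocking.

```go
package main

import "fmt"

// drainBuffered returns whatever is already buffered in ch and never blocks:
// the default case fires as soon as the channel has nothing ready.
func drainBuffered(ch <-chan string) []string {
	var out []string
	for {
		select {
		case m, ok := <-ch:
			if !ok {
				return out // channel closed and fully drained
			}
			out = append(out, m)
		default:
			return out // nothing cached right now
		}
	}
}

func main() {
	ch := make(chan string, 4)
	ch <- "m1"
	ch <- "m2"
	fmt.Println(drainBuffered(ch))
	fmt.Println(len(drainBuffered(ch)))
}
```

A "prefetched only" receive on the receiver could be built on the same select/default pattern.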
Hi! I might have overlooked something, but I'm unable to make new queues/messages persistent. I'm coming from Java where I've been able to create new queues/messages that default to persistent. I've found the 'durability configuration' link option on create sender/receiver, but that does not seem to make messages persistent.
How do I mark my messages as persistent?
Thanks in advance!
Hello,
I am connecting in ModeFirst with a LinkCredit of 100 as follows:
err := q.receiver.HandleMessage(ctx, q.onPublish)
if err != nil {
q.log.Error(err, "Error with queue")
}
func (q *Queue) onPublish(msg *amqp.Message) error {
message := new(core.Message)
if err := json.Unmarshal(msg.GetData(), message); err != nil {
q.log.Error(err, "could not get data")
msg.Reject(context.Background(), &amqp.Error{
Condition: "",
Description: err.Error(),
Info: nil,
})
return err
}
q.messages <- *message
err := msg.Accept(context.Background())
if err != nil {
return err
}
return nil
}
After hitting the credit of 100 messages, my service does not receive messages anymore, although the broker says that 100 messages were acknowledged by the subscriber.
I assume that go-amqp with the new HandleMessage doesn't differ that much from the introduction example, but what am I doing wrong?
Best regards
Looking at the AMQP 1.0 spec, section 2.4.5 Idle Timeout of a Connection, it has the following text:
To avoid spurious timeouts, the value in idle-time-out SHOULD be half the peer’s
actual timeout threshold.
However this library seems to implement the opposite -- it considers the value of the idle timeout as a hard limit and heartbeats at 2x the rate. See
https://github.com/Azure/go-amqp/blob/master/conn.go#L465
https://github.com/Azure/go-amqp/blob/master/conn.go#L579
I'm seeing IBM MQ 9.2.0's AMQP implementation heartbeat at precisely the given idle timeout period, which can cause spurious disconnections. (It also has an AMQPKA setting whose value is halved before being provided in the server's idle timeout value.)
I'm also a newbie as far as AMQP 1.0 is concerned, so perhaps this library is right and IBM's implementation is wrong.
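My reading of that spec sentence, expressed as code, is below. advertisedIdleTimeout is a hypothetical helper and this is only my interpretation: the value a peer puts in idle-time-out would be half of the threshold at which it actually drops the connection.

```go
package main

import (
	"fmt"
	"time"
)

// advertisedIdleTimeout returns the value to send in idle-time-out for a
// given actual timeout threshold, following the spec's "SHOULD be half the
// peer's actual timeout threshold" guidance. Zero means no idle timeout.
func advertisedIdleTimeout(actualThreshold time.Duration) time.Duration {
	if actualThreshold <= 0 {
		return 0
	}
	return actualThreshold / 2
}

func main() {
	// A peer that disconnects after 60s of silence would advertise 30s.
	fmt.Println(advertisedIdleTimeout(60 * time.Second))
}
```

Under that reading, a remote that heartbeats exactly at the advertised period still has a full period of slack before the real threshold is reached.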
==================
WARNING: DATA RACE
Write at 0x00c000002458 by goroutine 20:
github.com/Azure/go-amqp.(*link).muxReceive()
C:/git/Azure/go-amqp/link.go:542 +0x1334
github.com/Azure/go-amqp.(*link).muxHandleFrame()
C:/git/Azure/go-amqp/link.go:567 +0x384
github.com/Azure/go-amqp.(*link).mux()
C:/git/Azure/go-amqp/link.go:321 +0xab6
github.com/Azure/go-amqp.attachLink·dwrap·12()
C:/git/Azure/go-amqp/link.go:237 +0x39
Previous read at 0x00c000002458 by goroutine 16:
github.com/Azure/go-amqp.(*Session).mux()
C:/git/Azure/go-amqp/session.go:362 +0xd84
github.com/Azure/go-amqp.(*Client).NewSession·dwrap·1()
C:/git/Azure/go-amqp/client.go:155 +0x47
Goroutine 20 (running) created at:
github.com/Azure/go-amqp.attachLink()
C:/git/Azure/go-amqp/link.go:237 +0x174c
github.com/Azure/go-amqp.(*Session).NewReceiver()
C:/git/Azure/go-amqp/session.go:90 +0x11e
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:371 +0x164
Goroutine 16 (running) created at:
github.com/Azure/go-amqp.(*Client).NewSession()
C:/git/Azure/go-amqp/client.go:155 +0x6f7
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
C:/git/Azure/go-amqp/integration_test.go:330 +0x1f5
testing.tRunner()
C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
C:/Program Files/Go/src/testing/testing.go:1306 +0x47
==================
==================
WARNING: DATA RACE
Write at 0x00c0000560f0 by goroutine 19:
github.com/Azure/go-amqp.(*inFlight).add()
C:/git/Azure/go-amqp/receiver.go:296 +0x135
github.com/Azure/go-amqp.(*Receiver).messageDisposition()
C:/git/Azure/go-amqp/receiver.go:259 +0x124
github.com/Azure/go-amqp.(*Message).Accept()
C:/git/Azure/go-amqp/types.go:1815 +0x137
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2
Previous read at 0x00c0000560f0 by goroutine 20:
github.com/Azure/go-amqp.(*link).mux()
C:/git/Azure/go-amqp/link.go:314 +0x224
github.com/Azure/go-amqp.attachLink·dwrap·12()
C:/git/Azure/go-amqp/link.go:237 +0x39
Goroutine 19 (running) created at:
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
testing.tRunner()
C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
C:/Program Files/Go/src/testing/testing.go:1306 +0x47
Goroutine 20 (running) created at:
github.com/Azure/go-amqp.attachLink()
C:/git/Azure/go-amqp/link.go:237 +0x174c
github.com/Azure/go-amqp.(*Session).NewReceiver()
C:/git/Azure/go-amqp/session.go:90 +0x11e
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:371 +0x164
==================
==================
WARNING: DATA RACE
Read at 0x00c0001173f0 by goroutine 19:
github.com/Azure/go-amqp.(*Message).done()
C:/git/Azure/go-amqp/types.go:1786 +0x3c
github.com/Azure/go-amqp.(*Message).Accept·dwrap·20()
C:/git/Azure/go-amqp/types.go:1814 +0x39
github.com/Azure/go-amqp.(*Message).Accept()
C:/git/Azure/go-amqp/types.go:1815 +0x148
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2
Previous write at 0x00c0001173f0 by goroutine 21:
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func1()
C:/git/Azure/go-amqp/receiver.go:35 +0x73
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2·dwrap·14()
C:/git/Azure/go-amqp/receiver.go:54 +0x47
Goroutine 19 (running) created at:
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
testing.tRunner()
C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
C:/Program Files/Go/src/testing/testing.go:1306 +0x47
Goroutine 21 (running) created at:
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
C:/git/Azure/go-amqp/receiver.go:54 +0x1a5
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2
==================
==================
WARNING: DATA RACE
Write at 0x00c00013b650 by goroutine 19:
runtime.mapassign_fast32()
C:/Program Files/Go/src/runtime/map_fast32.go:92 +0x0
github.com/Azure/go-amqp.(*inFlight).add()
C:/git/Azure/go-amqp/receiver.go:298 +0x93
github.com/Azure/go-amqp.(*Receiver).messageDisposition()
C:/git/Azure/go-amqp/receiver.go:259 +0x124
github.com/Azure/go-amqp.(*Message).Accept()
C:/git/Azure/go-amqp/types.go:1815 +0x137
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2.1()
C:/git/Azure/go-amqp/integration_test.go:385 +0x8b
github.com/Azure/go-amqp.(*Receiver).HandleMessage.func2()
C:/git/Azure/go-amqp/receiver.go:57 +0x1b7
github.com/Azure/go-amqp.(*Receiver).HandleMessage()
C:/git/Azure/go-amqp/receiver.go:75 +0x5ad
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:383 +0x3d2
Previous read at 0x00c00013b650 by goroutine 20:
github.com/Azure/go-amqp.(*link).mux()
C:/git/Azure/go-amqp/link.go:314 +0x244
github.com/Azure/go-amqp.attachLink·dwrap·12()
C:/git/Azure/go-amqp/link.go:237 +0x39
Goroutine 19 (running) created at:
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1()
C:/git/Azure/go-amqp/integration_test.go:367 +0xa64
testing.tRunner()
C:/Program Files/Go/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
C:/Program Files/Go/src/testing/testing.go:1306 +0x47
Goroutine 20 (running) created at:
github.com/Azure/go-amqp.attachLink()
C:/git/Azure/go-amqp/link.go:237 +0x174c
github.com/Azure/go-amqp.(*Session).NewReceiver()
C:/git/Azure/go-amqp/session.go:90 +0x11e
github.com/Azure/go-amqp_test.TestIntegrationReceiverModeSecond.func1.2()
C:/git/Azure/go-amqp/integration_test.go:371 +0x164
==================
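Several of the reports above involve the in-flight bookkeeping being written from the caller's goroutine while the link mux reads it. One common way to make such a structure safe is to guard it with a mutex, sketched below. This inFlight type is a hypothetical re-creation for illustration, not the library's implementation, and not necessarily the right fix here (routing all access through the mux goroutine would be another option).

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight is a hypothetical map of delivery ID to a channel that receives
// the settlement result. All access goes through the mutex.
type inFlight struct {
	mu sync.Mutex
	m  map[uint32]chan error
}

// add registers a delivery and returns the channel to wait on.
func (f *inFlight) add(id uint32) chan error {
	wait := make(chan error, 1) // buffered so remove never blocks
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.m == nil {
		f.m = make(map[uint32]chan error)
	}
	f.m[id] = wait
	return wait
}

// remove deletes a delivery and delivers the result to its waiter, if any.
func (f *inFlight) remove(id uint32, err error) {
	f.mu.Lock()
	wait, ok := f.m[id]
	delete(f.m, id)
	f.mu.Unlock()
	if ok {
		wait <- err
	}
}

// len reports how many deliveries are still in flight.
func (f *inFlight) len() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.m)
}

func main() {
	var f inFlight
	var wg sync.WaitGroup
	for id := uint32(0); id < 8; id++ {
		wg.Add(1)
		go func(id uint32) {
			defer wg.Done()
			f.add(id) // concurrent writers, as in the Accept path above
		}(id)
	}
	wg.Wait()
	fmt.Println(f.len())
}
```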
At present we have a mix of sentinel and error types making for some clunky error handling.
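As a reference point for what callers would ideally be able to write, here is a sketch using only the standard errors package. errLinkClosed and detachError are hypothetical stand-ins for a sentinel and a typed error; they are not the package's actual exported names.

```go
package main

import (
	"errors"
	"fmt"
)

// errLinkClosed is a hypothetical sentinel error.
var errLinkClosed = errors.New("link closed")

// detachError is a hypothetical typed error carrying a condition.
type detachError struct {
	Condition string
}

func (e *detachError) Error() string { return "link detached: " + e.Condition }

// wrap adds context while keeping the original error inspectable.
func wrap(err error) error { return fmt.Errorf("receive: %w", err) }

// classify handles sentinels with errors.Is and typed errors with errors.As,
// so both styles work even when the error has been wrapped.
func classify(err error) string {
	var de *detachError
	switch {
	case errors.Is(err, errLinkClosed):
		return "closed"
	case errors.As(err, &de):
		return "detached: " + de.Condition
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(wrap(errLinkClosed)))
	fmt.Println(classify(wrap(&detachError{Condition: "amqp:link:detach-forced"})))
	fmt.Println(classify(errors.New("boom")))
}
```

Settling on one style, or at least making every returned error work with errors.Is and errors.As, would remove most of the clunkiness.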
We are experiencing weird behaviour when messages take longer than half the lock-duration of the servicebus queue or topic.
Example:
Queue with a lock-duration of 30 seconds
All messages taking 20 seconds before Accept.
Expected behaviour:
All messages to be read and accepted by the servicebus
Actual behaviour:
1st message gets accepted correctly by the queue
All subsequent messages are reported to the client as accepted, but the queue handles them like "Release()", i.e. no errors are reported from the "msg.Accept()" call.
The timing is always half of the "lock-duration", i.e. when setting the lock-duration to the maximum of 5 minutes, messages taking >2.5 minutes behave the same as in the scenario above, and all messages taking less than half the lock-duration work as they should.
After some extended testing it seems that some messages occasionally get accepted correctly, but most messages end up as "released".
Calling AcceptMessage() on the same message twice in a row hangs on the second call.