streadway / amqp
Go client for AMQP 0.9.1
Home Page: http://godoc.org/github.com/streadway/amqp
License: BSD 2-Clause "Simplified" License
Question... I am looking over AMQP, is there an example or function that just gets a list of the queues from the server? Something equivalent to:
curl -i -u guest:guest http://localhost:55672/api/queues
I see functions on how to manipulate the queues, but you have to know what they are.
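AMQP 0.9.1 itself defines no method for enumerating queues, so the curl call above goes to the RabbitMQ management plugin's HTTP API rather than to the AMQP port. A minimal sketch of the same request in Go follows. The helper name listQueues and the queueInfo type are hypothetical and not part of streadway/amqp; the URL and credentials are the ones from the curl command, and only the "name" field of each entry is decoded.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// queueInfo holds the one field this sketch reads from each entry of the
// management API's JSON response; the real objects carry many more fields.
type queueInfo struct {
	Name string `json:"name"`
}

// listQueues is a hypothetical helper (not part of streadway/amqp). It sends
// GET <baseURL>/api/queues with HTTP basic auth, like the curl command, and
// returns the queue names.
func listQueues(baseURL, user, pass string) ([]string, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/api/queues", nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth(user, pass)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("management API returned %s", resp.Status)
	}

	var queues []queueInfo
	if err := json.NewDecoder(resp.Body).Decode(&queues); err != nil {
		return nil, err
	}
	names := make([]string, 0, len(queues))
	for _, q := range queues {
		names = append(names, q.Name)
	}
	return names, nil
}

func main() {
	names, err := listQueues("http://localhost:55672", "guest", "guest")
	if err != nil {
		fmt.Println("listing queues failed:", err)
		return
	}
	fmt.Println(names)
}
```

This needs the management plugin to be enabled on the broker; without it the request fails and the error is printed.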
I'm experimenting with the amqp lib and it worked fine until I added an x-message-ttl argument for one of my queues.
args = make(amqp.Table)
args["x-dead-letter-exchange"] = "someex"
args["x-message-ttl"] = uint32(900000)
deadQueue, err = channel.QueueDeclare("someq", true, false, false, false, args)
if err != nil {
	return
}
Gives me the error:
Exception (501) Reason: "WSARecv tcp 10.0.0.129:35740: The specified network name is no longer available."
I also tried passing the x-message-ttl as a string, but that gives a PRECONDITION_FAILED, i.e.:
Exception (406) Reason: "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl'for queue 'soamon.requests-dead' in vhost 'soamon': received the value '900000' of type 'longstr' but current is none"
Is this a bug ... or a problem between the keyboard and the monitor? :)
The following code publishes to a nonexistent exchange, which raises a channel exception, and then immediately closes the channel, so the client sends "Basic.Publish" followed by "Channel.Close". The server answers with its own "Channel.Close" to report the channel exception, and with "Channel.Close-Ok" in reply to the client's "Channel.Close" (recorded with Wireshark). The library does not handle this case properly: it disposes of the channel on the server's "Channel.Close" and then cannot find the channel when handling "Channel.Close-Ok". This in turn trashes the connection, so the ExchangeDeclare call at the end fails too. On top of that, the failure only surfaces after a delay (at the next heartbeat).
package main
import (
	"fmt"
	"github.com/streadway/amqp"
)
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost")
	if err != nil {
		panic(err)
	}
	go func() {
		for err := range conn.NotifyClose(make(chan *amqp.Error)) {
			fmt.Println("AMQP connection: " + err.Error())
		}
	}()
	channel1, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	go func() {
		for err := range channel1.NotifyClose(make(chan *amqp.Error)) {
			fmt.Println("AMQP channel1: " + err.Error())
		}
	}()
	channel2, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	go func() {
		for err := range channel2.NotifyClose(make(chan *amqp.Error)) {
			fmt.Println("AMQP channel2: " + err.Error())
		}
	}()
	err = channel1.Publish("nonexisting-exchange", "", false, false, amqp.Publishing{})
	if err != nil {
		panic(err)
	}
	channel1.Close()
	err = channel2.ExchangeDeclare("test", "topic", false, true, false, false, nil)
	if err != nil {
		panic(err)
	}
}
Nack support and its tests are missing.
In order to negotiate heartbeat/auth/framing/channel sizes from the client while keeping the channel constructor simple, wrap up the state in a Config struct that will be copied into the resultant connection after tuning and used to discover body fragment size, heartbeat intervals and read/write deadlines.
I haven't tracked down the reason for this, but I'll send through a PR if I can figure it out.
Basically, I can't bind to an anonymous queue with the binding key "". It seems to work OK if I give the queue a name. There must be some sort of default being assigned, wrongly assuming that I didn't want the empty binding key, because the binding key is coming out as the name of the anonymous queue.
In this screenshot, both queues were bound by this library, using an empty binding key: http://d.pr/i/Zs9G
To reproduce, just run the simple consumer with these flags:
fails:
-queue="" -key="" -lifetime=300s
works OK:
-queue=ohai -key="" -lifetime=300s
When I run gen.sh, it fails to generate the spec Go file:
$ ./gen.sh
2013/05/22 09:47:50 Could not parse XML: xml: response>name chain not valid with attr flag
Consume/Cancel should be coherent. Canceling from a Delivery doesn't make sense in all cases.
The fields for an AMQP URL are well defined so remove the distinction of 'Parsed' and add methods similar to net/url.URL.
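As a rough illustration of what a single URI type could look like, here is a sketch built on net/url. The URI type, its field names, and parseURI are hypothetical and are not the library's API. The defaults (guest/guest, port 5672 for amqp and 5671 for amqps, vhost "/") are the conventional ones, and treating a bare "/" path as the default vhost is an assumption made for this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// URI is a hypothetical type for this sketch; the field names are illustrative.
type URI struct {
	Scheme   string
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

// parseURI fills a URI from an amqp:// or amqps:// string, applying the
// conventional defaults for any field the string leaves out.
func parseURI(raw string) (URI, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return URI{}, err
	}
	uri := URI{Scheme: u.Scheme, Host: u.Hostname(), Username: "guest", Password: "guest", Vhost: "/"}
	switch u.Scheme {
	case "amqp":
		uri.Port = 5672
	case "amqps":
		uri.Port = 5671
	default:
		return URI{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if p := u.Port(); p != "" {
		if uri.Port, err = strconv.Atoi(p); err != nil {
			return URI{}, err
		}
	}
	if u.User != nil {
		uri.Username = u.User.Username()
		if pw, ok := u.User.Password(); ok {
			uri.Password = pw
		}
	}
	// Assumption: only a path longer than "/" names a vhost.
	if len(u.Path) > 1 {
		uri.Vhost = u.Path[1:]
	}
	return uri, nil
}

func main() {
	uri, err := parseURI("amqp://user:secret@example.com:1234/staging")
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Printf("%+v\n", uri)
}
```

Methods in the style of net/url.URL, such as a String method that re-encodes the vhost, could then hang off the same type.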
Flow control and tests are missing.
Even though RabbitMQ uses TCP pushback for connection-wide flow control, at least implement handling of flow control messages for the sake of completeness.
Insert the following sleep commands to trigger the race detection every time:
diff --git a/connection.go b/connection.go
index 38f4421..1cfa550 100644
--- a/connection.go
+++ b/connection.go
@@ -355,6 +355,7 @@ func (me *Connection) reader() {
// Reset the blocking read deadline on the underlying connection when it
// implements SetReadDeadline to three times the requested heartbeat interval.
// On error, resort to blocking reads.
+ time.Sleep(time.Second)
if me.Config.Heartbeat > 0 {
if c, ok := me.conn.(readDeadliner); ok {
c.SetReadDeadline(time.Now().Add(3 * me.Config.Heartbeat))
@@ -522,6 +523,7 @@ func (me *Connection) openTune(config Config, auth Authentication) error {
me.Config.Heartbeat = time.Second * time.Duration(pick(
int(config.Heartbeat/time.Second),
int(tune.Heartbeat)))
+ time.Sleep(2 * time.Second)
// "The client should start sending heartbeats after receiving a
// Connection.Tune method"
Then
go run -race examples/simple-producer/producer.go
will give:
WARNING: DATA RACE
Read by goroutine 6:
github.com/streadway/amqp.(*Connection).reader()
/.../src/github.com/streadway/amqp/connection.go:359 +0x256
gosched0()
/usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:1218 +0x9f
Previous write by goroutine 1:
github.com/streadway/amqp.(*Connection).openTune()
/.../src/github.com/streadway/amqp/connection.go:525 +0x4e4
github.com/streadway/amqp.(*Connection).openStart()
/.../src/github.com/streadway/amqp/connection.go:495 +0x4a1
github.com/streadway/amqp.(*Connection).open()
/.../src/github.com/streadway/amqp/connection.go:471 +0xc0
github.com/streadway/amqp.Open()
/.../src/github.com/streadway/amqp/connection.go:154 +0x4e8
github.com/streadway/amqp.DialTLS()
/.../src/github.com/streadway/amqp/connection.go:135 +0xe94
github.com/streadway/amqp.Dial()
/.../src/github.com/streadway/amqp/connection.go:77 +0x45
main.publish()
/.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:40 +0x142
main.main()
/.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:27 +0x174
runtime.main()
/usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:182 +0x91
Goroutine 6 (running) created at:
github.com/streadway/amqp.Open()
/.../src/github.com/streadway/amqp/connection.go:153 +0x4bf
github.com/streadway/amqp.DialTLS()
/.../src/github.com/streadway/amqp/connection.go:135 +0xe94
github.com/streadway/amqp.Dial()
/.../src/github.com/streadway/amqp/connection.go:77 +0x45
main.publish()
/.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:40 +0x142
main.main()
/.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:27 +0x174
runtime.main()
/usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:182 +0x91
Goroutine 1 (running) created at:
_rt0_amd64()
/usr/local/Cellar/go/1.1/src/pkg/runtime/asm_amd64.s:87 +0x106
RabbitMQ 2.7.1 on Mac OS X 10.7 Lion. streadway/amqp "rpc" branch via remote-tracking-repo here. Calling amqp.NewConnection yields this output (after a couple seconds):
err in ReadFrame: <nil> EOF
connection shutdown 1
2012/05/24 13:05:08 Connect: Unexpected protocol message
Complete program illustrating the issue available here.
Fix these build errors. Only reproducible on Travis at the moment:
https://travis-ci.org/streadway/amqp/builds/6130148
Getting a NPE inside a Mutex?
RabbitMQ 2.7.1, OS X 10.7 Lion.
Using this code (snippet only), I get this output. The exchange.Declare appears to be blocking indefinitely. I ran a tcpdump and got this dump. Check the error at 12:04:40.907773. I guess it isn't getting bubbled up?
The docs are wordy and often unclear. Reduce the noise and highlight the gotchas of AMQP, which are mostly about order of operations.
I set up a channel, declare an exchange and queue, and launch a goroutine to handle deliveries. Some time later, I attempt to close the channel. The Close() call blocks indefinitely at this channel receive statement. This self-contained example demonstrates the issue. Maybe I am setting something up incorrectly?
Review the responsibilities of each actor on a connection, and (hopefully) refactor to remove ambiguities. This also involves making connection state goroutine-safe.
I've not explored this amqp library in detail, but the idea is to support RabbitMQ's publisher confirms extension over a channel. Also, implement transactions if not done already.
I started by writing my own library but then found this one, so it makes sense to learn what has been done in it so far and contribute so that it can be used in a production environment.
Sean, thanks for your library. Can you please list the features that are missing, so we know from the roadmap what needs to be done or fixed? My requirement specifically is a Go library that supports some form of transactions and guarantees communication across high-latency, geo-distributed brokers and consumers.
The following code is crashing the connection reproducibly. It only happens if GOMAXPROCS is set to greater than 1.
package main
import (
	"fmt"
	"github.com/streadway/amqp"
	"runtime"
)
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	conn, err := amqp.Dial("amqp://guest:guest@localhost")
	if err != nil {
		panic(err)
	}
	for {
		channel, err := conn.Channel()
		if err != nil {
			fmt.Println("Connection:", err)
			return
		}
		for i := 0; i < 100; i++ {
			err := channel.Publish("not-existing-exchange", "some-key", false, false, amqp.Publishing{Body: []byte("some-data")})
			if err != nil {
				fmt.Println("Channel:", err)
				break
			}
		}
	}
}
Publisher confirms and the means for the application to handle retries are missing. Design an API that makes sense and possibly acts similarly to Consume.
Discussion welcome.
I've got one goroutine (A) reading from the deliveries chan returned from queue.Consume. It's meant to forward the delivery.Body along another chan, to another goroutine (B), which batch-processes (pipelines) a bunch of them at once. I was doing a bodiesChan <- d.Body within (A), but I was seeing a ton of corrupted bodies in (B) when I tried to json.Unmarshal them.
I thought it might be a memory model issue, that you might be re-using Delivery instances, or some other property which would render them somehow unsafe. So, as an experiment, I tried copying the Body to a fresh byte-slice before forwarding it on, which dramatically reduced but did not eliminate the errors. Also, the errors only occur reliably when the throughput is above some threshold. Which all leads me to believe there is something more subtle happening.
I feel like I'm using the API correctly. Am I missing something?
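The copy experiment described above can be sketched as follows. This only illustrates how a forwarded slice can change under the receiver when its backing buffer is reused; whether the library actually reuses buffers is the poster's unconfirmed suspicion, and copyBody is a hypothetical helper name.

```go
package main

import "fmt"

// copyBody returns a fresh slice holding the same bytes as body, so later
// writes to body's backing array cannot affect the result.
func copyBody(body []byte) []byte {
	out := make([]byte, len(body))
	copy(out, body)
	return out
}

func main() {
	buf := []byte(`{"id":1}`) // stands in for a buffer a sender might reuse
	aliased := buf            // shares the backing array with buf
	copied := copyBody(buf)   // owns its own memory

	copy(buf, `{"id":2}`) // simulated reuse of the buffer

	fmt.Printf("aliased=%s copied=%s\n", aliased, copied)
}
```

In goroutine (A) the forwarding line would become bodiesChan <- copyBody(d.Body). As the report notes, this reduced the errors without eliminating them, so it is a mitigation to test rather than a confirmed fix.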
When we control the requested heartbeat interval, we can reliably set a read deadline on the created connection inside of amqp.Dial.
When publishing with the immediate bit set to true we get an error from the server:
=ERROR REPORT==== 30-Dec-2012::17:56:25 ===
AMQP connection <0.1202.0> (running), channel 1 - error:
{amqp_error,not_implemented,"immediate=true",'basic.publish'}
I am not sure how we should handle these version incompatibilities at the client library level.
I tried to use your qos method because I need more than one consumer with round-robin delivery. The method comments describe exactly this functionality, but I get a not_implemented hint when setting prefetchCount to more than 0. So the method is there but not yet implemented? I would like to help if I can.
The appropriate structs and fields are exported.
Documentation is built properly.
Appropriate interfaces are used like io.WriteTo... how to handle Read/ReadFrom?
Bitstream manipulation is intuitive.
Anything else that could make this code simpler.
After getting notified on Close, the reconnects can be separate from publish/consume. Show how it can be done.
To support external reconnection handlers - expose a channel on the connection and channel types that will be readable when the connection or channel is closed.
A reconnect goroutine from your application would do something like:
go func() {
	for {
		// Block until the connection reports that it has closed.
		<-conn.Closed
		conn = reconnect()
	}
}()
I'd love opinions on how/what to send over the Closed channel.
I can see the headers coming over the wire, but they get lost somewhere along the line inside the driver.
Steps to reproduce:
handler function in the simple consumer to also log the headers
-lifetime=300s
test-queue queue from the management plugin.
Show how to have reliable publishing by handling acks/nacks in a Confirm listener.
Include the sequence increment and channel map updates to remove any possible races between those states.
There are 2 styles of calls to the server, message and RPC. Model the internal API such that multiple listeners can handle each kind of call allowing a select on a timeout and return value channels and eventually handle client ACKs.
This bug was really complicated to track down. The receiving goroutine of an amqp channel blocks if the goroutine on the other end of the deliveries channel is not reading from it. In my case, this was because the session had ended while messages still arrived on the queue. Channel.Close then sends a close message and waits for the confirmation, but the confirmation never arrives because the receiving goroutine is still blocked.
I see two solutions:
To capture the semantics of hard and soft errors coming down for connection and channel closures and differentiate them from protocol framing or network errors, create an error type that makes sense to be returned from synchronous API calls.
Synchronous basic.get and tests are missing.
ackOrNack := make(chan uint64, 1)
channel.NotifyConfirm(ackOrNack, ackOrNack)
channel.Close()
This will panic because an already closed channel is closed a second time.
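A sketch of one way to guard against this, assuming the panic comes from the ack and nack listeners being closed separately on shutdown (the snippet suggests this, but the report does not confirm it). The helper closeAll is hypothetical; it relies on Go channels being comparable, so they can serve as map keys.

```go
package main

import "fmt"

// closeAll closes every channel in listeners exactly once, even when the
// same channel was registered more than once. Nil entries are skipped
// because closing a nil channel also panics.
func closeAll(listeners ...chan uint64) {
	seen := make(map[chan uint64]bool)
	for _, l := range listeners {
		if l == nil || seen[l] {
			continue
		}
		seen[l] = true
		close(l)
	}
}

func main() {
	ackOrNack := make(chan uint64, 1)

	// The same channel registered for both acks and nacks.
	closeAll(ackOrNack, ackOrNack)

	_, open := <-ackOrNack
	fmt.Println("still open:", open)
}
```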
At the moment, the library uses structs everywhere. This makes mocking for tests quite difficult, since using amqp requires a real AMQP broker. In our code I created an interface that the Channel/Connection satisfies so that I could use my own mocks, but this was more work than it would have been if the amqp lib itself used interfaces.
Ideally Channel and Connection should be interfaces so that they can be mocked. From my look at the code, only a few other methods not currently exposed in the public API need an interface definition (the Config and raw frame Send). Mocking the channel/connection would then make a real AMQP server unnecessary, and it would also make it easier for the library itself to have a more thorough set of tests.
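The application-side workaround described above might look roughly like this. The Publisher interface, its simplified Publish signature, fakePublisher, and notify are all hypothetical names for this sketch; the library's real Publish method takes more parameters.

```go
package main

import "fmt"

// Publisher is a hypothetical narrow interface covering only what the
// application needs. A thin wrapper around the real channel would satisfy it.
type Publisher interface {
	Publish(exchange, key string, body []byte) error
}

// fakePublisher records what was published instead of talking to a broker.
type fakePublisher struct {
	sent []string
}

func (f *fakePublisher) Publish(exchange, key string, body []byte) error {
	f.sent = append(f.sent, exchange+"/"+key+": "+string(body))
	return nil
}

// notify is application code written against the interface, so tests can
// pass a fake and production code can pass the real channel wrapper.
func notify(p Publisher, user string) error {
	return p.Publish("events", "user.created", []byte(user))
}

func main() {
	fake := &fakePublisher{}
	if err := notify(fake, "alice"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(fake.sent)
}
```

Keeping the interface on the consumer side and as small as the calling code needs is a common Go idiom, and it works whether or not the library ever exports interfaces of its own.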
ISTM that the integration tests are broken. I discovered this while I was composing test coverage for #67. While go test (and TravisCI) seem to PASS, upon closer inspection, for reasons unknown to me at the moment, the only tests being run are the ones in shared_test.go. I did not track down the breaking commit via git bisect, but maybe I will if I have a chance. I know that the tests were running as recently as a few weeks ago.
(BTW, I am still using go1.0.3 because we haven't upgraded yet)
Steps to reproduce:
integration_test.go using a freshly-cloned copy of this repo.
go test and watch all the tests pass. If you run go test -v you'll only see the output from the shared_test.go.
Set up a build and test in Travis CI.
Transaction messages and tests are missing.
Now that things are settling down, extracting the motivation, general architecture/usage and license into the README will be needed.
It looks like the connection parameter tuning is done incorrectly. The client is supposed to tell the server what the settings are, taking into account the settings the server told it. By my reading of the code, what is happening instead is that the client records what the server told it and uses it in subsequent interactions (e.g. for fragmenting bodies based on the max frame size) but tells the server some completely different, fixed settings.
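The negotiation described here can be sketched as a small function whose result is both stored locally and sent back to the server in tune-ok. The name negotiate and the rule it applies (take the lower of the two values, treating 0 as "no limit") are assumptions for illustration. The diff quoted elsewhere on this page calls a pick helper, but its body is not shown there.

```go
package main

import "fmt"

// negotiate returns the value a client would both use and report back to the
// server for one tunable (frame max, channel max, or heartbeat seconds).
// Assumption: 0 means "no limit", so if either side sends 0 the other side's
// value wins; otherwise the lower value wins.
func negotiate(client, server int) int {
	if client == 0 || server == 0 {
		if client > server {
			return client
		}
		return server
	}
	if client < server {
		return client
	}
	return server
}

func main() {
	serverFrameMax := 131072

	fmt.Println(negotiate(0, serverFrameMax))      // client has no preference
	fmt.Println(negotiate(4096, serverFrameMax))   // client wants smaller frames
	fmt.Println(negotiate(200000, serverFrameMax)) // client asks for more than offered
}
```

The point from the report is that the same negotiated number has to be used in both places, so the server and the client agree on the limits.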
AFAIK the code fragments bodies at frame_max. However, in AMQP, the negotiated frame_max limits the total frame size, including header and end marker, which add up to 8 octets in total. Hence body fragmentation should occur in chunks of frame_max-8.
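The arithmetic can be illustrated with a short sketch. The 8 octets are the 7-octet frame header (1 for type, 2 for channel, 4 for payload size) plus the 1-octet frame-end marker. The function name fragment is hypothetical, and treating a frame max of 0 as "no limit" is an assumption of this sketch.

```go
package main

import "fmt"

// frameOverhead is the 7-octet frame header plus the 1-octet frame-end marker.
const frameOverhead = 8

// fragment splits body into payloads that each fit in one frame of at most
// frameMax octets. A frameMax of 0 is treated as "no limit" (one payload).
// The returned slices share memory with body.
func fragment(body []byte, frameMax int) [][]byte {
	size := frameMax - frameOverhead
	if frameMax == 0 {
		size = len(body)
	}
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(body) > 0 {
		n := size
		if n > len(body) {
			n = len(body)
		}
		chunks = append(chunks, body[:n])
		body = body[n:]
	}
	return chunks
}

func main() {
	body := make([]byte, 10000)
	for i, c := range fragment(body, 4096) {
		fmt.Printf("frame %d: %d payload octets, %d on the wire\n", i, len(c), len(c)+frameOverhead)
	}
}
```

With a frame max of 4096, each payload carries at most 4088 octets, so no frame exceeds the negotiated size once the header and end marker are added.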
TestIntegrationReturn is failing with RabbitMQ 3.0.0 on my local machine.
However, it passes in isolation by running: AMQP_URL="amqp://guest:guest@localhost:5672/" go test -test.run TestIntegrationRun
I believe this is the indication of a bug I was tracking down which made my channel block on exchange declaration under certain conditions.
Instead of checking for the AMQP_URL environment, use build flags like:
// +build integration
Many of the structs and chans use pass by value instead of pass by reference/pointer. Would be great if someone better than me at the Go memory model can review the potential impact of pass by value and suggest where pass by reference would make more sense.
Shouldn't the parameter for this function be a directional channel?
c chan<- Return
I'm trying to make this package work with a qpidd server, but when I try to run it the Dial() function returns this error:
Exception (501) Reason: "Exception (501) Reason: \"frame could not be parsed""
Any help would be appreciated.
This will deadlock:
conn := Open(...)
conn.Close()
<-conn.NotifyClose(make(chan *Error))
Hi,
Can the library ignore calls to Connection.Close if the connection is already closed? Currently such a call crashes in one of amqp's goroutines, which makes the system quite unstable. I have had the case where an error caused the connection to close, and then my "defer conn.Close()" kicked in and crashed the whole thing, instead of letting me recover from the first error and try to open a new connection.
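Both this request and the NotifyClose deadlock reported just above come down to tracking the closed state under a lock. The sketch below uses a stand-in conn type, not the library's Connection, and the errClosed value is hypothetical. A second Close returns an error instead of crashing, and NotifyClose on an already closed connection closes the listener right away so that a receive does not block.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical sentinel returned by a repeated Close.
var errClosed = errors.New("connection already closed")

// conn is a stand-in for a connection; it only models the closed state.
type conn struct {
	mu        sync.Mutex
	closed    bool
	listeners []chan error
}

// NotifyClose registers l. If the connection is already closed, l is closed
// immediately so that a pending receive returns instead of deadlocking.
func (c *conn) NotifyClose(l chan error) chan error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		close(l)
	} else {
		c.listeners = append(c.listeners, l)
	}
	return l
}

// Close shuts down once; later calls return errClosed instead of panicking.
func (c *conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	c.closed = true
	for _, l := range c.listeners {
		close(l)
	}
	c.listeners = nil
	return nil
}

func main() {
	c := &conn{}
	fmt.Println("first close:", c.Close())
	fmt.Println("second close:", c.Close())

	_, ok := <-c.NotifyClose(make(chan error))
	fmt.Println("listener received a value:", ok)
}
```

With this shape a deferred Close after an earlier failure is harmless, and callers that care can compare the result against the sentinel.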