amqp's Introduction

Go RabbitMQ Client Library (Unmaintained Fork)

Beware of Abandonware

This repository is NOT ACTIVELY MAINTAINED. Consider using a different fork instead: rabbitmq/amqp091-go. In case of questions, start a discussion in that repo or use other RabbitMQ community resources.

Project Maturity

This project has been used in production systems for many years. As of 2022, this repository is NOT ACTIVELY MAINTAINED.

This repository is very strict about any potential public API changes. You may want to consider rabbitmq/amqp091-go which is more willing to adapt the API.

Supported Go Versions

This library supports the two most recent Go release series, currently 1.10 and 1.11.

Supported RabbitMQ Versions

This project supports RabbitMQ versions starting with 2.0 but is primarily tested against reasonably recent 3.x releases. Some features and behaviours may be server version-specific.

Goals

Provide a functional interface that closely represents the AMQP 0.9.1 model targeted to RabbitMQ as a server. This includes the minimum necessary to interact with the semantics of the protocol.

Non-goals

Things not intended to be supported.

  • Auto reconnect and re-synchronization of client and server topologies.
    • Reconnection would require understanding the error paths when the topology cannot be declared on reconnect. This would require a new set of types and code paths that are best suited at the call-site of this package. AMQP has a dynamic topology that needs all peers to agree. If this doesn't happen, the behavior is undefined. Instead of producing a possible interface with undefined behavior, this package is designed to be simple for the caller to implement the necessary connection-time topology declaration so that reconnection is trivial and encapsulated in the caller's application code.
  • AMQP Protocol negotiation for forward or backward compatibility.
    • 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent specifications that change the semantics and wire format of the protocol. We will accept patches for other protocol support but have no plans for implementation ourselves.
  • Anything other than PLAIN and EXTERNAL authentication mechanisms.
    • Keeping the mechanisms interface modular makes it possible to extend outside of this package. If other mechanisms prove to be popular, then we would accept patches to include them in this package.

Usage

See the 'examples' subdirectory for simple producer and consumer executables. If you have a use case in mind that isn't well represented by the examples, please file an issue.

Documentation

Use Godoc documentation for reference and usage.

RabbitMQ tutorials in Go are also available.

Contributing

Pull requests are very much welcomed. Create your pull request on a non-master branch, make sure a test or example is included that covers your change, and make sure your commits represent coherent changes that include a reason for the change.

To run the integration tests, make sure you have RabbitMQ running on any host, export the environment variable AMQP_URL=amqp://host/ and run go test -tags integration. TravisCI will also run the integration tests.

Thanks to the community of contributors.

External packages

License

BSD 2 clause - see LICENSE for more details.

amqp's People

Contributors

0x6e6562, abrander, agrimprasad, akrennmair, bombsimon, deadok22, digitalcrab, dimm0, domodwyer, elmacnifico, gerhard, hagna, huehnerhose, humanchimp, jleroy, kinbiko, kpurdon, lukebakken, makasim, michaelklishin, mogigoma, mordyovits, peterbourgon, philhug, pierrre, pnordahl, raqbit, sudorandom, umapathy-cisco, zenovich

amqp's Issues

Implement Confirm

Publisher confirms and the means for the application to handle retries are missing. Design an API that makes sense and possibly acts similarly to Consume.

Discussion welcome.

Example: Reconnect

After getting notified on Close, the reconnects can be separate from publish/consume. Show how it can be done.
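
A rough sketch of the application-side loop this example could show. It uses only the standard library; `session`, `dialFunc`, `redial` and `errUnavailable` are hypothetical names invented for illustration, and `dial` stands in for whatever the caller does to connect and redeclare its topology.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable simulates a broker that cannot be reached yet.
var errUnavailable = errors.New("broker unavailable")

// session is a hypothetical stand-in for an open connection plus its channel.
// closed receives the reason when the connection goes away.
type session struct {
	closed chan error
}

// dialFunc opens a session and declares the topology (exchanges, queues,
// bindings) the application needs. It is supplied by the caller.
type dialFunc func() (*session, error)

// redial calls dial until it succeeds or attempts are exhausted, doubling
// the pause between tries.
func redial(dial dialFunc, attempts int, backoff time.Duration) (*session, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		s, err := dial()
		if err == nil {
			return s, nil
		}
		lastErr = err
		time.Sleep(backoff)
		backoff *= 2
	}
	return nil, fmt.Errorf("giving up after %d attempts: %v", attempts, lastErr)
}

func main() {
	calls := 0
	dial := func() (*session, error) {
		calls++
		if calls%3 != 0 { // fail two out of three dials to exercise the retry path
			return nil, errUnavailable
		}
		return &session{closed: make(chan error, 1)}, nil
	}

	s, err := redial(dial, 5, time.Millisecond)
	if err != nil {
		panic(err)
	}

	// Simulate the broker closing the connection, then reconnect.
	s.closed <- errors.New("connection forced")
	fmt.Println("closed:", <-s.closed)

	if _, err = redial(dial, 5, time.Millisecond); err != nil {
		panic(err)
	}
	fmt.Println("reconnected after", calls, "dial calls")
}
```

Publishing and consuming code would only ever see the current session, which keeps the reconnect logic separate from it.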

Enable mechanisms for confirmed delivery: publisher confirms and transactions

I haven't explored this amqp library in detail, but the idea is to support RabbitMQ's publisher confirms extension over a channel. Also, implement transactions if that is not done already.
I started by writing my own library but then found this one, so it makes sense to learn from the work done so far and contribute, so that it can be used in production environments.

Sean, thanks for your library. Can you please list the features that are still missing, so we know what needs to be done or fixed on the roadmap? My specific requirement is a Go library that supports some form of transactions and guarantees delivery across high-latency, geo-distributed brokers and consumers.

connection parameter tuning broken

It looks like the connection parameter tuning is done incorrectly. The client is supposed to tell the server what the settings are, taking into account the settings the server told it. By my reading of the code, what is happening instead is that the client records what the server told it and uses it in subsequent interactions (e.g. for fragmenting bodies based on the max frame size) but tells the server some completely different, fixed settings.
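
To make the expected behaviour concrete, here is a minimal sketch of the negotiation, assuming that a value of 0 means "no limit" as in AMQP 0.9.1's Connection.Tune. The names `tuning`, `negotiate` and `tuneOk` are hypothetical and not taken from this package.

```go
package main

import "fmt"

// tuning holds the three values exchanged in Connection.Tune and
// Connection.Tune-Ok. The type is hypothetical and only used in this sketch.
type tuning struct {
	ChannelMax int
	FrameMax   int
	Heartbeat  int // seconds
}

// negotiate returns the smaller of the two values, treating 0 as "no limit".
func negotiate(client, server int) int {
	if client == 0 || server == 0 {
		if client > server {
			return client
		}
		return server
	}
	if client < server {
		return client
	}
	return server
}

// tuneOk computes the values the client should send back in Tune-Ok and then
// use itself, so that both peers work with the same settings.
func tuneOk(client, server tuning) tuning {
	return tuning{
		ChannelMax: negotiate(client.ChannelMax, server.ChannelMax),
		FrameMax:   negotiate(client.FrameMax, server.FrameMax),
		Heartbeat:  negotiate(client.Heartbeat, server.Heartbeat),
	}
}

func main() {
	server := tuning{ChannelMax: 2047, FrameMax: 131072, Heartbeat: 60}
	client := tuning{ChannelMax: 0, FrameMax: 4096, Heartbeat: 10}
	fmt.Printf("%+v\n", tuneOk(client, server))
}
```

The point is that one computed value serves both purposes: it is what goes on the wire in Tune-Ok and what the client uses afterwards.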

Dial() returns an error with qpidd-cpp 0.14

I'm trying to make this package work with a qpidd server; however, when I run it, the Dial() function returns this error:

Exception (501) Reason: "Exception (501) Reason:  \"frame could not be parsed""

Any help would be appreciated.

Immediate publish deprecated in RabbitMQ 3

When publishing with the immediate bit set to true we get an error from the server:

=ERROR REPORT==== 30-Dec-2012::17:56:25 ===
AMQP connection <0.1202.0> (running), channel 1 - error:
{amqp_error,not_implemented,"immediate=true",'basic.publish'}

I am not sure how we should handle these version incompatibilities at the client library level.

Use interfaces for Channel/Connection for mock testing capability

At the moment, the library utilizes structs everywhere. This means that using amqp makes mocking for tests quite difficult since a real AMQP broker is required. In our code I created an interface that the Channel/Connection meets, so that I could use my own mocks, but this was a bit more work than necessary (if the amqp lib itself used interfaces).

Ideally the Channel, and Connection should be interfaces so that they can be mocked, from my look at the code only a few other methods not currently exposed as the public API need an interface definition (the Config and raw frame Send). This way mocking the channel/connection can be done so that a real AMQP server isn't necessary, it'd also make it easier for the library itself to have a more thorough set of tests.
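
For illustration, this is roughly the kind of wrapper described above. It is a sketch only: `publisher`, `fakePublisher` and `notify` are hypothetical names, and the `Publish` signature is deliberately simplified so the example needs nothing beyond the standard library.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a hypothetical, narrowed-down interface. Application code
// depends on it instead of on a concrete channel struct.
type publisher interface {
	Publish(exchange, key string, body []byte) error
}

// fakePublisher records what was published so tests need no broker.
type fakePublisher struct {
	published []string
	err       error // when set, Publish fails with this error
}

func (f *fakePublisher) Publish(exchange, key string, body []byte) error {
	if f.err != nil {
		return f.err
	}
	f.published = append(f.published, exchange+"/"+key+":"+string(body))
	return nil
}

// notify is the application code under test.
func notify(p publisher, user string) error {
	if user == "" {
		return errors.New("empty user")
	}
	return p.Publish("events", "user.created", []byte(user))
}

func main() {
	f := &fakePublisher{}
	fmt.Println(notify(f, "alice"), f.published)
}
```

A thin adapter around the real channel type would satisfy the same interface in production code.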

Consolidate error interface

To capture the semantics of hard and soft errors coming down for connection and channel closures and differentiate them from protocol framing or network errors, create an error type that makes sense to be returned from synchronous API calls.
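
One possible shape for such a type, as a starting point for discussion. Everything here is an assumption: `closeError`, its fields and `declarePassive` are hypothetical names, and the message format simply mirrors the "Exception (501) Reason: ..." strings seen in other reports.

```go
package main

import (
	"errors"
	"fmt"
)

// closeError is a hypothetical error type for connection and channel closures.
type closeError struct {
	Code    int    // AMQP reply code, e.g. 404
	Reason  string // reply text from the peer
	Server  bool   // true when the peer initiated the close
	Recover bool   // true for soft errors that only close the channel
}

func (e *closeError) Error() string {
	return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
}

// declarePassive stands in for a synchronous call that the server may answer
// with a channel exception. Only "amq.direct" is treated as existing.
func declarePassive(exchange string) error {
	if exchange == "amq.direct" {
		return nil
	}
	return &closeError{
		Code:    404,
		Reason:  "NOT_FOUND - no exchange '" + exchange + "'",
		Server:  true,
		Recover: true,
	}
}

func main() {
	err := declarePassive("missing")

	// Callers can tell protocol-level closures apart from network errors.
	var ce *closeError
	if errors.As(err, &ce) {
		fmt.Println(ce, "soft:", ce.Recover)
	}
}
```

Framing and network failures would keep their own error values, so a type check is enough to separate the two families.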

body fragmentation off by 8

AFAIK the code fragments bodies at frame_max. However, in AMQP, the negotiated frame_max limits the total frame size, including header and end marker, which add up to 8 octets in total. Hence body fragmentation should occur in chunks of frame_max-8.
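
A small sketch of the arithmetic, assuming the 8 octets are the 7-octet frame header plus the 1-octet frame-end marker. The function name `fragment` is hypothetical, and a frame_max of 0 is treated as "no limit".

```go
package main

import "fmt"

// frameOverhead is the 7-octet frame header plus the 1-octet frame-end marker.
const frameOverhead = 8

// fragment splits body into chunks that each fit into one frame of at most
// frameMax octets once header and end marker are added.
func fragment(body []byte, frameMax int) [][]byte {
	size := frameMax - frameOverhead
	if size <= 0 {
		// Assumption for this sketch: 0 (or a nonsensically small value)
		// means no limit, so the body goes out in a single frame.
		size = len(body)
	}

	var frames [][]byte
	for len(body) > 0 {
		n := size
		if n > len(body) {
			n = len(body)
		}
		frames = append(frames, body[:n])
		body = body[n:]
	}
	return frames
}

func main() {
	for i, f := range fragment(make([]byte, 10000), 4096) {
		fmt.Printf("frame %d carries %d body octets\n", i, len(f))
	}
}
```

With frame_max at 4096, a 10000-octet body is split into chunks of 4088, 4088 and 1824 octets.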

Qos prefetchCount functionality

I tried to use your qos method, because I need to use more than one Consumer and a RoundRobin delivery. The method comments describe exactly this functionality, but I get a not_implemented hint when setting prefetchCount to more than 0 -- So the method is there but not yet implemented? I would like to help, if I can.....

Can't use an empty binding key with an anonymous queue

I haven't tracked down the reason for this, but I'll send thru a PR if I can figure it out.

Basically, I can't bind to an anonymous queue with the binding key "". It seems to work OK if I give the queue a name. There must be some sort of default being assigned, wrongly assuming that I didn't want the empty binding key, because the binding key is coming out as the name of the anonymous queue.

In this screenshot, both queues were bound by this library, using an empty binding key: http://d.pr/i/Zs9G

To reproduce, just run the simple consumer with these flags:

fails:
-queue="" -key="" -lifetime=300s

works OK:
-queue=ohai -key="" -lifetime=300s

Delivery.Body corruption under load

I've got one goroutine (A) reading from the deliveries chan returned from queue.Consume. It's meant to forward the delivery.Body along another chan, to another goroutine (B), which batch-processes (pipelines) a bunch of them at once. I was doing a bodiesChan <- d.Body within (A), but I was seeing a ton of corrupted bodies in (B) when I tried to json.Unmarshal them.

I thought it might be a memory model issue, that you might be re-using Delivery instances, or some other property which would render them somehow unsafe. So, as an experiment, I tried copying the Body to a fresh byte-slice before forwarding it on, which dramatically reduced but did not eliminate the errors. Also, the errors only occur reliably when the throughput is above some threshold. Which all leads me to believe there is something more subtle happening.

I feel like I'm using the API correctly. Am I missing something?

README

Now that things are settling down, extracting the motivation, general architecture/usage and license into the README will be needed.

Ignore Connection.Close if already closed

Hi,
can the library ignore calls to Connection.Close if the connection is already closed? Currently such a call crashes in one of amqp's goroutines, which makes the system quite unstable. In my case an error caused the connection to close, then my "defer conn.Close()" kicked in and crashed the whole thing, so I could not recover from the first error and try to open a new connection.
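
Something along these lines is what I have in mind. It is only a sketch with a hypothetical `conn` type and `errClosed` value, not the library's actual code: a second Close reports an error instead of crashing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is returned when Close is called on a connection that is
// already closed.
var errClosed = errors.New("connection already closed")

// conn is a hypothetical stand-in for a connection.
type conn struct {
	mu     sync.Mutex
	closed bool
}

// Close tears the connection down once. Later calls are harmless and only
// report that there was nothing left to do.
func (c *conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return errClosed
	}
	c.closed = true
	// The real teardown (sending Connection.Close, closing the socket)
	// would happen here.
	return nil
}

func main() {
	c := &conn{}
	defer c.Close() // safe even though the connection is closed below

	fmt.Println("first close:", c.Close())
	fmt.Println("second close:", c.Close())
}
```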

Travis CI

Set up a build and test in Travis CI.

Question for you...

Question... I am looking over AMQP, is there an example or function that just gets a list of the queues from the server? Something equivalent to:

curl -i -u guest:guest http://localhost:55672/api/queues

I see functions on how to manipulate the queues, but you have to know what they are.

NewConnection: Unexpected protocol message

RabbitMQ 2.7.1 on Mac OS X 10.7 Lion. streadway/amqp "rpc" branch via remote-tracking-repo here. Calling amqp.NewConnection yields this output (after a couple seconds):

err in ReadFrame: <nil> EOF
connection shutdown 1
2012/05/24 13:05:08 Connect: Unexpected protocol message

Complete program illustrating the issue available here.

Implement Flow

Flow control and tests are missing.

Even though RabbitMQ uses TCP pushback for connection-wide flow control, at least implement handling of flow control messages for the sake of completeness.

Select on responses in low level API

There are 2 styles of calls to the server, message and RPC. Model the internal API such that multiple listeners can handle each kind of call allowing a select on a timeout and return value channels and eventually handle client ACKs.
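
As a sketch of what the RPC side could look like, the function below waits for either a response or a timeout. The names `call`, `errTimeout` and `errClosed` are hypothetical, and a plain string stands in for a decoded method frame.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errTimeout = errors.New("timed out waiting for response")
	errClosed  = errors.New("channel closed while waiting for response")
)

// call waits for the next RPC response or gives up after timeout.
func call(responses <-chan string, timeout time.Duration) (string, error) {
	select {
	case r, ok := <-responses:
		if !ok {
			return "", errClosed
		}
		return r, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	responses := make(chan string, 1)
	responses <- "queue.declare-ok"
	fmt.Println(call(responses, time.Second))

	// Nothing arrives on this channel, so the call times out.
	fmt.Println(call(make(chan string), 10*time.Millisecond))
}
```

Asynchronous messages such as deliveries would go to a separate listener, so a slow consumer cannot delay an RPC reply.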

Data race of me.Config.Heartbeat

Insert the following sleep commands to trigger the race detection every time:

diff --git a/connection.go b/connection.go
index 38f4421..1cfa550 100644
--- a/connection.go
+++ b/connection.go
@@ -355,6 +355,7 @@ func (me *Connection) reader() {
                // Reset the blocking read deadline on the underlying connection when it
                // implements SetReadDeadline to three times the requested heartbeat interval.
                // On error, resort to blocking reads.
+               time.Sleep(time.Second)
                if me.Config.Heartbeat > 0 {
                        if c, ok := me.conn.(readDeadliner); ok {
                                c.SetReadDeadline(time.Now().Add(3 * me.Config.Heartbeat))
@@ -522,6 +523,7 @@ func (me *Connection) openTune(config Config, auth Authentication) error {
        me.Config.Heartbeat = time.Second * time.Duration(pick(
                int(config.Heartbeat/time.Second),
                int(tune.Heartbeat)))
+       time.Sleep(2 * time.Second)

        // "The client should start sending heartbeats after receiving a
        // Connection.Tune method"

Then

go run -race examples/simple-producer/producer.go

will give:

WARNING: DATA RACE
Read by goroutine 6:
  github.com/streadway/amqp.(*Connection).reader()
      /.../src/github.com/streadway/amqp/connection.go:359 +0x256
  gosched0()
      /usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:1218 +0x9f

Previous write by goroutine 1:
  github.com/streadway/amqp.(*Connection).openTune()
      /.../src/github.com/streadway/amqp/connection.go:525 +0x4e4
  github.com/streadway/amqp.(*Connection).openStart()
      /.../src/github.com/streadway/amqp/connection.go:495 +0x4a1
  github.com/streadway/amqp.(*Connection).open()
      /.../src/github.com/streadway/amqp/connection.go:471 +0xc0
  github.com/streadway/amqp.Open()
      /.../src/github.com/streadway/amqp/connection.go:154 +0x4e8
  github.com/streadway/amqp.DialTLS()
      /.../src/github.com/streadway/amqp/connection.go:135 +0xe94
  github.com/streadway/amqp.Dial()
      /.../src/github.com/streadway/amqp/connection.go:77 +0x45
  main.publish()
      /.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:40 +0x142
  main.main()
      /.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:27 +0x174
  runtime.main()
      /usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:182 +0x91

Goroutine 6 (running) created at:
  github.com/streadway/amqp.Open()
      /.../src/github.com/streadway/amqp/connection.go:153 +0x4bf
  github.com/streadway/amqp.DialTLS()
      /.../src/github.com/streadway/amqp/connection.go:135 +0xe94
  github.com/streadway/amqp.Dial()
      /.../src/github.com/streadway/amqp/connection.go:77 +0x45
  main.publish()
      /.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:40 +0x142
  main.main()
      /.../src/github.com/streadway/amqp/examples/simple-producer/producer.go:27 +0x174
  runtime.main()
      /usr/local/Cellar/go/1.1/src/pkg/runtime/proc.c:182 +0x91

Goroutine 1 (running) created at:
  _rt0_amd64()
      /usr/local/Cellar/go/1.1/src/pkg/runtime/asm_amd64.s:87 +0x106

Example: Confirm

Show how to have reliable publishing by handling acks/nacks in a Confirm listener.
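
A possible core for such an example is the bookkeeping of outstanding delivery tags. This sketch assumes tags count up from 1 once the channel is in confirm mode and that an ack or nack may cover several tags via a "multiple" flag; `tracker` and `confirmation` are hypothetical types.

```go
package main

import (
	"fmt"
	"sort"
)

// confirmation is a hypothetical representation of a basic.ack or basic.nack.
type confirmation struct {
	Tag      uint64
	Ack      bool
	Multiple bool // when true, covers every outstanding tag up to Tag
}

// tracker remembers published messages until the broker settles them.
type tracker struct {
	next    uint64
	pending map[uint64][]byte
}

func newTracker() *tracker {
	return &tracker{next: 1, pending: make(map[uint64][]byte)}
}

// track records a message as published and returns its delivery tag.
func (t *tracker) track(body []byte) uint64 {
	tag := t.next
	t.next++
	t.pending[tag] = body
	return tag
}

// resolve settles the tags covered by c and returns, in publish order, the
// bodies that were nacked and should be published again.
func (t *tracker) resolve(c confirmation) [][]byte {
	var tags []uint64
	for tag := range t.pending {
		if tag == c.Tag || (c.Multiple && tag <= c.Tag) {
			tags = append(tags, tag)
		}
	}
	sort.Slice(tags, func(i, j int) bool { return tags[i] < tags[j] })

	var retry [][]byte
	for _, tag := range tags {
		if !c.Ack {
			retry = append(retry, t.pending[tag])
		}
		delete(t.pending, tag)
	}
	return retry
}

func main() {
	tr := newTracker()
	for _, m := range []string{"a", "b", "c"} {
		tr.track([]byte(m))
	}
	tr.resolve(confirmation{Tag: 2, Ack: true, Multiple: true})
	retry := tr.resolve(confirmation{Tag: 3, Ack: false})
	fmt.Printf("to republish: %q, still pending: %d\n", retry, len(tr.pending))
}
```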

TestIntegrationReturn failing with RabbitMQ 3.0.0

TestIntegrationReturn is failing with RabbitMQ 3.0.0 on my local machine.
However, it passes in isolation by running: AMQP_URL="amqp://guest:guest@localhost:5672/" go test -test.run TestIntegrationRun

I believe this is the indication of a bug I was tracking down which made my channel block on exchange declaration under certain conditions.

SetReadDeadline on net.Conn

When we control the requested heartbeat interval, we can reliably set a read deadline on the created connection inside of amqp.Dial.
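
A minimal sketch of the idea, using an in-memory pipe instead of a real broker connection. The deadline of three heartbeat intervals is an assumption borrowed from the reader loop; `readWithDeadline` and `idleRead` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readWithDeadline arms a read deadline of three heartbeat intervals before
// reading, so a silent peer surfaces as a timeout instead of blocking forever.
func readWithDeadline(conn net.Conn, heartbeat time.Duration, buf []byte) (int, error) {
	if heartbeat > 0 {
		if err := conn.SetReadDeadline(time.Now().Add(3 * heartbeat)); err != nil {
			return 0, err
		}
	}
	return conn.Read(buf)
}

// idleRead reads from an in-memory pipe whose peer never writes and returns
// the resulting error.
func idleRead(heartbeat time.Duration) error {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	_, err := readWithDeadline(client, heartbeat, make([]byte, 1))
	return err
}

func main() {
	err := idleRead(10 * time.Millisecond)
	if ne, ok := err.(net.Error); ok && ne.Timeout() {
		fmt.Println("peer missed three heartbeats:", err)
	}
}
```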

Go idioms

The appropriate structs and fields are exported.

Documentation is built properly.

Appropriate interfaces are used like io.WriteTo... how to handle Read/ReadFrom?

Bitstream manipulation is intuitive.

Anything else that could make this code simpler.

Channel exception + close bug

The following code does a publish that throws a channel exception and immediately closes the channel, thus sending "Basic.Publish" and "Channel.Close". The server now answers with "Channel.Close" itself to notify the client of the channel exception and with "Channel.Close-Ok" to answer to "Channel.Close" (recorded with Wireshark). This case is not handled properly by the library, because it disposes the channel because of the server's "Channel.Close" and then can't find the channel any more when handling "Channel.Close-Ok". This in turn trashes the connection, thus the publish at the end fails, too. Even more, there is a delay until that happens (until heartbeat).

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost")
    if err != nil {
        panic(err)
    }
    go func() {
        for err := range conn.NotifyClose(make(chan *amqp.Error)) {
            fmt.Println("AMQP connection: " + err.Error())
        }
    }()

    channel1, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    go func() {
        for err := range channel1.NotifyClose(make(chan *amqp.Error)) {
            fmt.Println("AMQP channel1: " + err.Error())
        }
    }()

    channel2, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    go func() {
        for err := range channel2.NotifyClose(make(chan *amqp.Error)) {
            fmt.Println("AMQP channel2: " + err.Error())
        }
    }()

    err = channel1.Publish("nonexisting-exchange", "", false, false, amqp.Publishing{})
    if err != nil {
        panic(err)
    }

    channel1.Close()

    err = channel2.ExchangeDeclare("test", "topic", false, true, false, false, nil)
    if err != nil {
        panic(err)
    }
}

Integration tests are broken

ISTM that the integration tests are broken. I discovered this while I was composing test coverage for #67. While go test (and TravisCI) seem to PASS, upon closer inspection, for reasons unknown to me at the moment, the only tests being run are the ones in shared_test.go. I did not track down the breaking commit via git bisect, but maybe I will if I have a chance. I know that the tests were running as recently as a few weeks ago.

(BTW, I am still using go1.0.3 because we haven't upgraded yet)

Steps to reproduce:

  1. add a failing test to integration_test.go using a freshly-cloned copy of this repo.
  2. run go test and watch all the tests pass. If you run go test -v you'll only see the output from the shared_test.go.

Creative writing 101

The docs are wordy and often not clear. Reduce the noise and highlight the gotchas of AMQP mostly about order of operations.

Close events from a Connection and Channel

To support external reconnection handlers - expose a channel on the connection and channel types that will be readable when the connection or channel is closed.

A reconnect goroutine from your application would do something like:

go func() {
  for {
    // Block until the connection reports that it has closed, then reconnect.
    <-conn.Closed
    conn = reconnect()
  }
}()

I'd love opinions on how/what to send over the Closed channel.
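
One option, sketched here with hypothetical names (`notifier`, `NotifyClose`, `shutdown`, `errConnReset`), is to send the reason for the closure and then close the Go channel, so that listeners can simply range over it. The sketch assumes listeners register a channel with a buffer of at least 1.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConnReset is a sample close reason.
var errConnReset = errors.New("connection reset by peer")

// notifier is a hypothetical piece of a connection or channel type that fans
// a close event out to registered listeners.
type notifier struct {
	mu        sync.Mutex
	closed    bool
	listeners []chan error
}

// NotifyClose registers c and returns it. Registering after shutdown closes
// c immediately so the caller never blocks.
func (n *notifier) NotifyClose(c chan error) chan error {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.closed {
		close(c)
	} else {
		n.listeners = append(n.listeners, c)
	}
	return c
}

// shutdown sends the reason (if any) to every listener and closes their
// channels. It assumes each channel has room for one value.
func (n *notifier) shutdown(reason error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.closed {
		return
	}
	n.closed = true
	for _, c := range n.listeners {
		if reason != nil {
			c <- reason
		}
		close(c)
	}
}

func main() {
	n := &notifier{}
	closed := n.NotifyClose(make(chan error, 1))

	n.shutdown(errConnReset)

	for err := range closed {
		fmt.Println("closed:", err)
	}
}
```

A graceful close would pass a nil reason, so the listener sees the channel close without receiving an error.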

Channel.Close can hang when there is an unconsumed message

This bug was really complicated to track down. The receiving goroutine of an amqp channel blocks if the goroutine on the other end of the deliveries channel is not reading from it. In my case, this happened because the session had ended while messages were still arriving on the queue. Channel.Close then sends a close message and waits for the confirmation, but the confirmation never arrives because the receiving goroutine is still blocked.
I see two solutions:

  • Close all consuming go channels when Channel.Close is called before sending the Close message.
  • Allow the creation of buffered go channels.
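
A rough sketch of the first option, using only the standard library. The function `dispatch` and the `done` channel are hypothetical: the dispatching goroutine selects between handing over a message and noticing that Close was called, so it can no longer block forever on a consumer that stopped reading.

```go
package main

import "fmt"

// dispatch forwards messages from in to out until in is drained or done is
// closed. It closes out on return and reports how many messages it handed over.
func dispatch(in <-chan string, out chan<- string, done <-chan struct{}) (delivered int) {
	defer close(out)
	for msg := range in {
		select {
		case out <- msg:
			delivered++
		case <-done:
			// The application asked to close: stop delivering so the
			// goroutine is free to process the Close confirmation.
			return delivered
		}
	}
	return delivered
}

func main() {
	in := make(chan string, 2)
	in <- "m1"
	in <- "m2"
	close(in)

	out := make(chan string) // unbuffered, and nobody reads from it
	done := make(chan struct{})
	close(done) // the application has called Close

	fmt.Println("delivered:", dispatch(in, out, done))
}
```

Buffered delivery channels, the second option, would only postpone the problem until the buffer fills up.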

Headers table is empty when the payload is empty

I can see the headers coming over the wire, but they get lost somewhere along the line inside the driver.

Steps to reproduce:

  1. modify the handler function in the simple consumer to also log the headers
  2. run the simple consumer with -lifetime=300s
  3. publish a message with headers but no payload to the test-queue queue from the management plugin.

Connection tuning encapsulated in amqp.Config

In order to negotiate heartbeat/auth/framing/channel sizes from the client while keeping the channel constructor simple, wrap up the state in a Config struct that will be copied into the resultant connection after tuning and used to discover body fragment size, heartbeat intervals and read/write deadlines.

gen.sh fails to create spec

When I run gen.sh, it fails to generate the spec Go file:
$ ./gen.sh
2013/05/22 09:47:50 Could not parse XML: xml: response>name chain not valid with attr flag

Measure the impact of pass by value

Many of the structs and chans use pass by value instead of pass by reference/pointer. Would be great if someone better than me at the Go memory model can review the potential impact of pass by value and suggest where pass by reference would make more sense.

Adding declareQueue argument x-message-ttl gives network error

I've been experimenting with the amqp lib, and it worked fine until I added an x-message-ttl argument for one of my queues.

    args = make(amqp.Table)
    args["x-dead-letter-exchange"] = "someex"
    args["x-message-ttl"] = uint32(900000)
    deadQueue, err = channel.QueueDeclare("someq", true, false, false, false, args)
    if err != nil {
        return
    }

Gives me the error:

Exception (501) Reason: "WSARecv tcp 10.0.0.129:35740: The specified network name is no longer available."

Also tried passing the x-message-ttl as a string, but that gives a PRECONDITION_FAILED, ie:

Exception (406) Reason: "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl'for queue 'soamon.requests-dead' in vhost 'soamon': received the value '900000' of type 'longstr' but current is none"

Is this a bug ... or a problem between the keyboard and the monitor? :)

Connection crashing because of multi native thread race condition

The following code is crashing the connection reproducibly. It only happens if GOMAXPROCS is set to greater than 1.

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    conn, err := amqp.Dial("amqp://guest:guest@localhost")
    if err != nil {
        panic(err)
    }

    for {
        channel, err := conn.Channel()
        if err != nil {
            fmt.Println("Connection:", err)
            return
        }

        for i := 0; i < 100; i++ {
            err := channel.Publish("not-existing-exchange", "some-key", false, false, amqp.Publishing{Body: []byte("some-data")})
            if err != nil {
                fmt.Println("Channel:", err)
                break
            }
        }
    }
}
