
bunny's People

Contributors

ana06, bartj3, brerx, camelpunch, carlhoerberg, celldee, dasch, foeken, gdb, ionutzp, justindossey, madale, marianposaceanu, michaelklishin, mitio, nviennot, olleolleolle, priviterag, remisauvat, schmitze333, sharshenov, soupmatt, stefansedich, tensho, tiredpixel, valo, vesln, womblep, xaviershay, yurusov

bunny's Issues

NoMethodError `handle_method` for nil, when acknowledging message

Hi there,

I'm using Bunny within Celluloid to build a small framework for handling messages between Rails apps. I'm still playing around at the moment, trying to wrap my head around both components.

I'm having trouble acknowledging a message inside a Celluloid Actor - the queue subscribes fine and I receive messages through it, but as soon as I attempt to acknowledge one of those messages, I get the following exception + stack trace: https://gist.github.com/76bbeb6688f27ef2e47e#file-gistfile1-txt

The full code is available in that gist too.

The @channel method is called from the same thread that created the channel, not from the thread in the subscribe block. I'd have thought this would be OK, but maybe I'm doing something stupid.

SSLSocket connect can hang due to missing timeout

Although Bunny imposes a timeout on the TCPSocket connect operation, there is no timeout on the SSLSocket connect. This is true for bunny 0.6.x - 0.8.x and also seems to be true on master.

From my reading of the C code, SSLSocket#connect calls through into native C code which performs an SSL connection handshake. In some cases -- e.g. quiescent server, malicious server, or RabbitMQ server that is out of file descriptors -- the handshake will never complete, which will cause Bunny to hang more-or-less indefinitely.

Relevant line of code:
https://github.com/ruby-amqp/bunny/blob/0.8.x-stable/lib/qrack/client.rb#L220

It seems to me that the timeout block should apply to basically the entire method. I have verified that this fixes the problem.
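One way to read the suggestion above, as a minimal sketch and not Bunny's actual code: put the TCP connect and the SSL handshake under a single deadline. The helper name `open_socket_with_deadline` and its parameters are made up for illustration; only `TCPSocket`, `OpenSSL::SSL::SSLSocket` and `Timeout.timeout` come from Ruby's standard library.

```ruby
require "socket"
require "openssl"
require "timeout"

# Hypothetical helper (not part of Bunny): opens a TCP socket and, if asked,
# upgrades it to TLS. One deadline covers both the connect and the handshake.
def open_socket_with_deadline(host, port, seconds, ssl: false)
  tcp = nil
  Timeout.timeout(seconds) do
    tcp = TCPSocket.new(host, port)
    next tcp unless ssl

    tls = OpenSSL::SSL::SSLSocket.new(tcp, OpenSSL::SSL::SSLContext.new)
    tls.sync_close = true # closing the TLS socket also closes the TCP socket
    tls.connect           # the handshake that could otherwise block forever
    tls
  end
rescue Timeout::Error
  # Do not leak the half-open socket when the deadline fires.
  tcp.close if tcp && !tcp.closed?
  raise
end
```

With a server that accepts the TCP connection but never answers the handshake, this should raise `Timeout::Error` after `seconds` instead of hanging.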

Repeated error messages being output when broker goes down

I've got a consumer that uses the Queue#pop method. After the broker is restarted or shut down, huge numbers of these error messages are output:

NoMethodError
undefined method `read_fully' for nil:NilClass
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/transport.rb:109:in `read_next_frame'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `run_loop'
AMQ::Protocol::EmptyResponseError
Empty response received from the server.
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/gems/amq-protocol-1.0.1/lib/amq/protocol/frame.rb:45:in `decode_header'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/transport.rb:110:in `read_next_frame'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `run_loop'

I assume it happens from here: https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/main_loop.rb#L54

On our servers it kept outputting until we were out of disk space :(
This problem is actually two-fold:

  1. There doesn't seem to be a way to detect the broker going down.
  2. A tight loop catches all exceptions and outputs A LOT. I think it would be much better to simply fail when an unknown error happens.
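To illustrate the second point, here is a rough sketch of a read loop that stops on the first unexpected error and keeps it for the caller, instead of logging and retrying forever. `ReadLoop` and its `reader` callable are hypothetical stand-ins, not Bunny's main loop.

```ruby
# Hypothetical stand-in for a frame-reading loop; not Bunny's MainLoop.
class ReadLoop
  attr_reader :frames, :last_error

  # reader: any callable that returns the next frame or raises on failure.
  def initialize(reader)
    @reader = reader
    @frames = []
    @last_error = nil
  end

  # Reads frames until the reader fails, then stops. The error is recorded
  # exactly once so the owner can decide whether to reconnect or give up.
  def run
    loop { @frames << @reader.call }
    self
  rescue StandardError => e
    @last_error = e
    self
  end
end
```

The caller can then check `last_error` to detect that the broker went away, which also addresses the first point.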

Drop AMQP 0.8 support

Having all these bloody _08.rb files is a mess. Also, we're about to port Bunny to the amq-protocol gem, and it doesn't support AMQP 0.8 anyway.

Consumer does not consume messages if registered on a non-empty queue

If I start worker.rb and then publish messages from task.rb, I see that messages are consumed. If I reverse the order in which things are done - publish from task.rb and then start worker.rb - no messages are consumed. My expected result is that messages should be consumed regardless of the order in which publishing and consumption are done.

The problem seems to be that if a consumer is registered on a non-empty queue, Bunny cannot handle the message that it receives from the server. The message contains a consume-ok and delivery frameset for a published message, but it is treated just like a single consume-ok. Publishing more messages to the queue has no effect.
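A minimal sketch of the behaviour I would expect, assuming the frames read from the socket arrive as a batch: every frame is dispatched, not only the leading consume-ok. The `FrameDispatcher` class and the hash-based frame shape are illustrative assumptions, not Bunny's internal representation.

```ruby
# Hypothetical dispatcher; frames are modelled as plain hashes for illustration.
class FrameDispatcher
  attr_reader :consumer_tags, :deliveries

  def initialize
    @consumer_tags = []
    @deliveries = []
  end

  # Handle every frame in the batch, not only the first one.
  def dispatch_all(frames)
    frames.each { |frame| dispatch(frame) }
    self
  end

  private

  def dispatch(frame)
    case frame[:type]
    when :consume_ok then @consumer_tags << frame[:consumer_tag]
    when :deliver    then @deliveries << frame[:payload]
    else raise ArgumentError, "unexpected frame type: #{frame[:type].inspect}"
    end
  end
end
```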

I'm using the latest code in ruby-amqp/bunny master.

The scripts are as follows -

worker.rb

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new
connection.start

channel = connection.channel()

q = channel.queue('task_queue', :durable => true)
puts ' [*] Waiting for messages. To exit press CTRL+C'

channel.prefetch(1)

q.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
  puts " [x] Received #{payload}"
  puts " [x] Done"
  channel.ack(delivery_info.delivery_tag, false)
end

task.rb

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new
connection.start

channel = connection.channel()

channel.queue('task_queue', :durable => true)
exchange = channel.default_exchange

message = ' ' + gets.chomp
exchange.publish(message, :routing_key => 'task_queue')

puts " [x] Sent #{message}"
connection.close()

Channel errors don't get cleared when a channel is re-opened

In 0.9.0.pre4, if a channel level error occurs and you re-open that channel, the previous channel error is re-raised even if the operation succeeds.

client = Bunny.new

channel = client.channel

# try to access a queue that doesn't exist
begin
  channel.queue("does.not.exist", :passive => true)
rescue Bunny::NotFound
  puts "got expected exception"
end

# reopen the channel
channel.open

channel.queue("my.queue") #raises Bunny::NotFound for queue "does.not.exist"
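The behaviour I would expect can be sketched like this: reopening the channel discards whatever error the previous failure left behind. `SketchChannel` is a hypothetical model written for this report, not Bunny's channel class.

```ruby
# Hypothetical channel model; it only illustrates clearing stored error state.
class SketchChannel
  class NotFound < StandardError; end

  def initialize
    @open = true
    @last_error = nil
  end

  def open?
    @open
  end

  # Simulates the broker closing the channel with an error.
  def fail_with(error)
    @last_error = error
    @open = false
  end

  # Reopening drops the error recorded by the previous failure.
  def open
    @last_error = nil
    @open = true
    self
  end

  # An operation raises only if an error is still pending.
  def perform
    raise @last_error if @last_error
    yield
  end
end
```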

Lower [significant] performance overhead of Ruby's Timeout

Running the following code:

require 'bunny'
connection = Bunny.new({host: 'localhost', keepalive: true, threaded: false})

connection.start
channel = connection.create_channel
exchange = channel.topic("general", durable: true)

while true
  exchange.publish("hola", routing_key: 'resources', timestamp: Time.now.to_i, persistent: true)
end

I get a message throughput of about 1500 messages/sec on my local machine.

If I use these options:

Bunny.new({host: 'localhost', keepalive: true, threaded: false, socket_timeout: 0, connect_timeout: 0})

The message throughput goes up to 6000 messages/sec.

I believe this is mainly due to Timeout calls spawning new threads.
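For anyone who wants to check the Timeout cost in isolation, without a broker, a rough micro-benchmark along these lines may help. The helper name `timeout_overhead` and the iteration count are arbitrary choices, and the numbers will depend heavily on the Ruby version, since the implementation of `Timeout.timeout` has changed over time.

```ruby
require "benchmark"
require "timeout"

# Times `iterations` calls of a trivial block, with and without Timeout.timeout.
# Returns the elapsed wall-clock seconds for each variant.
def timeout_overhead(iterations)
  work = -> { 1 + 1 }
  bare    = Benchmark.realtime { iterations.times { work.call } }
  wrapped = Benchmark.realtime { iterations.times { Timeout.timeout(5) { work.call } } }
  { bare: bare, wrapped: wrapped }
end

result = timeout_overhead(10_000)
puts format("bare: %.4fs, with Timeout: %.4fs", result[:bare], result[:wrapped])
```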

Implement network failure detection

If I let a Bunny connection sit idle in an irb session, it will eventually start throwing this endlessly:

AMQ::Protocol::EmptyResponseError
Empty response received from the server.
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/amq-protocol-1.0.1/lib/amq/protocol/frame.rb:45:in `decode_header'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/transport.rb:110:in `read_next_frame'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:22:in `loop'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:22:in `run_loop'

Server is RabbitMQ 3.0.0, so maybe something with the heartbeat/connection timeout?
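If heartbeats are the cause, failure detection could look roughly like the sketch below: remember when traffic was last seen and treat the peer as dead after a number of missed intervals. `HeartbeatMonitor`, the limit of two missed intervals, and the injectable clock are all assumptions made for illustration, not something Bunny provides.

```ruby
# Hypothetical monitor: tracks when traffic was last seen and reports the
# peer as dead after a configurable number of missed heartbeat intervals.
class HeartbeatMonitor
  MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  def initialize(interval, missed_limit: 2, clock: MONOTONIC_CLOCK)
    @interval = interval
    @missed_limit = missed_limit
    @clock = clock
    @last_activity = @clock.call
  end

  # Call whenever any frame (including a heartbeat) arrives.
  def signal_activity
    @last_activity = @clock.call
  end

  # True once nothing has arrived for more than interval * missed_limit seconds.
  def dead?
    @clock.call - @last_activity > @interval * @missed_limit
  end
end
```

A reader loop could call `signal_activity` on every frame and stop, or reconnect, once `dead?` returns true.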

SSLSocket Method Missing

The following error occurs when I try to connect using :ssl => true -

NoMethodError: undefined method `read_fully' for #<OpenSSL::SSL::SSLSocket:0x9f4a854>
from /bunny-0.9.0.pre4/lib/bunny/transport.rb:116:in `read_next_frame'

I know that some work is going on around the transport area, so this issue can wait until that work can be addressed.

Timeout after SSL handshake

The following hello-world style test cases were run in the same environment:

require 'rubygems'
require 'bunny'

b = Bunny.new("amqps://marysue")
b.start
q = b.queue("ssl_test")
e = b.exchange('')
e.publish("Bunny", :key => "ssl_test")

message = q.pop
puts "Received a message: #{message[:payload]}. Disconnecting..."

b.stop

and

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect('amqps://marysue')
  channel  = AMQP::Channel.new(connection)
  queue    = channel.queue("ssl_test")
  exchange = channel.default_exchange
  exchange.publish "ruby-amqp", :routing_key => queue.name

  queue.subscribe do |payload|
    puts "Received a message: #{payload}. Disconnecting..."

    connection.close {
      EventMachine.stop { exit }
    }
  end
end

The bunny example fails with the following exception:

bunny-0.8.0.pre1/lib/bunny/client.rb:165:in `open_connection': Connection failed - user: guest (Bunny::ProtocolError)

I believe this is actually a timeout error, since the program freezes for several seconds before finally crashing. The program completes successfully if :ssl => true is removed from the connection line.

The following is the rabbitmq server log pertaining to the failing bunny connection:

=INFO REPORT==== 28-Feb-2012::11:49:57 ===
accepted TCP connection on [::]:5671 from 127.0.0.1:50850

=INFO REPORT==== 28-Feb-2012::11:49:57 ===
starting TCP connection <0.457.0> from 127.0.0.1:50850

=INFO REPORT==== 28-Feb-2012::11:49:57 ===
upgraded TCP connection <0.457.0> to SSL

=ERROR REPORT==== 28-Feb-2012::11:50:07 ===
exception on TCP connection <0.457.0> from 127.0.0.1:50850
{handshake_timeout,frame_header}

=INFO REPORT==== 28-Feb-2012::11:50:07 ===
closing TCP connection <0.457.0> from 127.0.0.1:50850

In comparison, here is the log entry from the successful amqp gem

=INFO REPORT==== 28-Feb-2012::11:51:31 ===
accepted TCP connection on [::]:5671 from 127.0.0.1:50891

=INFO REPORT==== 28-Feb-2012::11:51:31 ===
starting TCP connection <0.319.0> from 127.0.0.1:50891

=INFO REPORT==== 28-Feb-2012::11:51:31 ===
upgraded TCP connection <0.319.0> to SSL

=INFO REPORT==== 28-Feb-2012::11:51:31 ===
closing TCP connection <0.319.0> from 127.0.0.1:50891

For reference, marysue is simply the hostname of my local computer, for which I generated the certificate. The address resolves to 127.0.0.1

Documentation guides

Michael:

"Bunny does not have any documentation guides. The same approach with guides (store them under docs/*, use a DocumentationIndex page for TOC) worked well 3 times already (for amqp, eventmachine and travis-worker). This is also how rubydoc.info expects people to do things (in part because they only let projects use subset of YARD features).

I think the amqp gem guides structure can be taken as is, and content can be copied where necessary or written anew. I haven't seen any attempts by projects to share documentation (code examples, obviously, cannot be shared at all), but having the same set of guides and TOC ordering is certainly possible."

0.7.9 - AMQP header :headers hash cannot have symbol values

I don't know if it has been fixed in master, but with bunny 0.7.9, nothing happens (using RabbitMQ as a broker, no message is received) when publishing a message that has a symbol value in the AMQP header :headers.

OK

exchange.publish('AA', {
          :type => type,
          :key => key,
          :correlation_id => correlation_id,
          :reply_to => reply_to,
          :headers => { :xx => 'aa' },
          :content_type => 'application/json',
          :timestamp => Time.now.to_i
        })

Fails (no error triggered)

exchange.publish('AA', {
          :type => type,
          :key => key,
          :correlation_id => correlation_id,
          :reply_to => reply_to,
          :headers => { :xx => 'aa'.to_sym },
          :content_type => 'application/json',
          :timestamp => Time.now.to_i
        })
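As a workaround until this is handled by the library, symbol values can be converted to strings before publishing. This is only a sketch; `stringify_symbol_values` is a helper name I made up, not something Bunny ships.

```ruby
# Hypothetical helper: returns a copy of a headers structure in which Symbol
# values (also inside nested hashes and arrays) are converted to strings.
# Hash keys are left untouched.
def stringify_symbol_values(value)
  case value
  when Symbol then value.to_s
  when Hash   then value.each_with_object({}) { |(k, v), out| out[k] = stringify_symbol_values(v) }
  when Array  then value.map { |v| stringify_symbol_values(v) }
  else value
  end
end

headers = stringify_symbol_values({ :xx => :aa, :nested => { :kind => :b } })
```

The resulting `headers` hash can then be passed as the `:headers` option in the publish call.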

Can't use Bunny with the RabbitMq Tracer tool

I use the Tracer tool ( http://www.rabbitmq.com/examples.html#tracer ) to debug my jruby and 1.9.2 applications. I recently added bunny to a rails app to (only) publish to a RabbitMq exchange. With the tracer in between RabbitMq server and the ruby app, I see a java exception in the Tracer on the Bunny connection. Could the protocol be the problem ( first line )? The Java line is ch#0 <- {#method<connection.start>(version-major=0, version-minor=9 ...

Env: Rails App - Rails 3.1.1, Bunny 0.7.8, ruby 1.9.3pre1, RabbitMq 2.6.1, Tracer 2.6.1, thin
Background App - jRuby 1.6.4 Java 7, jMongo 1.1.4 (-> RabbitMq Java Driver, 2.6.5).

Added below is the relevant part of the Tracer output.
is the jRuby side
is the MRI Bunny side
It looks like the connection opens OK but the channel is not opened.

1319551245765: ch#3 -> {#method<tx.commit>(),null,""}
1319551245799: ch#3 <- {#method<tx.commit-ok>(),null,""}
1319551245830: ch#0 <- {#method<connection.start>(version-major=8, version-minor=0, server-properties={product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, capabilities={}, copyright=Copyright (C) 2007-2011 VMware, Inc., version=2.6.1}, mechanisms=PLAIN AMQPLAIN, locales=en_US),null,""}
1319551245831: ch#0 -> {#method<connection.start-ok>(client-properties={product=Bunny, platform=Ruby, information=http://github.com/ruby-amqp/bunny, version=0.7.8}, mechanism=AMQPL****PASSWORD****, locale=en_US),null,""}
1319551245870: ch#0 <- {#method<connection.tune>(channel-max=0, frame-max=131072, heartbeat=0),null,""}
1319551245910: ch#0 -> {#method<connection.tune-ok>(channel-max=0, frame-max=131072, heartbeat=0),null,""}
1319551245910: ch#0 -> {#method<connection.open>(virtual-host=/user-test, capabilities=, insist=false),null,""}
1319551245920: ch#5 -> {#method<basic.get>(ticket=0, queue=message_failure, no-ack=false),null,""}
1319551245950: ch#0 <- {#method<connection.open-ok>(known-hosts=),null,""}
1319551245960: ch#5 <- {#method<basic.get-empty>(cluster-id=),null,""}
1319551245960: ch#5 -> {#method<basic.get>(ticket=0, queue=user, no-ack=false),null,""}
1319551245990: ch#1 -> {#method<channel.open>(out-of-band=),null,""}
1319551245999: ch#5 <- {#method<basic.get-empty>(cluster-id=),null,""}
1319551246033: uncaught java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at com.rabbitmq.client.impl.ValueReader.readBytes(ValueReader.java:86)
at com.rabbitmq.client.impl.ValueReader.readLongstr(ValueReader.java:112)
at com.rabbitmq.client.impl.ValueReader.readLongstr(ValueReader.java:120)
at com.rabbitmq.client.impl.MethodArgumentReader.readLongstr(MethodArgumentReader.java:73)
at com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk.<init>(AMQImpl.java:600)
at com.rabbitmq.client.impl.AMQImpl.readMethodFrom(AMQImpl.java:3242)
at com.rabbitmq.client.impl.AMQCommand$Assembler.handleFrame(AMQCommand.java:266)
at com.rabbitmq.tools.Tracer$DirectionHandler.doFrame(Tracer.java:331)
at com.rabbitmq.tools.Tracer$DirectionHandler.run(Tracer.java:345)
at java.lang.Thread.run(Thread.java:722)

queue.message_count under queue.subscribe bracket is unstable

Calling queue.message_count inside a queue.subscribe block is unstable.

When queue.subscribe is called, it sometimes reports an error and quits.

env = (rvm) ruby 1.9.3-p0 + bunny-0.7.9 + Ubuntu 12.04 64-bit.

The error message is as follows:

someone@c2h2xu1204:/home/c2h2/Desktop# ruby bb.rb 
/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:303:in `status': undefined method `message_count' for <<sometag๏ฟฝ๏ฟฝasdf:Qrack::Protocol::Basic::Deliver (NoMethodError)
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/queue.rb:24:in `message_count'
    from bb.rb:17:in `block in <main>'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
    from bb.rb:16:in `<main>'

Sample code:
aa.rb as the emitter: https://gist.github.com/2692967

bb.rb as the receiver: https://gist.github.com/2692974

Please run aa.rb and bb.rb several times each; for me the error always happens on the second run of bb.rb.

Race condition with subscribe timeout

Hey everybody.

I've uncovered a rare issue that occurs when subscribing with a timeout. Every so often, Bunny will just freeze and never exit. The theory is that Bunny is in the process of receiving a message when the timeout is hit (a very small window), causing the follow-up unsubscribe to fail.

Here's the stack-trace when interrupting a frozen process:

/Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `read': Interrupt
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `send_command'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:124:in `read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:272:in `_read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:103:in `block in read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `map'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:61:in `block in parse'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:263:in `extract'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:60:in `parse'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/client08.rb:158:in `next_frame'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:117:in `next_payload'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:343:in `unsubscribe'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:61:in `rescue in block in start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:58:in `block in start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
from bunny.rb:39:in `block in <main>'
from bunny.rb:34:in `times'
from bunny.rb:34:in `<main>'

I was able to (randomly) reproduce the issue with this simple script:

require 'bunny'

bunny_write = Bunny.new#(:logging => true)
bunny_read = Bunny.new(:logging => true)

bunny_write.start
bunny_read.start

bunny_read.qos

direct_exchange = bunny_write.exchange('')

queue_read = bunny_read.queue('test1')
queue_write = bunny_write.queue('test1')

queue_read.purge # clear messages from previous run

NUM_MESSAGES = 1000

Thread.new do
  NUM_MESSAGES.times do |i|
    # publish a message to the queue
    direct_exchange.publish("#{ i }", key: queue_write.name)
    puts "sent #{ i }"

    sleep rand
  end

  bunny_write.stop
end

NUM_MESSAGES.times do |i|
  timeout = rand
  got_message = false

  # get message from the queue
  queue_read.subscribe(:timeout => timeout, :message_max => 1) do |message|
    got_message = true
  end

  puts "#{ got_message ? "recieved" : "missed" } #{ i }"
end

bunny_read.stop
bunny_write.stop

Here's the last few lines of output from this script. Keep in mind that "sent" and "received" lines aren't meant to match up.

 recieved 644
 I, [2012-02-18 17:35:28#94063]  INFO -- send: ^A^@^A^@^@^@E^@<^@^T^@^A^Etest1729271
 I, [2012-02-18 17:35:28#94063]  INFO -- received: ^A^@^A^@^@^@<^@<^@^U7292712987322
 sent 355
 I, [2012-02-18 17:35:29#94063]  INFO -- received: ^A^@^A^@^@^@L^@<^@<72927129873223
 I, [2012-02-18 17:35:29#94063]  INFO -- received: ^B^@^A^@^@^@)^@<^@^@^@^@^@^@^@^@^
 I, [2012-02-18 17:35:29#94063]  INFO -- received: ^C^@^A^@^@^@^C355Î
 I, [2012-02-18 17:35:29#94063]  INFO -- send: ^A^@^A^@^@^@=^@<^@^^72927129873223131
 I, [2012-02-18 17:35:29#94063]  INFO -- received: ^A^@^A^@^@^@<^@<^@^_7292712987322
 recieved 645
 I, [2012-02-18 17:35:29#94063]  INFO -- send: ^A^@^A^@^@^@E^@<^@^T^@^A^Etest1797302
 I, [2012-02-18 17:35:29#94063]  INFO -- received: ^A^@^A^@^@^@<^@<^@^U7973022136281
 sent 356
 I, [2012-02-18 17:35:29#94063]  INFO -- send: ^A^@^A^@^@^@=^@<^@^^79730221362814317
 sent 357
 sent 358
 sent 359
 ...

The obvious solution is to set a socket timeout when creating the client, but it has to be greater than the maximum subscribe timeout. Maybe there is a better fix available.

Thanks!

Eliminate "one consumer per queue" limitation

amqp 0.8.0.RC14 no longer has the "one consumer per queue on a channel" limitation. Because load balancing happens between consumers, the limitation is annoying, even though it is easy to work around. It may even be misleading.

So while I kept AMQP::Queue#subscribe and it still only can be used once per queue, it internally sets up an AMQP::Consumer instance and those can be instantiated, activated, cancelled or passed around like any other objects. Bunny should do the same. It is great to have Queue#subscribe, but there has to be a way to add more consumers when necessary, even if with a few more lines.

Get rid of copied AMQ::Client code

We need to get rid of copied AMQ::Client code (renaming it would be sufficient), ideally it should be extracted. Most of the bits are already in amq-protocol, we need to make sure AMQP URI parsing is moved there, too.

Unsubscribing from a queue that has a message on it raises an exception.

This is the same code I used:

require 'rubygems'
require 'bunny'

b = Bunny.new(:host => '127.0.0.1', :spec => '09', :logging => true)
b.start

exchange = b.exchange('')

q = b.queue(nil, :exclusive => false)

exchange.publish('abc', :key => q.name)
exchange.publish('abc', :key => q.name)


puts '.'

q.subscribe( :timeout => 5, :message_max => 1 ) do |msg|
    puts msg[:payload]
end


puts '.'

The exception I receive:

/Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/client09.rb:55:in `check_response': Error unsubscribing from queue amq.gen-5883aQtSztoCgUHv1guX9g== (Bunny::ProtocolError)
    from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/queue09.rb:308:in `unsubscribe'
    from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:86:in `consume'
    from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:56:in `loop'
    from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:56:in `consume'
    from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/queue09.rb:248:in `subscribe'
    from test.rb:17

And here's the raw log:

I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x100625d58 @payload=#<Qrack::Protocol09::Connection::Start:0x1006257b8 @server_properties={:version=>"2.6.1", :information=>"Licensed under the MPL.  See http://www.rabbitmq.com/", :copyright=>"Copyright (C) 2007-2011 VMware, Inc.", :capabilities=>{:consumer_cancel_notify=>1, :"basic.nack"=>1, :publisher_confirms=>1, :exchange_exchange_bindings=>1}, :platform=>"Erlang/OTP", :product=>"RabbitMQ"}, @version_minor=9, @locales="en_US", @version_major=0, @mechanisms="PLAIN AMQPLAIN">, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x100611b00 @payload=#<Qrack::Protocol09::Connection::StartOk:0x100612208 @mechanism="PLAIN", @client_properties={:version=>"0.7.6", :information=>"http://github.com/ruby-amqp/bunny", :platform=>"Ruby", :product=>"Bunny"}, @locale="en_US", @response="\000guest\000guest">, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x10060bf48 @payload=#<Qrack::Protocol09::Connection::Tune:0x10060bc50 @channel_max=0, @heartbeat=0, @frame_max=131072>, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1006094f0 @payload=#<Qrack::Protocol09::Connection::TuneOk:0x10060d6b8 @channel_max=0, @heartbeat=0, @frame_max=131072>, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x100606d18 @payload=#<Qrack::Protocol09::Connection::Open:0x1006074e8 @virtual_host="/", @deprecated_insist=nil, @deprecated_capabilities=nil>, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005fdc68 @payload=#<Qrack::Protocol09::Connection::OpenOk:0x1005fd768 @deprecated_known_hosts="">, @channel=0>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005f9258 @payload=#<Qrack::Protocol09::Channel::Open:0x1005ffe28 @deprecated_out_of_band=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005f3e70 @payload=#<Qrack::Protocol09::Channel::OpenOk:0x1005f3038 @deprecated_channel_id="">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005f0f18 @payload=#<Qrack::Protocol09::Queue::Declare:0x1005f1e40 @queue="", @deprecated_ticket=0, @durable=nil, @auto_delete=true, @nowait=nil, @passive=nil, @exclusive=nil, @arguments=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005eb680 @payload=#<Qrack::Protocol09::Queue::DeclareOk:0x1005eb450 @queue="amq.gen-5883aQtSztoCgUHv1guX9g==", @message_count=0, @consumer_count=0>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005e9308 @payload=#<Qrack::Protocol09::Basic::Publish:0x1005e9ad8 @deprecated_ticket=0, @immediate=nil, @mandatory=nil, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Header:0x1005e7710 @payload=#<Qrack::Protocol09::Header:0x1005e9740 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Body:0x1005e9678 @payload="abc", @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005d97a0 @payload=#<Qrack::Protocol09::Basic::Publish:0x1005de048 @deprecated_ticket=0, @immediate=nil, @mandatory=nil, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Header:0x1005d58d0 @payload=#<Qrack::Protocol09::Header:0x1005dbb40 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Body:0x1005db898 @payload="abc", @channel=1>
.
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005d0c90 @payload=#<Qrack::Protocol09::Basic::Consume:0x1005d13c0 @queue="amq.gen-5883aQtSztoCgUHv1guX9g==", @no_ack=true, @deprecated_ticket=0, @no_local=nil, @nowait=nil, @exclusive=nil, @consumer_tag="2722213142623242820131851025322121916153119294171130786", @filter=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005cbbc8 @payload=#<Qrack::Protocol09::Basic::ConsumeOk:0x1005cb718 @consumer_tag="2722213142623242820131851025322121916153119294171130786">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005c6600 @payload=#<Qrack::Protocol09::Basic::Deliver:0x1005c5d40 @delivery_tag=1, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @consumer_tag="2722213142623242820131851025322121916153119294171130786", @redelivered=false, @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Header:0x1005c0638 @payload=#<Qrack::Protocol09::Header:0x1005c0368 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Body:0x1005b94f0 @payload="abc", @channel=1>
abc
I, [2011-10-09 01:12:14#15961]  INFO -- send: #<Qrack::Transport09::Method:0x1005b8438 @payload=#<Qrack::Protocol09::Basic::Cancel:0x1005b8c58 @nowait=nil, @consumer_tag="2722213142623242820131851025322121916153119294171130786">, @channel=1>
I, [2011-10-09 01:12:14#15961]  INFO -- received: #<Qrack::Transport09::Method:0x1005b2df8 @payload=#<Qrack::Protocol09::Basic::Deliver:0x1005b2808 @delivery_tag=2, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @consumer_tag="2722213142623242820131851025322121916153119294171130786", @redelivered=false, @exchange="">, @channel=1>

Declaring an exchange with no permission should be handled better

NoMethodError: undefined method `payload' for nil:NilClass
next_payload at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/qrack/client.rb:84
open_connection at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:210
start_session at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:397
loop at org/jruby/RubyKernel.java:
start_session at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:389
(root) at bunny-bug.rb:7

This happens when you try connecting with a valid username/password, but one that has no permissions on the vhost in question. To reproduce, here's the code:

b = Bunny.new({:vhost => "/test", :user => "guest", :pass => "guest"})
b.start
e = b.exchange("rawlogs", :type => "direct")

and assuming rabbitmq, you'll need to "rabbitmqctl add_vhost /test" to create the vhost. By default, the "guest" user will not have access to create exchanges under /test.

Expected behavior: a Bunny:: exception with a better error message
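A minimal sketch of what such an error could look like, assuming the `nil` in the trace above is a frame read that came back empty after the server dropped the connection. `ServerClosedConnectionError` and `payload_or_raise` are hypothetical names for illustration, not part of Bunny 0.6.0.

```ruby
# Hypothetical error class; not part of Bunny 0.6.0.
class ServerClosedConnectionError < StandardError; end

# Hypothetical guard around the frame read that fails in the trace above.
# Instead of calling #payload on nil, it raises a descriptive error.
def payload_or_raise(frame)
  if frame.nil?
    raise ServerClosedConnectionError,
          "server closed the connection before replying " \
          "(check the user's permissions on the vhost)"
  end
  frame.payload
end
```

With a guard like this, the script would fail with a message pointing at permissions rather than a NoMethodError from inside the library.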

EventMachine/Fibers support

Hello,

I added EventMachine+Fibers support: cjbottaro@68824d7

A part of that process was factoring out the idea of a "connection". You can see from the commit that @socket is replaced with a "connection". As far as I can tell, very little of the socket interface is used and I formalized what is used in Qrack::Connection::AbstractBase. Then I created two connections types that inherit from AbstractBase: Socket (the default) and FiberedEm.

Which connection type to use can be specified in Bunny#new, but if none is specified, it tries to dynamically figure out which to use (See Qrack::Client#default_connection_type).

All of Bunny's tests still pass using the default socket connection type. I've tested the :fibered_em connection type against a live RabbitMQ server (a basic subset of operations) and it seems to work ok. I have not written any specs / unit tests though.

I want to send a pull request, but would like yall to review the changes first and am curious for any ideas about how to write specs / unit tests.

Usage of Bunny + EM/Fibers is as follows:

require "bunny"
require "eventmachine"
require "fiber"

EM.run do
  Fiber.new do

    fm = Fiber.current
    done_count = 0

    f1 = Fiber.new do
      b = Bunny.new :connection_type => :fibered_em
      puts "start1"
      b.start
      puts "stop1"
      b.stop
      done_count += 1
      fm.resume
    end

    f2 = Fiber.new do
      b = Bunny.new :connection_type => :fibered_em
      puts "start2"
      b.start
      puts "stop2"
      b.stop
      done_count += 1
      fm.resume
    end

    f1.resume
    f2.resume

    # Kinda like joining threads.
    Fiber.yield while done_count < 2

    EM.stop

  end.resume
end

Note that specifying the :connection_type is unnecessary in that it should figure out that it's running in an EM+Fibers environment.

Thanks,
-- C

Cannot call Queue#status while inside subscribe block

I'm using ruby 1.9.2p290 with Bunny 0.7.9.

This is my sample code:

require 'bunny'

STDOUT.sync = true

client = Bunny.new(ENV["RABBITMQ_URL"])
client.start

e = client.exchange('data', type: :topic, durable: true)
q = Bunny::Queue.new(client, 'simple_persister', durable: true)
q.bind(e, key: 'hello.*')

2.times do 
  e.publish("hello", key: "hello.world")
end

q.subscribe(ack: true) do |msg|
  p [:debug, q.status]
end

Upon execution, I see:

/Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:303:in `status': undefined method `message_count' for #<Qrack::Protocol::Basic::Deliver:0x007fc5db93a848> (NoMethodError)
    from pub.rb:17:in `block in <main>'
    from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
    from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
    from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
    from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
    from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
    from pub.rb:16:in `<main>'

If I change 2.times to 1.times, it works as expected.

Am I doing it wrong?

Too few options exposed

bunny seems to hide many options and decide on default options that you can't override. For example, publishing a message to an exchange.

Here are all the underlying options as defined in lib/qrack/protocol/spec.rb:

shortstr   :content_type
shortstr   :content_encoding
table      :headers
octet      :delivery_mode
octet      :priority
shortstr   :correlation_id
shortstr   :reply_to
shortstr   :expiration
shortstr   :message_id
timestamp  :timestamp
shortstr   :type
shortstr   :user_id
shortstr   :app_id
shortstr   :deprecated_cluster_id

But in lib/bunny/exchange.rb, publish exposes only a limited subset of these options (content_type and delivery_mode) and hard-codes the priority to 0 with no way of overriding it. This makes Bunny impossible to use in my project without modification, as I need access to some of these properties that are simply not accessible.
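A rough sketch of the kind of behavior being asked for, assuming the defaults are the values that show up in Bunny's header logs (content type "application/octet-stream", delivery mode 1, priority 0). `message_properties` is a hypothetical helper, not Bunny's actual publish code.

```ruby
# Assumed defaults, mirroring the header properties seen in Bunny's logs.
DEFAULT_PROPERTIES = {
  :content_type  => "application/octet-stream",
  :delivery_mode => 1,
  :priority      => 0
}.freeze

# Property names from lib/qrack/protocol/spec.rb, as listed above.
KNOWN_PROPERTIES = [
  :content_type, :content_encoding, :headers, :delivery_mode, :priority,
  :correlation_id, :reply_to, :expiration, :message_id, :timestamp,
  :type, :user_id, :app_id
].freeze

# Hypothetical helper: caller-supplied values win over the defaults,
# and keys that are not message properties (e.g. :key) are left out.
def message_properties(opts = {})
  supplied = opts.select { |name, _| KNOWN_PROPERTIES.include?(name) }
  DEFAULT_PROPERTIES.merge(supplied)
end
```

Called as `message_properties(:priority => 5, :reply_to => "replies")`, this keeps the default content type and delivery mode while letting the caller set everything else.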

Can we please get more options exposed?

Thanks :)

Can't access exchange

I am trying to write to an exchange in passive mode as a user who has only write permission on rabbitmq-server.

As far as I understood from documentation, it should be possible, using passive exchange declaration:

sudo rabbitmqctl set_permissions -p /x user '' 'my_exchange' ''

then test.rb:

b = Bunny.new(:host => 'localhost', :vhost => "/x", :port => 5672, :user => "user", :pass => "123", :logging => true)
b.start
e = b.exchange("my_exchange", :passive => true)

I checked that my_exchange already exists. The log is as follows:

I, [2012-02-13 12:28:11#11644]  INFO -- send: #<Qrack::Transport::Method:0x7f1804d7f348 @channel=1, @payload=#  <Qrack::Protocol::Exchange::Declare:0x7f1804d7f848 @durable=nil, @passive=true, @nowait=nil, @exchange="my_exchange",   @internal=nil, @type=:topic, @ticket=1, @arguments=nil, @auto_delete=nil>>
I, [2012-02-13 12:28:11#11644]  INFO -- received: #<Qrack::Transport::Method:0x7f1804d7d980 @channel=1, @payload=#<Qrack::Protocol::Channel::Close:0x7f1804d7d818 @class_id=40, @reply_text="ACCESS_REFUSED - access to exchange 'my_exchange' in vhost '/x' refused for user 'user'", @reply_code=403, @method_id=10>>

Why would that be? Bunny seems to be sending the correct info, so I am not sure whether this is a Bunny bug, but it also seems unlikely to me that rabbitmq-server would have such a bug. I am using rabbitmq-server 2.5.1 and the newest Bunny, 0.7.9.

If I give config access, then it works, but then what was the point of passive declaration?

Cannot bind/unbind during subscription

I'm using ruby 1.9.3p0 with bunny 0.7.9 and RabbitMQ.

From what I understand, this may be a problem due to bunny's synchronous nature.

Anyway: while subscribed to a queue, I cannot change the binding keys on that queue.

Is there any way to solve this? Or is a fix planned for the future?

Setup: bind_test_publish.rb sends 2 messages each second, to topic exchange with keys "test.1" and "test.2"

Tried with and without qos/ack.

bind_test_subscribe.rb

require 'bunny'

# start a communication session with the amqp server

server=Bunny.new()
server.start
#server.qos(:prefetch_count => 1)

exchange=server.exchange("test_exchange", :type=>:topic)

@queue = server.queue("test_exchange_queue", :auto_delete=>true)

@queue.bind(exchange,:key=>"test.1")

Thread.new do
  @queue.subscribe() do |msg|
    puts msg[:payload]
  end
end

sleep(10)

#@queue.unbind(exchange,:key=>"test.1")
@queue.bind(exchange,:key=>"test.2")

sleep(10)
# close the client connection
server.stop

When I try to bind, I get the following error:

/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/client08.rb:81:in `check_response': Error binding queue: test_exchange_queue to exchange: test_exchange (Bunny::ProtocolError)
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:137:in `bind'
    from bind_test_subscribe.rb:25:in `<main>'

When I try to unbind, the program freezes. Output from ctrl+c:

^C/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `read': Interrupt
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `send_command'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:124:in `read'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:272:in `_read'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:94:in `block in read'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `map'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `read'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:61:in `block in parse'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:263:in `extract'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:60:in `parse'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/client08.rb:158:in `next_frame'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:117:in `next_payload'
    from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:387:in `unbind'
    from bind_test_subscribe.rb:24:in `<main>'

frame_too_large

I have a large array of 100 Hash objects, converted into a JSON string.

When I tried to publish the JSON string to RabbitMQ with bunny, I got this error message:
Unknown reply code: 501, text: FRAME_ERROR - type 3, all octets = <<>>: {frame_too_large,130659,130164}

Increasing frame_max in both rabbitmq.config and session.rb solves the problem, but as I increase the size of the array, the error appears again.

It looks like when bunny or RabbitMQ splits the message into frames, it doesn't split it into the right sizes.
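For reference, a minimal sketch of how a body would need to be split to stay under the limit. It assumes the AMQP 0-9-1 framing layout, where each frame carries 8 bytes of overhead (a 7-byte header plus a 1-byte frame-end marker), so a body frame can hold at most frame_max - 8 bytes of payload. `split_body` is a hypothetical helper, not Bunny's own code.

```ruby
# AMQP 0-9-1: 1 byte type + 2 bytes channel + 4 bytes size + 1 byte frame-end.
FRAME_OVERHEAD = 8

# Hypothetical helper: split a message body into chunks that each fit
# into one body frame for the negotiated frame_max.
def split_body(payload, frame_max)
  limit = frame_max - FRAME_OVERHEAD
  raise ArgumentError, "frame_max must exceed #{FRAME_OVERHEAD}" if limit <= 0

  bytes  = payload.b # work on raw bytes, not characters
  chunks = []
  offset = 0
  while offset < bytes.bytesize
    chunks << bytes.byteslice(offset, limit)
    offset += limit
  end
  chunks
end
```

If the client chunks by character count instead of bytes, or uses a frame_max different from the one negotiated with the server, an oversized frame like the one in the error above is the likely symptom.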

Any idea?

Unsubscribe from queue

I am looking to subscribe to receive a single message from a queue (a temporary queue that holds an RPC response). Once I receive this message, I'd like to cancel the subscription. Am I right that there is currently no supported way to cancel a subscription? E.g.:

Timeout.timeout(15) do
  callback_queue.subscribe(:block => true) do |delivery_info, properties, payload|
    # Do work
    # End blocking wait with unsubscribe
    callback_queue.unsubscribe
  end
end
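One possible shape for this, as a sketch rather than a confirmed recipe: it assumes a Bunny version where a non-blocking `subscribe` returns a consumer object that responds to `cancel`. `wait_for_single_reply` is a hypothetical helper, and the queue argument is duck-typed so the pattern can be tried without a broker.

```ruby
require "thread"
require "timeout"

# Hypothetical helper: subscribe without blocking, wait for one payload,
# then cancel the consumer whether or not a reply arrived in time.
def wait_for_single_reply(queue, timeout_seconds)
  mailbox  = Queue.new # thread-safe handoff from the consumer to the caller
  consumer = queue.subscribe do |_delivery_info, _properties, payload|
    mailbox << payload
  end
  Timeout.timeout(timeout_seconds) { mailbox.pop }
ensure
  consumer.cancel if consumer
end
```

The caller then writes `reply = wait_for_single_reply(callback_queue, 15)` and gets a Timeout::Error if nothing arrives within 15 seconds.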

Set up continuous integration (with Travis)

I have been observing some failures in the past, but today I see as many as 52 failures:

rspec ./spec/spec_09/bunny_spec.rb:18 # Bunny should connect to an AMQP server
rspec ./spec/spec_09/bunny_spec.rb:22 # Bunny should be able to create and open a new channel
rspec ./spec/spec_09/bunny_spec.rb:31 # Bunny should be able to switch between channels
rspec ./spec/spec_09/bunny_spec.rb:37 # Bunny should raise an error if trying to switch to a non-existent channel
rspec ./spec/spec_09/bunny_spec.rb:41 # Bunny should be able to create an exchange
rspec ./spec/spec_09/bunny_spec.rb:48 # Bunny should be able to create a queue
rspec ./spec/spec_09/bunny_spec.rb:56 # Bunny should raise an error if setting of QoS fails
rspec ./spec/spec_09/bunny_spec.rb:61 # Bunny should be able to set QoS
rspec ./spec/spec_09/exchange_spec.rb:18 # Exchange should raise an error if instantiated as non-existent type
rspec ./spec/spec_09/exchange_spec.rb:23 # Exchange should allow a default direct exchange to be instantiated by specifying :type
rspec ./spec/spec_09/exchange_spec.rb:31 # Exchange should allow a default direct exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:39 # Exchange should allow a default fanout exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:47 # Exchange should allow a default topic exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:55 # Exchange should allow a default headers (amq.match) exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:63 # Exchange should allow a default headers (amq.headers) exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:71 # Exchange should create an exchange as direct by default
rspec ./spec/spec_09/exchange_spec.rb:79 # Exchange should be able to be instantiated as a direct exchange
rspec ./spec/spec_09/exchange_spec.rb:87 # Exchange should be able to be instantiated as a topic exchange
rspec ./spec/spec_09/exchange_spec.rb:95 # Exchange should be able to be instantiated as a fanout exchange
rspec ./spec/spec_09/exchange_spec.rb:103 # Exchange should be able to be instantiated as a headers exchange
rspec ./spec/spec_09/exchange_spec.rb:111 # Exchange should ignore the :nowait option when instantiated
rspec ./spec/spec_09/exchange_spec.rb:115 # Exchange should be able to publish a message
rspec ./spec/spec_09/exchange_spec.rb:120 # Exchange should not modify the passed options hash when publishing a message
rspec ./spec/spec_09/exchange_spec.rb:127 # Exchange should be able to return an undeliverable message
rspec ./spec/spec_09/exchange_spec.rb:135 # Exchange should be able to return a message that exceeds maximum frame size
rspec ./spec/spec_09/exchange_spec.rb:144 # Exchange should report an error if delete fails
rspec ./spec/spec_09/exchange_spec.rb:150 # Exchange should be able to be deleted
rspec ./spec/spec_09/exchange_spec.rb:157 # Exchange should ignore the :nowait option when deleted
rspec ./spec/spec_09/queue_spec.rb:18 # Queue should ignore the :nowait option when instantiated
rspec ./spec/spec_09/queue_spec.rb:22 # Queue should ignore the :nowait option when binding to an exchange
rspec ./spec/spec_09/queue_spec.rb:28 # Queue should raise an error when trying to bind to a non-existent exchange
rspec ./spec/spec_09/queue_spec.rb:34 # Queue should be able to bind to an existing exchange
rspec ./spec/spec_09/queue_spec.rb:40 # Queue should ignore the :nowait option when unbinding from an existing exchange
rspec ./spec/spec_09/queue_spec.rb:46 # Queue should raise an error if unbinding from a non-existent exchange
rspec ./spec/spec_09/queue_spec.rb:52 # Queue should be able to unbind from an exchange
rspec ./spec/spec_09/queue_spec.rb:58 # Queue should be able to publish a message
rspec ./spec/spec_09/queue_spec.rb:64 # Queue should be able to pop a message complete with header and delivery details
rspec ./spec/spec_09/queue_spec.rb:74 # Queue should be able to pop a message and just get the payload
rspec ./spec/spec_09/queue_spec.rb:82 # Queue should be able to pop a message where body length exceeds max frame size
rspec ./spec/spec_09/queue_spec.rb:90 # Queue should be able call a block when popping a message
rspec ./spec/spec_09/queue_spec.rb:97 # Queue should raise an error if purge fails
rspec ./spec/spec_09/queue_spec.rb:104 # Queue should be able to be purged to remove all of its messages
rspec ./spec/spec_09/queue_spec.rb:111 # Queue should return an empty message when popping an empty queue
rspec ./spec/spec_09/queue_spec.rb:119 # Queue should stop subscription without processing messages if max specified is 0
rspec ./spec/spec_09/queue_spec.rb:128 # Queue should stop subscription after processing number of messages specified > 0
rspec ./spec/spec_09/queue_spec.rb:135 # Queue should stop subscription after processing message_max messages < total in queue
rspec ./spec/spec_09/queue_spec.rb:145 # Queue should raise an error when delete fails
rspec ./spec/spec_09/queue_spec.rb:151 # Queue should pass correct block parameters through on subscribe
rspec ./spec/spec_09/queue_spec.rb:165 # Queue should finish processing subscription messages if break is called in block
rspec ./spec/spec_09/queue_spec.rb:184 # Queue should be able to be deleted
rspec ./spec/spec_09/queue_spec.rb:191 # Queue should ignore the :nowait option when deleted
rspec ./spec/spec_09/queue_spec.rb:196 # Queue should support server named queues

After these are resolved, Bunny should use Travis for CI.

Exchange publish mandatory return being set 1 if explicitly turned off.

I ran into an issue today using bunny via logstash. It appears that if you explicitly set :mandatory => 0, it actually enables the mandatory flag. I was also experimenting with immediate at the time, so it could also be a combination of the two. I'm going to run some tests to validate.
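One plausible explanation, which is an assumption and not confirmed against Bunny's source: in Ruby, 0 is truthy, so any code that tests the option with a plain truthiness check will treat `:mandatory => 0` as enabled. The sketch below shows the pitfall and one way to normalize the value; both method names are hypothetical.

```ruby
# Naive check: only nil and false are falsy in Ruby, so 0 counts as "on".
def naive_flag(value)
  value ? true : false
end

# Hypothetical normalizer that accepts booleans as well as 1/0.
def normalize_flag(value)
  case value
  when true, 1 then true
  when false, nil, 0 then false
  else raise ArgumentError, "expected true/false or 1/0, got #{value.inspect}"
  end
end
```

Passing `:mandatory => false` instead of 0 would sidestep the problem if this is indeed the cause.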

I'm still combing through the code base to make a pull request.

/cc @michaelklishin

100% CPU usage from network activity loop

Hi! I love bunny! Thanx for this amazing software!

My problem is that the workers I wrote have a very high CPU impact (they "eat" one core).
I have no problem with this on OSX (10.8) but only on Linux x86_64 (Ubuntu 12.04).

Any Suggestions? What is the best way to have a forever running process with bunny-listeners?

Below is a snippet that shows the behaviour.

require "rubygems"
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
q  = ch.queue("bunny.examples.hello_world", :auto_delete => true)
x  = ch.default_exchange

q.subscribe do |delivery_info, metadata, payload|
  puts "Received #{payload}"
end

while true do
#  x.publish("Hello!", :routing_key => q.name)
  sleep 10
end

Connection reset by peer in tests with RabbitMQ 2.8.1

  1) Bunny should raise an error if the wrong user name or password is used
     Failure/Error: lambda { b.start}.should raise_error(Bunny::ProtocolError)
       expected Bunny::ProtocolError, got #<Errno::ECONNRESET: Connection reset by peer>
     # ./spec/spec_09/connection_spec.rb:11:in `block (2 levels) in <top (required)>'

And the stacktrace, when you do it manually:


Errno::ECONNRESET: Connection reset by peer
    from /home/justinf/src/bunny/lib/qrack/client.rb:189:in `read'
    from /home/justinf/src/bunny/lib/qrack/client.rb:189:in `send_command'
    from /home/justinf/src/bunny/lib/qrack/client.rb:114:in `read'
    from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:280:in `_read'
    from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:92:in `block in read'
    from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:89:in `map'
    from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:89:in `read'
    from /home/justinf/src/bunny/lib/qrack/transport/frame.rb:62:in `block in parse'
    from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:263:in `extract'
    from /home/justinf/src/bunny/lib/qrack/transport/frame.rb:61:in `parse'
    from /home/justinf/src/bunny/lib/bunny/client.rb:128:in `next_frame'
    from /home/justinf/src/bunny/lib/bunny/client.rb:162:in `open_connection'
    from /home/justinf/src/bunny/lib/bunny/client.rb:301:in `start_session'
    from (irb):4
    from /home/justinf/.rvm/rubies/ruby-1.9.3-p0/bin/irb:16:in `<main>'

Exception raised when creating channel

With Bunny on latest master (#7e6b8a), when I run Bunny.new.create_channel the following exception is raised:

/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:480:in `next_channel_id': undefined method `next_channel_id' for nil:NilClass (NoMethodError)
  from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/channel.rb:156:in `initialize'
  from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:168:in `new'
  from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:168:in `create_channel'
  from run.rb:4:in `<main>'

Undefined method subscription

I found an error when starting some examples (Ruby 1.9.3-p125). I started in one terminal the simple_consumer.rb and in the other the simple_publisher.rb. The simple_consumer.rb returns some (in my opinion) strange debug data:

simple_consumer.rb

I, [2012-04-16 10:01:38#697]  INFO -- received: ?

?
 capabilitiesF  copyrightS$Copyright (C) 2007-2011 VMware, Inc.
                                                               informationS5Licensed under the MPL.  See http://www.rabbitmq.complatformS
Erlang/OTPproductRabbitMQversionS2.7.1PLAIN AMQPLAINen_US?
I, [2012-04-16 10:01:38#697]  INFO -- send: ?

platformSRubyproductSBunny
                          informationS!http://github.com/ruby-amqp/bunnyversionS0.7.AMQPLAIN#LOGINSguesPASSWORDSguesten_US?
I, [2012-04-16 10:01:38#697]  INFO -- received: 

?
I, [2012-04-16 10:01:38#697]  INFO -- send: 

?
I, [2012-04-16 10:01:38#697]  INFO -- send: 
(/?
I, [2012-04-16 10:01:38#697]  INFO -- received: 
)?
I, [2012-04-16 10:01:38#697]  INFO -- send: 
?
I, [2012-04-16 10:01:38#697]  INFO -- received: 
                                                ?
I, [2012-04-16 10:01:38#697]  INFO -- send: 

/data?
I, [2012-04-16 10:01:38#697]  INFO -- received: 
                                                ?
I, [2012-04-16 10:01:38#697]  INFO -- send: 2
po_box?
I, [2012-04-16 10:01:38#697]  INFO -- received: 2
                                                 po_box?
I, [2012-04-16 10:01:38#697]  INFO -- send: (

sorting_roomdirect?
I, [2012-04-16 10:01:38#697]  INFO -- received: (
                                                 ?
I, [2012-04-16 10:01:38#697]  INFO -- send: $2po_box
                                                    sorting_roomfred?
I, [2012-04-16 10:01:38#697]  INFO -- received: 2?
I, [2012-04-16 10:01:38#697]  INFO -- send: <po_botesttag1?
testtag1?04-16 10:01:38#697]  INFO -- received:

Also the simple_producer.rb returns some strange stuff:

I, [2012-04-16 10:02:32#700]  INFO -- received: ?

?
 capabilitiesF  copyrightS$Copyright (C) 2007-2011 VMware, Inc.
                                                               informationS5Licensed under the MPL.  See http://www.rabbitmq.complatformS
Erlang/OTPproductRabbitMQversionS2.7.1PLAIN AMQPLAINen_US?
I, [2012-04-16 10:02:32#700]  INFO -- send: ?

platformSRubyproductSBunny
                          informationS!http://github.com/ruby-amqp/bunnyversionS0.7.AMQPLAIN#LOGINSguesPASSWORDSguesten_US?
I, [2012-04-16 10:02:32#700]  INFO -- received: 

?
I, [2012-04-16 10:02:32#700]  INFO -- send: 

?
I, [2012-04-16 10:02:32#700]  INFO -- send: 
(/?
I, [2012-04-16 10:02:32#700]  INFO -- received: 
)?
I, [2012-04-16 10:02:32#700]  INFO -- send: 
?
I, [2012-04-16 10:02:32#700]  INFO -- received: 
                                                ?
I, [2012-04-16 10:02:32#700]  INFO -- send: 

/data?
I, [2012-04-16 10:02:32#700]  INFO -- received: 
                                                ?
I, [2012-04-16 10:02:32#700]  INFO -- send: (

sorting_roomdirect?
I, [2012-04-16 10:02:32#700]  INFO -- received: (
                                                 ?
I, [2012-04-16 10:02:32#700]  INFO -- send: <(
                                              sorting_roomfred?
I, [2012-04-16 10:02:32#700]  INFO -- send: )<,?application/octet-stream?
I, [2012-04-16 10:02:32#700]  INFO -- send: ,This is a message from the publisher?
I, [2012-04-16 10:02:32#700]  INFO -- send: (?bye?
I, [2012-04-16 10:02:32#700]  INFO -- received: )?
I, [2012-04-16 10:02:32#700]  INFO -- send: 
<?Goodbye?
I, [2012-04-16 10:02:32#700]  INFO -- received: 
=?

And if the simple_consumer.rb receives a message from the publisher, it fails with

simple_consumer.rb:51:in `block in <main>': undefined method `subscription' for #<Bunny::Queue:0x00000100877520> (NoMethodError)
    from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
    from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
    from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
    from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
    from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
    from simple_consumer.rb:50:in `<main>'

Maybe someone can help,
thx

un-needed files in the gem

Hello:

I'm packaging this up as an RPM for Fedora (and in the future, EPEL) as a dependency for Chef, and during the packaging process I noticed that the gem has several unneeded files in it, namely the various dot-files.

Changing the gemspec to use
git ls-files *
instead of
git ls-files
(or manually adding some exemptions) should do the trick.

Additionally, the .gemspec is also packaged with the other files in the gem and could probably also be eliminated.
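As a sketch of the "manual exemptions" route, the file list could be filtered in Ruby before it is assigned in the gemspec. `packaged_files` is a hypothetical helper; note that it drops dot-files at any depth, whereas the shell glob in `git ls-files *` only skips top-level ones.

```ruby
# Hypothetical helper: drop dot-files and the gemspec itself from the
# list of tracked files before packaging.
def packaged_files(tracked_files)
  tracked_files.reject do |path|
    File.basename(path).start_with?(".") || File.extname(path) == ".gemspec"
  end
end
```

In the gemspec this might be used as `s.files = packaged_files(`git ls-files`.split("\n"))`, assuming the gem is built from a git checkout.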

publishing empty message closes connection with RabbitMQ

I wrote a test which fails for me with RabbitMQ:

it "should be able to keep sending messages after an empty message" do
  q = @b.queue('test1')
  @default_exchange.publish('', :key => 'test1')
  @default_exchange.publish('Yet another test message', :key => 'test1')
  message_count(q).should == 2
  q.purge
end

NoMethodError:
undefined method `message_count' for #<Qrack::Protocol::Connection::Close:0x007fef4cae2e80>

So it seems the connection is silently closed.
Not sure if this is by design, but maybe an exception should be raised.

possible frame parsing bug

We've noticed that in high-message-volume situations with multiple processes connecting to a RabbitMQ node via Bunny, Client#check_response is receiving unexpected values, e.g. plain integers, that cause failures when attempting to create a queue. In moderate-volume situations we rarely see this problem, though we have found instances in our logs going back to our first usage of bunny.

The code is pretty basic, start bunny with connection options, create a queue with no options, write to an exchange passing the created queue as the reply queue. I doubt there's anything special about our implementation as it works perfectly almost all the time, only failing when we run cron jobs that have multiple workers all sending/receiving messages via bunny for many minutes at a time.

It seems odd to me that Client#check_response would receive a value of 2 rather than the expected DeclareOK or some other standard AMQP response. If I'm wrong and our implementation could be to blame please let me know, but I suspect a frame parsing bug in Bunny.

Another detail: once it starts getting bogus values in Client#check_response, it continues to get bogus values, as though the buffer sequencing has gotten shifted causing all frames to be incorrectly parsed.

So, could this be a bug in Bunny? If so, what do you need from us to help reproduce it?

Thanks,
Eric

issues with tab completion in irb

This bug has been known for some time: http://bunnyamqp.wordpress.com/2009/08/24/bunny-ruby-irb-auto-completion-issue/

Unfortunately, patching IRB on every machine is not an option for us, nor do I think that it is a decent solution.

The issue is that Ruby (irb/completion) expects Module#name to return a string not a symbol.

ObjectSpace.each_object(Module){|m|
  begin
    name = m.name
  rescue Exception
    name = ""
  end
  next if name != "IRB::Context" and 
    /^(IRB|SLex|RubyLex|RubyToken)/ =~ name

While looping through the modules, we eventually come across a bunch of classes from Bunny / Qrack. Qrack::Protocol09::Connection#name for example returns :connection. I haven't really dug into the details of how this method is used, but I think it is probably a good idea to respect this contract that Module#name returns the actual name of the module, in this case "Qrack::Protocol09::Connection".
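A small sketch of one way to keep that contract, assuming the symbol is only needed as a protocol-level identifier: expose it under a separate method and leave Module#name alone. `ProtocolSketch` and `protocol_name` are hypothetical names, not Qrack's actual API.

```ruby
module ProtocolSketch
  class Connection
    # Hypothetical accessor for the AMQP class identifier.
    # Module#name is not overridden, so it still returns a String.
    def self.protocol_name
      :connection
    end
  end
end
```

With this layout, `ProtocolSketch::Connection.name` returns the fully qualified class name as a String, which is what irb/completion expects, while the symbol stays available through `protocol_name`.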

Thank you for your consideration.

Bunny doesn't heartbeat

There are no threads and no calls to send_heartbeat. Correct me if I'm wrong, but this means that enabling heartbeats will likely cause clients to drop more often than without it, especially if you're in a long running subscribe block. This seems like really common behavior to me (wanting to subscribe to a queue) and it seems silly to push the responsibility of responding to heartbeats down to consumers of this library.
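A minimal sketch of what library-side heartbeating could look like: a background thread that calls `send_heartbeat` at half the negotiated interval. `HeartbeatSender` is a hypothetical class, and the client is duck-typed; a real implementation would also need to serialize socket writes with the publishing thread (for example with a Mutex), which this sketch leaves out.

```ruby
# Hypothetical background sender; expects a client that responds to
# #send_heartbeat, the method name mentioned above.
class HeartbeatSender
  def initialize(client, interval)
    @client  = client
    @period  = interval / 2.0 # send twice per interval to stay well inside it
    @stopped = false
  end

  # Starts the background thread and returns self for chaining.
  def start
    @thread = Thread.new do
      until @stopped
        sleep @period
        @client.send_heartbeat unless @stopped
      end
    end
    self
  end

  # Signals the thread to finish and waits for it to exit.
  def stop
    @stopped = true
    @thread.join if @thread
  end
end
```

A long-running subscribe block would then no longer starve the connection of heartbeats, since they are sent independently of message processing.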

Can't send HTTP requests ( 'em-http ) from within the queue block

I have a queue consumer which takes messages and sends HTTP requests using EM-HTTP. It looks something like this:

require 'bunny'
require 'eventmachine'
require 'em-http'

amqp = Bunny.new(:logging => false, :host => $AMQP_URL)
amqp.start
q = amqp.queue('queue')
exch = amqp.exchange('queue')
q.bind(exch, :key => 'queue')

q.subscribe do |msg|
  res = msg[:payload]
  http = EventMachine::HttpRequest.new(URL).post :body => {:message =>  res.to_json }    
  http.errback  { puts http.error }
end

# Close client
amqp.stop

When I send a msg to the queue I get the following error :

/Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/eventmachine-1.0.0/lib/eventmachine.rb:664:in `connect_server': eventmachine not initialized: evma_connect_to_server (RuntimeError)
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/eventmachine-1.0.0/lib/eventmachine.rb:664:in `bind_connect'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:54:in `activate_connection'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:89:in `setup_request'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:8:in `post'

Please note that I tried putting the whole block inside an EM.run do block, but that didn't work either.

Any help would be highly appreciated

Flood publishing causes socket to be nil

There seems to be some kind of concurrency issue when I try to write a huge number of messages locally. I've managed to reproduce the error by just running this code:

require 'bunny'
connection = Bunny.new({host: 'localhost', keepalive: true})

connection.start
channel = connection.create_channel
exchange = channel.topic("general", durable: true)

while true
  exchange.publish("hola", routing_key: 'resources', timestamp: Time.now.to_i, persistent: true)
end

after a while you will get this error:

undefined method `read_fully' for nil:NilClass
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/transport.rb:112:in `read_next_frame'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:50:in `run_once'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:31:in `block in run_loop'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:28:in `loop'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:28:in `run_loop'
...

which comes continuously given that the Exception is swallowed in the main_loop

I've managed to avoid the error by running single-threaded (threaded: false):

connection = Bunny.new({host: 'queue.dev.viki.io', keepalive: true, threaded: false})

~ Qrack::Queue#publish will be removed in Bunny 0.8. Deprecation Message.

I'm getting a deprecation message every time I try to call publish on a queue. Looking at the code, I'm supposed to be calling exchange.publish? But exchange is a private method, so what am I supposed to do to queue items?

b = Bunny.new
b.start
q = b.queue('test_queue')
q.exchange.publish('something')

~ Qrack::Queue#publish will be removed in Bunny 0.8.

I'm using Bunny 0.7
