ruby-amqp / bunny
Bunny is a popular, easy to use, mature Ruby client for RabbitMQ
License: Other
Hi there,
I'm using Bunny within Celluloid to build a small framework for handling messages between Rails apps. I'm still playing around at the moment, trying to wrap my head around both components.
I'm having trouble acknowledging a message inside a Celluloid Actor - the queue subscribes fine and I receive messages through it, but as soon as I attempt to acknowledge one of those messages, I get the following exception + stack trace: https://gist.github.com/76bbeb6688f27ef2e47e#file-gistfile1-txt
The full code is available in that gist too.
The @channel method is called from the same thread that created the channel, not from the thread in the subscribe block. I'd have thought this would be OK, but maybe I'm doing something stupid.
Hi there,
Is there any API to get the server version from Bunny or the AMQP client?
Thanks.
Although Bunny imposes a timeout on the TCPSocket connect operation, there is no timeout on the SSLSocket connect. This is true for bunny 0.6.x - 0.8.x and also seems to be true on master.
From my reading of the C code, SSLSocket#connect calls through into native C code which performs an SSL connection handshake. In some cases -- e.g. quiescent server, malicious server, or RabbitMQ server that is out of file descriptors -- the handshake will never complete, which will cause Bunny to hang more-or-less indefinitely.
Relevant line of code:
https://github.com/ruby-amqp/bunny/blob/0.8.x-stable/lib/qrack/client.rb#L220
It seems to me that the timeout block should apply to basically the entire method. I have verified that this fixes the problem.
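A minimal sketch of that idea, using only Ruby's standard library: one timeout covers both the TCP connect and the SSL handshake. The helper name open_ssl_socket and the ConnectTimeoutError class are hypothetical and are not part of Bunny; this is an illustration of the approach, not the actual patch.

```ruby
require "socket"
require "openssl"
require "timeout"

# Hypothetical error class for this sketch; Bunny defines its own exceptions.
class ConnectTimeoutError < StandardError; end

# Opens a TCP socket and performs the SSL handshake under a single timeout,
# so a server that accepts the connection but never completes the handshake
# cannot block the caller forever.
def open_ssl_socket(host, port, timeout_seconds, ssl_context = OpenSSL::SSL::SSLContext.new)
  tcp = nil
  Timeout.timeout(timeout_seconds, ConnectTimeoutError) do
    tcp = TCPSocket.new(host, port)
    ssl = OpenSSL::SSL::SSLSocket.new(tcp, ssl_context)
    ssl.sync_close = true # closing the SSL socket also closes the TCP socket
    ssl.connect           # the handshake; this is the call that can hang
    ssl
  end
rescue ConnectTimeoutError
  # Do not leak the half-open TCP socket when the timeout fires.
  tcp.close if tcp && !tcp.closed?
  raise
end
```

With this shape, a caller only has to rescue one error class whether the TCP connect or the handshake stalls.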
I've got a consumer that uses the Queue#pop method. After the broker is restarted or closed, an enormous number of these error messages are output:
NoMethodError
undefined method `read_fully' for nil:NilClass
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/transport.rb:109:in `read_next_frame'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `run_loop'
AMQ::Protocol::EmptyResponseError
Empty response received from the server.
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/gems/amq-protocol-1.0.1/lib/amq/protocol/frame.rb:45:in `decode_header'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/transport.rb:110:in `read_next_frame'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `loop'
/Users/mark/.rvm/gems/ruby-1.9.3-p327@forum-corpus/bundler/gems/bunny-6fae2db09c0d/lib/bunny/main_loop.rb:22:in `run_loop'
I assume it happens from here: https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/main_loop.rb#L54
On our servers it kept outputting until we were out of disk space :(
This problem is actually two-fold:
Having all these bloody _08.rb files is a mess. Also, we're about to port Bunny to the amq-protocol gem, and it doesn't support AMQP 0.8 anyway.
# encoding: utf-8
If I start worker.rb and then publish messages from task.rb, I see that messages are consumed. If I reverse the order in which things are done - publish from task.rb and then start worker.rb - no messages are consumed. My expected result is that messages should be consumed regardless of the order in which publishing and consumption are done.
The problem seems to be that if a consumer is registered on a non-empty queue, Bunny cannot handle the message that it receives from the server. The message contains a consume-ok and delivery frameset for a published message, but it is treated just like a single consume-ok. Publishing more messages to the queue has no effect.
I'm using the latest code in ruby-amqp/bunny master.
The scripts are as follows -
#!/usr/bin/env ruby
require 'bunny'
connection = Bunny.new
connection.start
channel = connection.channel()
q = channel.queue('task_queue', :durable => true)
puts ' [*] Waiting for messages. To exit press CTRL+C'
channel.prefetch(prefetch_count=1)
q.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
  puts " [x] Received #{payload}"
  puts " [x] Done"
  channel.ack(delivery_info.delivery_tag, false)
end

#!/usr/bin/env ruby
require 'bunny'
connection = Bunny.new
connection.start
channel = connection.channel()
channel.queue('task_queue', :durable => true)
exchange = channel.default_exchange
message = ' ' + gets.chomp
exchange.publish(message, :routing_key => 'task_queue')
puts " [x] Sent #{message}"
connection.close()
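To illustrate the suspected failure mode, here is a toy dispatcher that handles every frame returned by a single read instead of assuming the response is only a consume-ok. The Frame struct and ToyConsumerChannel class are hypothetical and do not mirror Bunny's internals; this is only a sketch of the behaviour I would expect.

```ruby
# Hypothetical frame representation for this sketch; not Bunny's real classes.
Frame = Struct.new(:type, :payload)

class ToyConsumerChannel
  attr_reader :deliveries

  def initialize
    @consumer_registered = false
    @deliveries = []
  end

  def consumer_registered?
    @consumer_registered
  end

  # Process every frame from one read. On a non-empty queue the server may
  # send consume-ok and the first delivery back to back.
  def handle_frames(frames)
    frames.each do |frame|
      case frame.type
      when :consume_ok then @consumer_registered = true
      when :deliver    then @deliveries << frame.payload
      else raise ArgumentError, "unexpected frame type: #{frame.type.inspect}"
      end
    end
  end
end

channel = ToyConsumerChannel.new
channel.handle_frames([Frame.new(:consume_ok, nil), Frame.new(:deliver, "task 1")])
puts channel.deliveries.inspect
```

If only the first frame were inspected, the delivery that arrives together with consume-ok would be dropped, which matches the symptom of no messages being consumed.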
In 0.9.0.pre4, if a channel level error occurs and you re-open that channel, the previous channel error is re-raised even if the operation succeeds.
require 'bunny'
client = Bunny.new
client.start
channel = client.channel
# try to access a queue that doesn't exist
begin
  channel.queue("does.not.exist", :passive => true)
rescue Bunny::NotFound
  puts "got expected exception"
end
# reopen the channel
channel.open
channel.queue("my.queue") # raises Bunny::NotFound for queue "does.not.exist"
Running the following code:
require 'bunny'
connection = Bunny.new({host: 'localhost', keepalive: true, threaded: false})
connection.start
channel = connection.create_channel
exchange = channel.topic("general", durable: true)
while true
  exchange.publish("hola", routing_key: 'resources', timestamp: Time.now.to_i, persistent: true)
end
I get a message throughput of about 1500 messages/sec on my local machine.
If I use these options:
Bunny.new({host: 'localhost', keepalive: true, threaded: false, socket_timeout: 0, connect_timeout: 0})
The message throughput goes up to 6000 messages/sec.
I believe this is mainly due to Timeout calls spawning new threads.
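A rough way to check that suspicion without a broker is to time a no-op block with and without a Timeout.timeout wrapper. The helper name compare_timeout_overhead is made up for this sketch, and the numbers will vary by Ruby version, since how Timeout is implemented internally differs between releases.

```ruby
require "benchmark"
require "timeout"

# Times the same no-op loop with and without a Timeout.timeout wrapper and
# returns both wall-clock durations in seconds.
def compare_timeout_overhead(iterations)
  plain = Benchmark.realtime do
    iterations.times { :noop }
  end
  wrapped = Benchmark.realtime do
    iterations.times { Timeout.timeout(5) { :noop } }
  end
  { plain: plain, wrapped: wrapped }
end

result = compare_timeout_overhead(10_000)
puts format("plain: %.4fs, with Timeout: %.4fs", result[:plain], result[:wrapped])
```

If the wrapped loop is much slower, the per-call cost of Timeout is a plausible explanation for the throughput difference above.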
If I let a Bunny connection sit idle in an irb session, it will eventually start throwing this endlessly:
AMQ::Protocol::EmptyResponseError
Empty response received from the server.
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/amq-protocol-1.0.1/lib/amq/protocol/frame.rb:45:in `decode_header'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/transport.rb:110:in `read_next_frame'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:26:in `block in run_loop'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:22:in `loop'
/Users/carl/.rvm/gems/ruby-1.9.3-p194/gems/bunny-0.9.0.pre3/lib/bunny/main_loop.rb:22:in `run_loop'
Server is RabbitMQ 3.0.0, so maybe something with the heartbeat/connection timeout?
The following error occurs when I try to connect using :ssl => true -
NoMethodError: undefined method `read_fully' for #<OpenSSL::SSL::SSLSocket:0x9f4a854>
from /bunny-0.9.0.pre4/lib/bunny/transport.rb:116:in `read_next_frame'
I know that some work is going on around the transport area, so this issue can wait until that work can be addressed.
The following hello-world style test cases were run in the same environment:
require 'rubygems'
require 'bunny'
b = Bunny.new("amqps://marysue")
b.start
q = b.queue("ssl_test")
e = b.exchange('')
e.publish("Bunny", :key => "ssl_test")
message = q.pop
puts "Received a message: #{message[:payload]}. Disconnecting..."
b.stop
and
require "rubygems"
require "amqp"
EventMachine.run do
  connection = AMQP.connect('amqps://marysue')
  channel = AMQP::Channel.new(connection)
  queue = channel.queue("ssl_test")
  exchange = channel.default_exchange
  exchange.publish "ruby-amqp", :routing_key => queue.name
  queue.subscribe do |payload|
    puts "Received a message: #{payload}. Disconnecting..."
    connection.close {
      EventMachine.stop { exit }
    }
  end
end
The bunny example fails with the following exception:
bunny-0.8.0.pre1/lib/bunny/client.rb:165:in `open_connection': Connection failed - user: guest (Bunny::ProtocolError)
I believe this is actually a timeout error, since the program freezes for several seconds before finally crashing. The program completes successfully if :ssl => true is removed from the connection line.
The following is the rabbitmq server log pertaining to the failing bunny connection:
=INFO REPORT==== 28-Feb-2012::11:49:57 ===
accepted TCP connection on [::]:5671 from 127.0.0.1:50850
=INFO REPORT==== 28-Feb-2012::11:49:57 ===
starting TCP connection <0.457.0> from 127.0.0.1:50850
=INFO REPORT==== 28-Feb-2012::11:49:57 ===
upgraded TCP connection <0.457.0> to SSL
=ERROR REPORT==== 28-Feb-2012::11:50:07 ===
exception on TCP connection <0.457.0> from 127.0.0.1:50850
{handshake_timeout,frame_header}
=INFO REPORT==== 28-Feb-2012::11:50:07 ===
closing TCP connection <0.457.0> from 127.0.0.1:50850
In comparison, here is the log entry from the successful amqp gem connection:
=INFO REPORT==== 28-Feb-2012::11:51:31 ===
accepted TCP connection on [::]:5671 from 127.0.0.1:50891
=INFO REPORT==== 28-Feb-2012::11:51:31 ===
starting TCP connection <0.319.0> from 127.0.0.1:50891
=INFO REPORT==== 28-Feb-2012::11:51:31 ===
upgraded TCP connection <0.319.0> to SSL
=INFO REPORT==== 28-Feb-2012::11:51:31 ===
closing TCP connection <0.319.0> from 127.0.0.1:50891
For reference, marysue is simply the hostname of my local computer, for which I generated the certificate. The address resolves to 127.0.0.1
Then check:
Michael:
"Bunny does not have any documentation guides. The same approach with guides (store them under docs/*, use a DocumentationIndex page for TOC) worked well 3 times already (for amqp, eventmachine and travis-worker). This is also how rubydoc.info expects people to do things (in part because they only let projects use subset of YARD features).
I think the amqp gem guides structure can be taken as is; content can be copied where necessary or written anew. I haven't seen any attempts by projects to share documentation (code examples, obviously, cannot be shared at all), but having the same set of guides and TOC ordering is certainly possible."
I don't know if it has been fixed in master, but with bunny 0.7.9, nothing happens (using RabbitMQ as a broker, no message is received) when publishing a message that has a symbol value in the AMQP header :headers.
OK
exchange.publish('AA', {
  :type => type,
  :key => key,
  :correlation_id => correlation_id,
  :reply_to => reply_to,
  :headers => { :xx => 'aa' },
  :content_type => 'application/json',
  :timestamp => Time.now.to_i
})
Fails (no error triggered)
exchange.publish('AA', {
  :type => type,
  :key => key,
  :correlation_id => correlation_id,
  :reply_to => reply_to,
  :headers => { :xx => 'aa'.to_sym },
  :content_type => 'application/json',
  :timestamp => Time.now.to_i
})
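Until this is handled inside the library, one possible workaround is to convert symbol values to strings before passing :headers to publish. The helper below, stringify_symbol_values, is a hypothetical name and plain Ruby; it is not part of Bunny, and it assumes string values are acceptable for your consumers.

```ruby
# Returns a copy of the headers hash in which every Symbol value (including
# values inside nested hashes) is replaced by its String form. Keys and all
# other values are left untouched.
def stringify_symbol_values(headers)
  headers.each_with_object({}) do |(key, value), result|
    result[key] =
      case value
      when Symbol then value.to_s
      when Hash   then stringify_symbol_values(value)
      else value
      end
  end
end

safe_headers = stringify_symbol_values({ :xx => :aa, :nested => { :kind => :retry }, :count => 3 })
puts safe_headers.inspect
```

The result can then be passed as :headers => safe_headers in the publish call shown above.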
I use the Tracer tool ( http://www.rabbitmq.com/examples.html#tracer ) to debug my jruby and 1.9.2 applications. I recently added bunny to a rails app to (only) publish to a RabbitMq exchange. With the tracer in between RabbitMq server and the ruby app, I see a java exception in the Tracer on the Bunny connection. Could the protocol be the problem ( first line )? The Java line is ch#0 <- {#method<connection.start>(version-major=0, version-minor=9 ...
Env: Rails App - Rails 3.1.1, Bunny 0.7.8, ruby 1.9.3pre1, RabbitMq 2.6.1, Tracer 2.6.1, thin
Background App - jRuby 1.6.4 Java 7, jMongo 1.1.4 (-> RabbitMq Java Driver, 2.6.5).
Added below is the relevant part of the Tracer output.
is the jRuby side
is the MRI Bunny side
It looks like the connection opens OK but the channel is not opened.
1319551245765: ch#3 -> {#method<tx.commit>(),null,""}
1319551245799: ch#3 <- {#method<tx.commit-ok>(),null,""}
1319551245830: ch#0 <- {#method<connection.start>(version-major=8, version-minor=0, server-properties={product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, capabilities={}, copyright=Copyright (C) 2007-2011 VMware, Inc., version=2.6.1}, mechanisms=PLAIN AMQPLAIN, locales=en_US),null,""}
1319551245831: ch#0 -> {#method<connection.start-ok>(client-properties={product=Bunny, platform=Ruby, information=http://github.com/ruby-amqp/bunny, version=0.7.8}, mechanism=AMQPL****PASSWORD****, locale=en_US),null,""}
1319551245870: ch#0 <- {#method<connection.tune>(channel-max=0, frame-max=131072, heartbeat=0),null,""}
1319551245910: ch#0 -> {#method<connection.tune-ok>(channel-max=0, frame-max=131072, heartbeat=0),null,""}
1319551245910: ch#0 -> {#method<connection.open>(virtual-host=/user-test, capabilities=, insist=false),null,""}
1319551245920: ch#5 -> {#method<basic.get>(ticket=0, queue=message_failure, no-ack=false),null,""}
1319551245950: ch#0 <- {#method<connection.open-ok>(known-hosts=),null,""}
1319551245960: ch#5 <- {#method<basic.get-empty>(cluster-id=),null,""}
1319551245960: ch#5 -> {#method<basic.get>(ticket=0, queue=user, no-ack=false),null,""}
1319551245990: ch#1 -> {#method<channel.open>(out-of-band=),null,""}
1319551245999: ch#5 <- {#method<basic.get-empty>(cluster-id=),null,""}
1319551246033: uncaught java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at com.rabbitmq.client.impl.ValueReader.readBytes(ValueReader.java:86)
at com.rabbitmq.client.impl.ValueReader.readLongstr(ValueReader.java:112)
at com.rabbitmq.client.impl.ValueReader.readLongstr(ValueReader.java:120)
at com.rabbitmq.client.impl.MethodArgumentReader.readLongstr(MethodArgumentReader.java:73)
at com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk.<init>(AMQImpl.java:600)
at com.rabbitmq.client.impl.AMQImpl.readMethodFrom(AMQImpl.java:3242)
at com.rabbitmq.client.impl.AMQCommand$Assembler.handleFrame(AMQCommand.java:266)
at com.rabbitmq.tools.Tracer$DirectionHandler.doFrame(Tracer.java:331)
at com.rabbitmq.tools.Tracer$DirectionHandler.run(Tracer.java:345)
at java.lang.Thread.run(Thread.java:722)
Calling queue.message_count inside a queue.subscribe block is unstable. When queue.subscribe is called, it sometimes reports an error and quits.
env = (rvm)ruby 1.9.3-p0 + bunny-0.7.9 + ubuntu 12.04 64bit.
The error message is as follows:
someone@c2h2xu1204:/home/c2h2/Desktop# ruby bb.rb
/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:303:in `status': undefined method `message_count' for <<sometag๏ฟฝ๏ฟฝasdf:Qrack::Protocol::Basic::Deliver (NoMethodError)
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/queue.rb:24:in `message_count'
from bb.rb:17:in `block in <main>'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
from bb.rb:16:in `<main>'
Sample code:
aa.rb as the emitter https://gist.github.com/2692967
bb.rb as the receiver https://gist.github.com/2692974
Please run aa.rb and bb.rb several times each. For me, the error always happens on the second run of bb.rb.
Hey everybody.
I've uncovered a rare issue that occurs when subscribing with a timeout. Every so often, Bunny will just freeze and never exit. The theory is that Bunny is in the process of receiving a message when the timeout is hit (a very small window), causing the follow-up unsubscribe to fail.
Here's the stack-trace when interrupting a frozen process:
/Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `read': Interrupt
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `send_command'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:124:in `read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:272:in `_read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:103:in `block in read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `map'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `read'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:61:in `block in parse'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:263:in `extract'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:60:in `parse'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/client08.rb:158:in `next_frame'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/client.rb:117:in `next_payload'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:343:in `unsubscribe'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:61:in `rescue in block in start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:58:in `block in start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
from /Users/lcurley/.rbenv/versions/1.9.3-p0/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
from bunny.rb:39:in `block in <main>'
from bunny.rb:34:in `times'
from bunny.rb:34:in `<main>'
I was able to (randomly) reproduce the issue with this simple script:
require 'bunny'
bunny_write = Bunny.new#(:logging => true)
bunny_read = Bunny.new(:logging => true)
bunny_write.start
bunny_read.start
bunny_read.qos
direct_exchange = bunny_write.exchange('')
queue_read = bunny_read.queue('test1')
queue_write = bunny_write.queue('test1')
queue_read.purge # clear messages from previous run
NUM_MESSAGES = 1000
Thread.new do
  NUM_MESSAGES.times do |i|
    # publish a message to the queue
    direct_exchange.publish("#{ i }", key: queue_write.name)
    puts "sent #{ i }"
    sleep rand
  end
  bunny_write.stop
end
NUM_MESSAGES.times do |i|
  timeout = rand
  got_message = false
  # get message from the queue
  queue_read.subscribe(:timeout => timeout, :message_max => 1) do |message|
    got_message = true
  end
  puts "#{ got_message ? "recieved" : "missed" } #{ i }"
end
bunny_read.stop
bunny_write.stop
Here's the last few lines of output from this script. Keep in mind that "sent" and "received" lines aren't meant to match up.
recieved 644
I, [2012-02-18 17:35:28#94063] INFO -- send: ^A^@^A^@^@^@E^@<^@^T^@^A^Etest1729271
I, [2012-02-18 17:35:28#94063] INFO -- received: ^A^@^A^@^@^@<^@<^@^U7292712987322
sent 355
I, [2012-02-18 17:35:29#94063] INFO -- received: ^A^@^A^@^@^@L^@<^@<72927129873223
I, [2012-02-18 17:35:29#94063] INFO -- received: ^B^@^A^@^@^@)^@<^@^@^@^@^@^@^@^@^
I, [2012-02-18 17:35:29#94063] INFO -- received: ^C^@^A^@^@^@^C355ร
I, [2012-02-18 17:35:29#94063] INFO -- send: ^A^@^A^@^@^@=^@<^@^^72927129873223131
I, [2012-02-18 17:35:29#94063] INFO -- received: ^A^@^A^@^@^@<^@<^@^_7292712987322
recieved 645
I, [2012-02-18 17:35:29#94063] INFO -- send: ^A^@^A^@^@^@E^@<^@^T^@^A^Etest1797302
I, [2012-02-18 17:35:29#94063] INFO -- received: ^A^@^A^@^@^@<^@<^@^U7973022136281
sent 356
I, [2012-02-18 17:35:29#94063] INFO -- send: ^A^@^A^@^@^@=^@<^@^^79730221362814317
sent 357
sent 358
sent 359
...
The obvious solution is to set a socket timeout when creating the client, but it has to be greater than the maximum subscribe timeout. Maybe there is a better fix available.
Thanks!
amqp 0.8.0.RC14 no longer has "one consumer per queue on a channel" limitation. Because load-balancing happens between consumers, it is annoying, even though easy to work around. It potentially may even be misleading.
So while I kept AMQP::Queue#subscribe and it still only can be used once per queue, it internally sets up an AMQP::Consumer instance and those can be instantiated, activated, cancelled or passed around like any other objects. Bunny should do the same. It is great to have Queue#subscribe, but there has to be a way to add more consumers when necessary, even if with a few more lines.
We need to get rid of copied AMQ::Client code (renaming it would be sufficient), ideally it should be extracted. Most of the bits are already in amq-protocol, we need to make sure AMQP URI parsing is moved there, too.
This is the same code I used:
require 'rubygems'
require 'bunny'
b = Bunny.new(:host => '127.0.0.1', :spec => '09', :logging => true)
b.start
exchange = b.exchange('')
q = b.queue(nil, :exclusive => false)
exchange.publish('abc', :key => q.name)
exchange.publish('abc', :key => q.name)
puts '.'
q.subscribe( :timeout => 5, :message_max => 1 ) do |msg|
puts msg[:payload]
end
puts '.'
The exception I receive:
/Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/client09.rb:55:in `check_response': Error unsubscribing from queue amq.gen-5883aQtSztoCgUHv1guX9g== (Bunny::ProtocolError)
from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/queue09.rb:308:in `unsubscribe'
from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:86:in `consume'
from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:56:in `loop'
from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/qrack/subscription.rb:56:in `consume'
from /Users/mike/.rvm/gems/ruby-1.8.7-p334/gems/bunny-0.7.6/lib/bunny/queue09.rb:248:in `subscribe'
from test.rb:17
And here's the raw log:
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x100625d58 @payload=#<Qrack::Protocol09::Connection::Start:0x1006257b8 @server_properties={:version=>"2.6.1", :information=>"Licensed under the MPL. See http://www.rabbitmq.com/", :copyright=>"Copyright (C) 2007-2011 VMware, Inc.", :capabilities=>{:consumer_cancel_notify=>1, :"basic.nack"=>1, :publisher_confirms=>1, :exchange_exchange_bindings=>1}, :platform=>"Erlang/OTP", :product=>"RabbitMQ"}, @version_minor=9, @locales="en_US", @version_major=0, @mechanisms="PLAIN AMQPLAIN">, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x100611b00 @payload=#<Qrack::Protocol09::Connection::StartOk:0x100612208 @mechanism="PLAIN", @client_properties={:version=>"0.7.6", :information=>"http://github.com/ruby-amqp/bunny", :platform=>"Ruby", :product=>"Bunny"}, @locale="en_US", @response="\000guest\000guest">, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x10060bf48 @payload=#<Qrack::Protocol09::Connection::Tune:0x10060bc50 @channel_max=0, @heartbeat=0, @frame_max=131072>, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1006094f0 @payload=#<Qrack::Protocol09::Connection::TuneOk:0x10060d6b8 @channel_max=0, @heartbeat=0, @frame_max=131072>, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x100606d18 @payload=#<Qrack::Protocol09::Connection::Open:0x1006074e8 @virtual_host="/", @deprecated_insist=nil, @deprecated_capabilities=nil>, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005fdc68 @payload=#<Qrack::Protocol09::Connection::OpenOk:0x1005fd768 @deprecated_known_hosts="">, @channel=0>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005f9258 @payload=#<Qrack::Protocol09::Channel::Open:0x1005ffe28 @deprecated_out_of_band=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005f3e70 @payload=#<Qrack::Protocol09::Channel::OpenOk:0x1005f3038 @deprecated_channel_id="">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005f0f18 @payload=#<Qrack::Protocol09::Queue::Declare:0x1005f1e40 @queue="", @deprecated_ticket=0, @durable=nil, @auto_delete=true, @nowait=nil, @passive=nil, @exclusive=nil, @arguments=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005eb680 @payload=#<Qrack::Protocol09::Queue::DeclareOk:0x1005eb450 @queue="amq.gen-5883aQtSztoCgUHv1guX9g==", @message_count=0, @consumer_count=0>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005e9308 @payload=#<Qrack::Protocol09::Basic::Publish:0x1005e9ad8 @deprecated_ticket=0, @immediate=nil, @mandatory=nil, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Header:0x1005e7710 @payload=#<Qrack::Protocol09::Header:0x1005e9740 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Body:0x1005e9678 @payload="abc", @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005d97a0 @payload=#<Qrack::Protocol09::Basic::Publish:0x1005de048 @deprecated_ticket=0, @immediate=nil, @mandatory=nil, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Header:0x1005d58d0 @payload=#<Qrack::Protocol09::Header:0x1005dbb40 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Body:0x1005db898 @payload="abc", @channel=1>
.
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005d0c90 @payload=#<Qrack::Protocol09::Basic::Consume:0x1005d13c0 @queue="amq.gen-5883aQtSztoCgUHv1guX9g==", @no_ack=true, @deprecated_ticket=0, @no_local=nil, @nowait=nil, @exclusive=nil, @consumer_tag="2722213142623242820131851025322121916153119294171130786", @filter=nil>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005cbbc8 @payload=#<Qrack::Protocol09::Basic::ConsumeOk:0x1005cb718 @consumer_tag="2722213142623242820131851025322121916153119294171130786">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005c6600 @payload=#<Qrack::Protocol09::Basic::Deliver:0x1005c5d40 @delivery_tag=1, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @consumer_tag="2722213142623242820131851025322121916153119294171130786", @redelivered=false, @exchange="">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Header:0x1005c0638 @payload=#<Qrack::Protocol09::Header:0x1005c0368 @size=3, @klass=Qrack::Protocol09::Basic, @weight=0, @properties={:content_type=>"application/octet-stream", :delivery_mode=>1, :priority=>0}>, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Body:0x1005b94f0 @payload="abc", @channel=1>
abc
I, [2011-10-09 01:12:14#15961] INFO -- send: #<Qrack::Transport09::Method:0x1005b8438 @payload=#<Qrack::Protocol09::Basic::Cancel:0x1005b8c58 @nowait=nil, @consumer_tag="2722213142623242820131851025322121916153119294171130786">, @channel=1>
I, [2011-10-09 01:12:14#15961] INFO -- received: #<Qrack::Transport09::Method:0x1005b2df8 @payload=#<Qrack::Protocol09::Basic::Deliver:0x1005b2808 @delivery_tag=2, @routing_key="amq.gen-5883aQtSztoCgUHv1guX9g==", @consumer_tag="2722213142623242820131851025322121916153119294171130786", @redelivered=false, @exchange="">, @channel=1>
NoMethodError: undefined method `payload' for nil:NilClass
next_payload at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/qrack/client.rb:84
open_connection at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:210
start_session at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:397
loop at org/jruby/RubyKernel.java:
start_session at /home/petef/.rvm/gems/jruby-1.6.0/gems/bunny-0.6.0/lib/bunny/client08.rb:389
(root) at bunny-bug.rb:7
This happens when you try connecting with a valid username/password, but one that has no permissions on the vhost in question. To reproduce, here's the code:
b = Bunny.new({:vhost => "/test", :user => "guest", :pass => "guest"})
b.start
e = b.exchange("rawlogs", :type => "direct")
and assuming rabbitmq, you'll need to "rabbitmqctl add_vhost /test" to create the vhost. By default, the "guest" user will not have access to create exchanges under /test.
Expected behavior: a Bunny:: exception with a better error message
Hello,
I added EventMachine+Fibers support: cjbottaro@68824d7
A part of that process was factoring out the idea of a "connection". You can see from the commit that @socket is replaced with a "connection". As far as I can tell, very little of the socket interface is used and I formalized what is used in Qrack::Connection::AbstractBase. Then I created two connections types that inherit from AbstractBase: Socket (the default) and FiberedEm.
Which connection type to use can be specified in Bunny#new, but if none is specified, it tries to dynamically figure out which to use (See Qrack::Client#default_connection_type).
All of Bunny's tests still pass using the default socket connection type. I've tested the :fibered_em connection type against a live RabbitMQ server (a basic subset of operations) and it seems to work ok. I have not written any specs / unit tests though.
I want to send a pull request, but would like yall to review the changes first and am curious for any ideas about how to write specs / unit tests.
Usage of Bunny + EM/Fibers is as follows:
require "bunny"
require "eventmachine"
require "fiber"
EM.run do
  Fiber.new do
    fm = Fiber.current
    done_count = 0
    f1 = Fiber.new do
      b = Bunny.new :connection_type => :fibered_em
      puts "start1"
      b.start
      puts "stop1"
      b.stop
      done_count += 1
      fm.resume
    end
    f2 = Fiber.new do
      b = Bunny.new :connection_type => :fibered_em
      puts "start2"
      b.start
      puts "stop2"
      b.stop
      done_count += 1
      fm.resume
    end
    f1.resume
    f2.resume
    # Kinda like joining threads.
    Fiber.yield while done_count < 2
    EM.stop
  end.resume
end
Note that specifying the :connection_type is unnecessary in that it should figure out that it's running in an EM+Fibers environment.
Thanks,
-- C
I'm using ruby 1.9.2p290 with Bunny 0.7.9.
This is my sample code:
require 'bunny'
STDOUT.sync = true
client = Bunny.new(ENV["RABBITMQ_URL"])
client.start
e = client.exchange('data', type: :topic, durable: true)
q = Bunny::Queue.new(client, 'simple_persister', durable: true)
q.bind(e, key: 'hello.*')
2.times do
e.publish("hello", key: "hello.world")
end
q.subscribe(ack: true) do |msg|
p [:debug, q.status]
end
Upon execution, I see:
/Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:303:in `status': undefined method `message_count' for #<Qrack::Protocol::Basic::Deliver:0x007fc5db93a848> (NoMethodError)
from pub.rb:17:in `block in <main>'
from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
from /Users/mgorsuch/.rbenv/versions/1.9.2-p290/lib/ruby/gems/1.9.1/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
from pub.rb:16:in `<main>'
If I change 2.times to 1.times, it works as expected.
Am I doing it wrong?
Bunny seems to hide many options and to choose defaults that you can't override, for example when publishing a message to an exchange.
Here are all the underlying options as defined in lib/qrack/protocol/spec.rb:
shortstr :content_type
shortstr :content_encoding
table :headers
octet :delivery_mode
octet :priority
shortstr :correlation_id
shortstr :reply_to
shortstr :expiration
shortstr :message_id
timestamp :timestamp
shortstr :type
shortstr :user_id
shortstr :app_id
shortstr :deprecated_cluster_id
But in lib/bunny/exchange.rb, publish only exposes a limited subset of these options (content_type and delivery_mode) and sets the priority to 0 with no way of overriding it. This makes Bunny impossible to use in my project without modification, as I need access to some of the properties that are not exposed.
Can we please get more options exposed?
Thanks :)
I am trying to write to an exchange in passive mode, as a user who has only write permission on rabbitmq-server.
As far as I understand from the documentation, this should be possible using a passive exchange declaration:
sudo rabbitmqctl set_permissions -p /x user '' 'my_exchange' ''
then test.rb:
b = Bunny.new(:host => 'localhost', :vhost => "/x", :port => 5672, :user => "user", :pass => "123", :logging => true)
b.start
e = b.exchange("my_exchange", :passive => true)
I checked that my_exchange already exists. The log is as follows:
I, [2012-02-13 12:28:11#11644] INFO -- send: #<Qrack::Transport::Method:0x7f1804d7f348 @channel=1, @payload=# <Qrack::Protocol::Exchange::Declare:0x7f1804d7f848 @durable=nil, @passive=true, @nowait=nil, @exchange="my_exchange", @internal=nil, @type=:topic, @ticket=1, @arguments=nil, @auto_delete=nil>>
I, [2012-02-13 12:28:11#11644] INFO -- received: #<Qrack::Transport::Method:0x7f1804d7d980 @channel=1, @payload=#<Qrack::Protocol::Channel::Close:0x7f1804d7d818 @class_id=40, @reply_text="ACCESS_REFUSED - access to exchange 'my_exchange' in vhost '/x' refused for user 'user'", @reply_code=403, @method_id=10>>
Why would that be? Bunny seems to be sending the correct info, so I am not sure whether this is a Bunny bug, but it also seems unlikely to me that rabbitmq-server would have such a bug. I am using rabbitmq-server 2.5.1 and the newest Bunny, 0.7.9.
If I give configure access, then it works, but then what is the point of a passive declaration?
I'm using ruby 1.9.3p0 with bunny 0.7.9, RabbitMQ
From what I understand, this may be a problem due to bunny's synchronous nature.
Anyway: while subscribed to a queue, I cannot change the binding keys on that queue.
Is there any way to solve this? Or is a fix planned for the future?
Setup: bind_test_publish.rb sends 2 messages each second to a topic exchange, with keys "test.1" and "test.2"
Tried with and without qos/ack.
bind_test_subscribe.rb
require 'bunny'
# start a communication session with the amqp server
server=Bunny.new()
server.start
#server.qos(:prefetch_count => 1)
exchange=server.exchange("test_exchange", :type=>:topic)
@queue = server.queue("test_exchange_queue", :auto_delete=>true)
@queue.bind(exchange,:key=>"test.1")
Thread.new do
  @queue.subscribe() do |msg|
    puts msg[:payload]
  end
end
sleep(10)
#@queue.unbind(exchange,:key=>"test.1")
@queue.bind(exchange,:key=>"test.2")
sleep(10)
# close the client connection
server.stop
When I try to bind, I get the following error:
/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/client08.rb:81:in `check_response': Error binding queue: test_exchange_queue to exchange: test_exchange (Bunny::ProtocolError)
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:137:in `bind'
from bind_test_subscribe.rb:25:in `<main>
When I try to unbind, the program freezes. Output from ctrl+c:
^C/usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `read': Interrupt
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:194:in `send_command'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:124:in `read'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:272:in `_read'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:94:in `block in read'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `map'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:89:in `read'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:61:in `block in parse'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/buffer08.rb:263:in `extract'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/transport/frame08.rb:60:in `parse'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/client08.rb:158:in `next_frame'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/qrack/client.rb:117:in `next_payload'
from /usr/local/rvm/gems/ruby-1.9.3-p0/gems/bunny-0.7.9/lib/bunny/queue08.rb:387:in `unbind'
from bind_test_subscribe.rb:24:in `<main>'
I have a large array consisting of 100 Hash objects, converted into a JSON string.
When I tried to publish the JSON string to RabbitMQ with Bunny, I got this error message:
Unknown reply code: 501, text: FRAME_ERROR - type 3, all octets = <<>>: {frame_too_large,130659,130164}
Increasing frame_max in both rabbitmq.config and session.rb solves the problem, but as I increase the size of the array, the error appears again.
It looks like when Bunny or RabbitMQ splits the message into frames, it doesn't split it into the right sizes.
Any ideas?
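For reference, an AMQP 0-9-1 frame carries 8 bytes of overhead (a 7-byte header plus a 1-byte frame-end octet), so each piece of the body can be at most frame_max - 8 bytes, and the boundaries have to be computed in bytes rather than characters. Below is a minimal sketch of that splitting in plain Ruby. `split_body` is a hypothetical helper, not Bunny's actual implementation, and 131072 is only an example frame_max.

```ruby
FRAME_OVERHEAD = 8 # 7-byte frame header + 1-byte frame-end octet (AMQP 0-9-1)

# Hypothetical helper: split a message body into chunks that each fit
# inside one body frame. Sizes are measured in bytes, not characters.
def split_body(body, frame_max)
  limit = frame_max - FRAME_OVERHEAD
  raise ArgumentError, "frame_max too small" if limit <= 0

  bytes = body.dup.force_encoding(Encoding::BINARY)
  chunks = []
  offset = 0
  while offset < bytes.bytesize
    chunks << bytes.byteslice(offset, limit)
    offset += limit
  end
  chunks
end

body = "\u00e9" * 100_000          # multibyte characters: more bytes than characters
chunks = split_body(body, 131_072)
puts chunks.length                 # number of body frames needed
puts chunks.map(&:bytesize).max    # largest chunk, never above frame_max - 8
```

If the chunk size were derived from a character count instead, a body with multibyte characters could produce frames larger than the negotiated limit.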
I am looking to subscribe to receive a single message from a queue (a temporary queue that holds an RPC response). Once I receive this message, I'd like to cancel the subscription. Am I right that there is currently no supported way to cancel a subscription? E.g.:
Timeout.timeout(15) do
  callback_queue.subscribe(:block=>true) do |delivery_info, properties, payload|
    # Do work
    # End blocking wait with unsubscribe
    callback_queue.unsubscribe
  end
end
I have been observing some failures in the past, but today I see as many as 52 failures:
rspec ./spec/spec_09/bunny_spec.rb:18 # Bunny should connect to an AMQP server
rspec ./spec/spec_09/bunny_spec.rb:22 # Bunny should be able to create and open a new channel
rspec ./spec/spec_09/bunny_spec.rb:31 # Bunny should be able to switch between channels
rspec ./spec/spec_09/bunny_spec.rb:37 # Bunny should raise an error if trying to switch to a non-existent channel
rspec ./spec/spec_09/bunny_spec.rb:41 # Bunny should be able to create an exchange
rspec ./spec/spec_09/bunny_spec.rb:48 # Bunny should be able to create a queue
rspec ./spec/spec_09/bunny_spec.rb:56 # Bunny should raise an error if setting of QoS fails
rspec ./spec/spec_09/bunny_spec.rb:61 # Bunny should be able to set QoS
rspec ./spec/spec_09/exchange_spec.rb:18 # Exchange should raise an error if instantiated as non-existent type
rspec ./spec/spec_09/exchange_spec.rb:23 # Exchange should allow a default direct exchange to be instantiated by specifying :type
rspec ./spec/spec_09/exchange_spec.rb:31 # Exchange should allow a default direct exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:39 # Exchange should allow a default fanout exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:47 # Exchange should allow a default topic exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:55 # Exchange should allow a default headers (amq.match) exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:63 # Exchange should allow a default headers (amq.headers) exchange to be instantiated without specifying :type
rspec ./spec/spec_09/exchange_spec.rb:71 # Exchange should create an exchange as direct by default
rspec ./spec/spec_09/exchange_spec.rb:79 # Exchange should be able to be instantiated as a direct exchange
rspec ./spec/spec_09/exchange_spec.rb:87 # Exchange should be able to be instantiated as a topic exchange
rspec ./spec/spec_09/exchange_spec.rb:95 # Exchange should be able to be instantiated as a fanout exchange
rspec ./spec/spec_09/exchange_spec.rb:103 # Exchange should be able to be instantiated as a headers exchange
rspec ./spec/spec_09/exchange_spec.rb:111 # Exchange should ignore the :nowait option when instantiated
rspec ./spec/spec_09/exchange_spec.rb:115 # Exchange should be able to publish a message
rspec ./spec/spec_09/exchange_spec.rb:120 # Exchange should not modify the passed options hash when publishing a message
rspec ./spec/spec_09/exchange_spec.rb:127 # Exchange should be able to return an undeliverable message
rspec ./spec/spec_09/exchange_spec.rb:135 # Exchange should be able to return a message that exceeds maximum frame size
rspec ./spec/spec_09/exchange_spec.rb:144 # Exchange should report an error if delete fails
rspec ./spec/spec_09/exchange_spec.rb:150 # Exchange should be able to be deleted
rspec ./spec/spec_09/exchange_spec.rb:157 # Exchange should ignore the :nowait option when deleted
rspec ./spec/spec_09/queue_spec.rb:18 # Queue should ignore the :nowait option when instantiated
rspec ./spec/spec_09/queue_spec.rb:22 # Queue should ignore the :nowait option when binding to an exchange
rspec ./spec/spec_09/queue_spec.rb:28 # Queue should raise an error when trying to bind to a non-existent exchange
rspec ./spec/spec_09/queue_spec.rb:34 # Queue should be able to bind to an existing exchange
rspec ./spec/spec_09/queue_spec.rb:40 # Queue should ignore the :nowait option when unbinding from an existing exchange
rspec ./spec/spec_09/queue_spec.rb:46 # Queue should raise an error if unbinding from a non-existent exchange
rspec ./spec/spec_09/queue_spec.rb:52 # Queue should be able to unbind from an exchange
rspec ./spec/spec_09/queue_spec.rb:58 # Queue should be able to publish a message
rspec ./spec/spec_09/queue_spec.rb:64 # Queue should be able to pop a message complete with header and delivery details
rspec ./spec/spec_09/queue_spec.rb:74 # Queue should be able to pop a message and just get the payload
rspec ./spec/spec_09/queue_spec.rb:82 # Queue should be able to pop a message where body length exceeds max frame size
rspec ./spec/spec_09/queue_spec.rb:90 # Queue should be able call a block when popping a message
rspec ./spec/spec_09/queue_spec.rb:97 # Queue should raise an error if purge fails
rspec ./spec/spec_09/queue_spec.rb:104 # Queue should be able to be purged to remove all of its messages
rspec ./spec/spec_09/queue_spec.rb:111 # Queue should return an empty message when popping an empty queue
rspec ./spec/spec_09/queue_spec.rb:119 # Queue should stop subscription without processing messages if max specified is 0
rspec ./spec/spec_09/queue_spec.rb:128 # Queue should stop subscription after processing number of messages specified > 0
rspec ./spec/spec_09/queue_spec.rb:135 # Queue should stop subscription after processing message_max messages < total in queue
rspec ./spec/spec_09/queue_spec.rb:145 # Queue should raise an error when delete fails
rspec ./spec/spec_09/queue_spec.rb:151 # Queue should pass correct block parameters through on subscribe
rspec ./spec/spec_09/queue_spec.rb:165 # Queue should finish processing subscription messages if break is called in block
rspec ./spec/spec_09/queue_spec.rb:184 # Queue should be able to be deleted
rspec ./spec/spec_09/queue_spec.rb:191 # Queue should ignore the :nowait option when deleted
rspec ./spec/spec_09/queue_spec.rb:196 # Queue should support server named queues
After these are resolved, Bunny should use Travis for CI.
I ran into an issue today using bunny via logstash. It appears that if you explicitly set :mandatory => 0, it actually enables the mandatory flag. I was also experimenting with immediate at the time, so it could also be a combination of the two. I'm going to do some tests to validate.
I'm still combing through the code base to make a pull request.
/cc @michaelklishin
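One possible explanation, offered as an assumption rather than a confirmed diagnosis of Bunny's code: in Ruby only nil and false are falsy, so any option handling that tests the value of :mandatory for truthiness would treat 0 as "enabled". The sketch below shows the pitfall and one way to normalize such a flag. `flag_enabled?` is a hypothetical helper.

```ruby
# In Ruby only nil and false are falsy, so 0 counts as true.
value = 0
naive = value ? true : false

# Hypothetical helper: treat nil, false, 0, "0" and "false" as "off".
def flag_enabled?(value)
  ![nil, false, 0, "0", "false"].include?(value)
end

opts = { :mandatory => 0 }
puts naive                             # result of the naive truthiness check
puts flag_enabled?(opts[:mandatory])   # result of the normalized check
```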
Please change every instance of string.length to string.bytesize so that, in Ruby 1.9, messages published with non-binary strings such as UTF-8 will work.
Or merge in https://github.com/fazibear/bunny/commit/dd0c79da86f3392b510ec8524bf6273ea5c67486
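The difference is easy to see in plain Ruby 1.9 or later. This small sketch uses an example string, not one taken from Bunny's code:

```ruby
payload = "h\u00e9llo"   # "héllo": the "é" takes 2 bytes in UTF-8

puts payload.length      # counts characters
puts payload.bytesize    # counts bytes, which is what goes over the wire
```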
Hi! I love bunny! Thanks for this amazing software!
My problem is that the workers I wrote have a very high CPU impact (they "eat" one core).
I have no problem with this on OS X (10.8), only on Linux x86_64 (Ubuntu 12.04).
Any suggestions? What is the best way to have a forever-running process with bunny listeners?
Below is a snippet that shows the behaviour.
require "rubygems"
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue("bunny.examples.hello_world", :auto_delete => true)
x = ch.default_exchange
q.subscribe do |delivery_info, metadata, payload|
  puts "Received #{payload}"
end
while true do
  # x.publish("Hello!", :routing_key => q.name)
  sleep 10
end
1) Bunny should raise an error if the wrong user name or password is used
Failure/Error: lambda { b.start}.should raise_error(Bunny::ProtocolError)
expected Bunny::ProtocolError, got #<Errno::ECONNRESET: Connection reset by peer>
# ./spec/spec_09/connection_spec.rb:11:in `block (2 levels) in <top (required)>'
And the stacktrace, when you do it manually:
Errno::ECONNRESET: Connection reset by peer
from /home/justinf/src/bunny/lib/qrack/client.rb:189:in `read'
from /home/justinf/src/bunny/lib/qrack/client.rb:189:in `send_command'
from /home/justinf/src/bunny/lib/qrack/client.rb:114:in `read'
from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:280:in `_read'
from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:92:in `block in read'
from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:89:in `map'
from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:89:in `read'
from /home/justinf/src/bunny/lib/qrack/transport/frame.rb:62:in `block in parse'
from /home/justinf/src/bunny/lib/qrack/transport/buffer.rb:263:in `extract'
from /home/justinf/src/bunny/lib/qrack/transport/frame.rb:61:in `parse'
from /home/justinf/src/bunny/lib/bunny/client.rb:128:in `next_frame'
from /home/justinf/src/bunny/lib/bunny/client.rb:162:in `open_connection'
from /home/justinf/src/bunny/lib/bunny/client.rb:301:in `start_session'
from (irb):4
from /home/justinf/.rvm/rubies/ruby-1.9.3-p0/bin/irb:16:in `<main>'
With Bunny on latest master (#7e6b8a), when I run Bunny.new.create_channel the following exception is raised:
/usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:480:in `next_channel_id': undefined method `next_channel_id' for nil:NilClass (NoMethodError)
from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/channel.rb:156:in `initialize'
from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:168:in `new'
from /usr/local/opt/rbenv/versions/1.9.3-p194/lib/ruby/gems/1.9.1/bundler/gems/bunny-7e6b8ab9e118/lib/bunny/session.rb:168:in `create_channel'
from run.rb:4:in `<main>'
Redis uses a pretty cool technique to get what looks like reliable socket timeouts in MRI 1.8:
https://github.com/ezmobius/redis-rb/blob/c41bc94ea6e6eebb718428d74d22c84b6d2513a4/lib/redis/connection/ruby.rb#L44-55
This looks like something bunny could make use of to provide a reliable timeout mechanism for calls.
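As a point of comparison, one widely used way to bound a blocking read in plain Ruby is to wait on the socket with IO.select before reading. This is a generic sketch and may not match what the linked redis-rb code does. `read_with_timeout` is a hypothetical helper, and an IO.pipe stands in for a real socket.

```ruby
require "timeout"

# Hypothetical helper: wait up to `seconds` for data, then read what is available.
def read_with_timeout(io, max_bytes, seconds)
  ready = IO.select([io], nil, nil, seconds)
  raise Timeout::Error, "no data within #{seconds}s" if ready.nil?
  io.readpartial(max_bytes)
end

reader, writer = IO.pipe

# Nothing has been written yet, so the read gives up after 50 ms.
timed_out = begin
  read_with_timeout(reader, 1024, 0.05)
  false
rescue Timeout::Error
  true
end
puts "timed out: #{timed_out}"

# Once data is available, the same call returns it immediately.
writer.write("frame")
puts read_with_timeout(reader, 1024, 0.05)
```

Unlike wrapping the call in Timeout.timeout, this approach does not interrupt the reading thread from the outside, which is why it tends to behave more predictably.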
I found an error when starting some examples (Ruby 1.9.3-p125). I started simple_consumer.rb in one terminal and simple_publisher.rb in the other. simple_consumer.rb prints some (in my opinion) strange debug data:
simple_consumer.rb
I, [2012-04-16 10:01:38#697] INFO -- received: ?
?
capabilitiesF copyrightS$Copyright (C) 2007-2011 VMware, Inc.
informationS5Licensed under the MPL. See http://www.rabbitmq.complatformS
Erlang/OTPproductRabbitMQversionS2.7.1PLAIN AMQPLAINen_US?
I, [2012-04-16 10:01:38#697] INFO -- send: ?
platformSRubyproductSBunny
informationS!http://github.com/ruby-amqp/bunnyversionS0.7.AMQPLAIN#LOGINSguesPASSWORDSguesten_US?
I, [2012-04-16 10:01:38#697] INFO -- received:
?
I, [2012-04-16 10:01:38#697] INFO -- send:
?
I, [2012-04-16 10:01:38#697] INFO -- send:
(/?
I, [2012-04-16 10:01:38#697] INFO -- received:
)?
I, [2012-04-16 10:01:38#697] INFO -- send:
?
I, [2012-04-16 10:01:38#697] INFO -- received:
?
I, [2012-04-16 10:01:38#697] INFO -- send:
/data?
I, [2012-04-16 10:01:38#697] INFO -- received:
?
I, [2012-04-16 10:01:38#697] INFO -- send: 2
po_box?
I, [2012-04-16 10:01:38#697] INFO -- received: 2
po_box?
I, [2012-04-16 10:01:38#697] INFO -- send: (
sorting_roomdirect?
I, [2012-04-16 10:01:38#697] INFO -- received: (
?
I, [2012-04-16 10:01:38#697] INFO -- send: $2po_box
sorting_roomfred?
I, [2012-04-16 10:01:38#697] INFO -- received: 2?
I, [2012-04-16 10:01:38#697] INFO -- send: <po_botesttag1?
testtag1?04-16 10:01:38#697] INFO -- received:
Also the simple_producer.rb returns some strange stuff:
I, [2012-04-16 10:02:32#700] INFO -- received: ?
?
capabilitiesF copyrightS$Copyright (C) 2007-2011 VMware, Inc.
informationS5Licensed under the MPL. See http://www.rabbitmq.complatformS
Erlang/OTPproductRabbitMQversionS2.7.1PLAIN AMQPLAINen_US?
I, [2012-04-16 10:02:32#700] INFO -- send: ?
platformSRubyproductSBunny
informationS!http://github.com/ruby-amqp/bunnyversionS0.7.AMQPLAIN#LOGINSguesPASSWORDSguesten_US?
I, [2012-04-16 10:02:32#700] INFO -- received:
?
I, [2012-04-16 10:02:32#700] INFO -- send:
?
I, [2012-04-16 10:02:32#700] INFO -- send:
(/?
I, [2012-04-16 10:02:32#700] INFO -- received:
)?
I, [2012-04-16 10:02:32#700] INFO -- send:
?
I, [2012-04-16 10:02:32#700] INFO -- received:
?
I, [2012-04-16 10:02:32#700] INFO -- send:
/data?
I, [2012-04-16 10:02:32#700] INFO -- received:
?
I, [2012-04-16 10:02:32#700] INFO -- send: (
sorting_roomdirect?
I, [2012-04-16 10:02:32#700] INFO -- received: (
?
I, [2012-04-16 10:02:32#700] INFO -- send: <(
sorting_roomfred?
I, [2012-04-16 10:02:32#700] INFO -- send: )<,?application/octet-stream?
I, [2012-04-16 10:02:32#700] INFO -- send: ,This is a message from the publisher?
I, [2012-04-16 10:02:32#700] INFO -- send: (?bye?
I, [2012-04-16 10:02:32#700] INFO -- received: )?
I, [2012-04-16 10:02:32#700] INFO -- send:
<?Goodbye?
I, [2012-04-16 10:02:32#700] INFO -- received:
=?
And if the simple_consumer.rb receives a message from the publisher, it fails with
simple_consumer.rb:51:in `block in <main>': undefined method `subscription' for #<Bunny::Queue:0x00000100877520> (NoMethodError)
from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `call'
from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:81:in `block in start'
from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `loop'
from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/qrack/subscription.rb:56:in `start'
from /Users/23tux/.rvm/gems/ruby-1.9.3-p125/gems/bunny-0.7.9/lib/bunny/queue08.rb:311:in `subscribe'
from simple_consumer.rb:50:in `<main>'
Maybe someone can help,
thx
Hello:
I'm packaging this up as an RPM for Fedora (and in the future, EPEL) as a dependency for Chef, and during the packaging process I noticed that the gem has several unneeded files in it, namely the various dot-files.
Changing the gemspec to use git ls-files * instead of git ls-files (or manually adding some exemptions) should do the trick.
Additionally, the .gemspec is also packaged with the other files in the gem and could probably also be eliminated.
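The "manual exemptions" variant could look roughly like the sketch below. `packaged_files` is a hypothetical helper and the path list is made-up sample data. In a real gemspec the list would come from the output of git ls-files.

```ruby
# Hypothetical helper: drop dot-files, files inside dot-directories,
# and the .gemspec itself from a list of tracked paths.
def packaged_files(paths)
  paths.reject do |path|
    path.split("/").any? { |part| part.start_with?(".") } ||
      path.end_with?(".gemspec")
  end
end

# Sample paths only; a literal list is used so the sketch runs anywhere.
tracked = %w[.gitignore .rspec .travis.yml bunny.gemspec README.textile lib/bunny.rb lib/qrack/client.rb]
puts packaged_files(tracked)
```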
When queue.subscribe is called, it sometimes reports an error and quits.
Recently RabbitMQ introduced publisher confirms, which is basically a faster alternative to transactions.
Using publisher confirms requires a lot of client code though, we should come up with something more high level, so it'd be easier for people to use.
Irrelevant note: it's bug 24229 on RabbitMQ bugzilla (not publicly visible).
I've opened the same issue for the AMQP gem.
I suspect this is related to changed socket behavior in 1.9.2, but this needs to be confirmed.
Fails both for master and 0.7.x-stable (but not for 0.7.0, for example).
I wrote a test which fails for me with RabbitMQ:
it "should be able to keep sending messages after an empty message" do
  q = @b.queue('test1')
  @default_exchange.publish('', :key => 'test1')
  @default_exchange.publish('Yet another test message', :key => 'test1')
  message_count(q).should == 2
  q.purge
end
NoMethodError:
undefined method `message_count' for #<Qrack::Protocol::Connection::Close:0x007fef4cae2e80>
So it seems the connection is silently closed.
Not sure if this is by design, but maybe an exception should be raised.
We've noticed that in high-message-volume situations, with multiple processes connecting to a RabbitMQ node via Bunny, Client#check_response receives unexpected values, e.g. plain integers, that cause failures when attempting to create a queue. In moderate-volume situations we rarely see this problem, though we have found instances in our logs going back to our first usage of bunny.
The code is pretty basic: start bunny with connection options, create a queue with no options, and write to an exchange, passing the created queue as the reply queue. I doubt there's anything special about our implementation, as it works perfectly almost all the time, only failing when we run cron jobs that have multiple workers all sending/receiving messages via bunny for many minutes at a time.
It seems odd to me that Client#check_response would receive a value of 2 rather than the expected DeclareOK or some other standard AMQP response. If I'm wrong and our implementation could be to blame please let me know, but I suspect a frame parsing bug in Bunny.
Another detail: once it starts getting bogus values in Client#check_response, it continues to get bogus values, as though the buffer sequencing has gotten shifted causing all frames to be incorrectly parsed.
So, could this be a bug in Bunny? If so, what do you need from us to help reproduce it?
Thanks,
Eric
This bug has been known for some time: http://bunnyamqp.wordpress.com/2009/08/24/bunny-ruby-irb-auto-completion-issue/
Unfortunately, patching IRB on every machine is not an option for us, nor do I think that it is a decent solution.
The issue is that Ruby (irb/completion) expects Module#name to return a string not a symbol.
ObjectSpace.each_object(Module){|m|
begin
name = m.name
rescue Exception
name = ""
end
next if name != "IRB::Context" and
/^(IRB|SLex|RubyLex|RubyToken)/ =~ name
While looping through the modules, we eventually come across a bunch of classes from Bunny / Qrack. Qrack::Protocol09::Connection#name for example returns :connection. I haven't really dug into the details of how this method is used, but I think it is probably a good idea to respect this contract that Module#name returns the actual name of the module, in this case "Qrack::Protocol09::Connection".
Thank you for your consideration.
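One way to respect that contract would be to expose the short protocol name through a separately named method instead of overriding name. The sketch below uses hypothetical names (`QrackSketch`, `protocol_name`) and is not Qrack's actual code:

```ruby
module QrackSketch
  class Connection
    # Hypothetical accessor for the short protocol name, so that
    # Module#name keeps returning the fully qualified String.
    def self.protocol_name
      :connection
    end
  end
end

puts QrackSketch::Connection.name.inspect           # a String, as irb/completion expects
puts QrackSketch::Connection.protocol_name.inspect  # the short Symbol
```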
There are no threads and no calls to send_heartbeat. Correct me if I'm wrong, but this means that enabling heartbeats will likely cause clients to drop more often than without them, especially if you're in a long-running subscribe block. This seems like really common behavior to me (wanting to subscribe to a queue), and it seems silly to push the responsibility of responding to heartbeats down to consumers of this library.
I have a queue consumer which takes messages and sends HTTP requests using EM-HTTP. It looks something like this:
require 'bunny'
require 'eventmachine'
require 'em-http'
amqp = Bunny.new(:logging => false, :host => $AMQP_URL)
amqp.start
q = amqp.queue('queue')
exch = amqp.exchange('queue')
q.bind(exch, :key => 'queue')
q.subscribe do |msg|
  res = msg[:payload]
  http = EventMachine::HttpRequest.new(URL).post :body => {:message => res.to_json }
  http.errback { puts http.error }
end
# Close client
amqp.stop
When I send a message to the queue I get the following error:
/Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/eventmachine-1.0.0/lib/eventmachine.rb:664:in `connect_server': eventmachine not initialized: evma_connect_to_server (RuntimeError)
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/eventmachine-1.0.0/lib/eventmachine.rb:664:in `bind_connect'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:54:in `activate_connection'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:89:in `setup_request'
from /Users/info/.rvm/rubies/ruby-1.9.3-p194/lib/ruby/gems/1.9.1/gems/em-http-request-1.0.3/lib/em-http/http_connection.rb:8:in `post'
Please note that I tried putting the whole block inside an EM.run do block, but that didn't work either.
Any help would be highly appreciated.
There seems to be some kind of concurrency issue when I try to write a hell of a lot of messages locally. I've managed to reproduce the error by just running this code:
require 'bunny'
connection = Bunny.new({host: 'localhost', keepalive: true})
connection.start
channel = connection.create_channel
exchange = channel.topic("general", durable: true)
while true
  exchange.publish("hola", routing_key: 'resources', timestamp: Time.now.to_i, persistent: true)
end
After a while you will get this error:
undefined method `read_fully' for nil:NilClass
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/transport.rb:112:in `read_next_frame'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:50:in `run_once'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:31:in `block in run_loop'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:28:in `loop'
/Users/viki/.rvm/gems/ruby-1.9.3-p362@oceanus/gems/bunny-0.9.0.pre7/lib/bunny/main_loop.rb:28:in `run_loop'
...
which is raised continuously, given that the exception is swallowed in the main_loop.
I've managed to avoid the error by running single-threaded (threaded: false):
connection = Bunny.new({host: 'queue.dev.viki.io', keepalive: true, threaded: false})
I'm getting a deprecation message every time I try to call publish on a queue. Looking at the code, it seems I'm supposed to be calling exchange.publish? But exchange is a private method, so what am I supposed to do to queue items?
b = Bunny.new
b.start
q = b.queue('test_queue')
q.exchange.publish('something')
~ Qrack::Queue#publish will be removed in Bunny 0.8.
I'm using Bunny 0.7