cloudamqp / amqp-client.cr Goto Github PK
View Code? Open in Web Editor NEWAn AMQP 0-9-1 client for Crystal
Home Page: https://cloudamqp.github.io/amqp-client.cr/
License: MIT License
An AMQP 0-9-1 client for Crystal
Home Page: https://cloudamqp.github.io/amqp-client.cr/
License: MIT License
When creating a new Client from a URL with a trailing slash like amqp://localhost:5672/
the vhost is initialized with an empty string which results in a vhost not found error.
The Problem might be that when checking the vhost it is never checked whether the path is just /
as well as when using URI.decode_www_form
the last character is omitted.
It would be great if there was an example of writing a remote procedure call with this library using Thread::Mutex and Thread::ConditionVariable. I attempted to do this, but I failed. So I used sleep instead, which works but I don't think is really the best way to do it.
My code so far: https://github.com/aabajyan/norminette.cr/blob/main/src/sender.cr
RPC on RabbitMQ: https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html
In amqp-client/src/amqp-client/connection.cr:3:1
3 | require "logger"
^
Error: can't find file 'logger'
If you're trying to require a shard:
- Did you remember to run `shards install`?
- Did you make sure you're running the compiler in the same directory as your shard.yml?
Is there a way to use the following method? https://www.rabbitmq.com/direct-reply-to.html
Looking at #5 , I still can't figure out how to make an Event based loop around consuming incoming messages.
It seems that the whole subscribe
function isn't actually re-looping to receive a new message but just blocks until a message arrives and then ends.
Any suggestions? maybe I'm using the wrong methods?
It seems that the usage of StringPool in some cases might cause an invalid memory access:
[0x0] ???
[0x555a962004fe] -> at /usr/share/crystal/src/fiber.cr:98:34
[0x555a9620053d] run at /usr/share/crystal/src/fiber.cr:146:11
[0x555a96c5584b] -> at /opt/lib/amqp-client/src/amqp-client/connection.cr:19:7
[0x555a96bff1f6] read_loop at /opt/lib/amq-protocol/src/amq/protocol/frames.cr:32:37
[0x555a96c05054] from_io at /opt/lib/amq-protocol/src/amq/protocol/frames.cr:251:31
[0x555a96c0c217] from_io at /opt/lib/amq-protocol/src/amq/protocol/frames.cr:1372:13
[0x555a96c0718f] from_io at /opt/lib/amq-protocol/src/amq/protocol/short_string.cr:21:9
[0x555a96bafccb] get at /usr/share/crystal/src/string_pool.cr:101:5
[0x555a96bb0080] get at /usr/share/crystal/src/string_pool.cr:123:5
[0x555a96283ab3] []= at /usr/share/crystal/src/pointer.cr:132:6
[0x7fe6dfccf420] ?? +140629573956640 in /lib/x86_64-linux-gnu/libpthread.so.0
[0x555a9620d595] -> at /usr/share/crystal/src/signal.cr:152:5
[0x555a9620d6e0] print_backtrace at /usr/share/crystal/src/exception/call_stack/libunwind.cr:100:5
Invalid memory access (signal 11) at address 0x7fe3196fea38
We are trying to test it further and provide a fix, but if you have some idea on how to make it better let me know ๐
amqp-client.cr/src/amqp-client/channel.cr
Line 514 in 21c952b
Can the result of AMQP::Client.new(...).connect
be shared across fibers?
If not, what's the recommended way to access RabbitMQ from multiple fibers?
Thanks!
It seems that there is a bug when trying to open a channel that passes AMQ::Protocol::Frame
. Errors are not created with other types (String
, Int32
, etc) but this simple test shows the error:
test.cr
require "amqp-client"
channel = Channel(AMQ::Protocol::Frame).new
channel.receive
Output:
crystal run test.cr
Showing last frame. Use --error-trace for full trace.
In /usr/local/Cellar/crystal/0.31.0/src/channel.cr:167:20
167 | @receivers << {Fiber.current, pointerof(value), pointerof(state), nil}
^-
Error: no overload matches 'Deque(Tuple(Fiber, Pointer(AMQ::Protocol::Frame), Pointer(Channel::DeliveryState), Channel::SelectContext(AMQ::Protocol::Frame) | Nil))#<<' with type Tuple(Fiber, Pointer(AMQ::Protocol::Frame), Pointer(Channel::DeliveryState), Nil)
Overloads are:
- Deque(T)#<<(value : T)
Crystal Env:
CRYSTAL_CACHE_DIR="/Users/homans/.cache/crystal"
CRYSTAL_PATH="/usr/local/Cellar/crystal/0.31.0/src:lib"
CRYSTAL_VERSION="0.31.0"
CRYSTAL_LIBRARY_PATH="/usr/local/Cellar/crystal/0.31.0/embedded/lib"
amqp-client.cr shard details:
amqp-client:
github: cloudamqp/amqp-client.cr
commit: eaabc1558e746b844c6885269edea844a2e373bf
I think I'm missing something super important (and none of the docs or source code from this project have enlightened me. I've read every line of the source over the past 2 days).
It seems every other client I've found for working with amqp has the ability to invoke a listening state that doesn't just close the connection when the blocks are run.
For instance from the Hello World (Ruby) example from the RabbitMQ documentation clearly shows how to do this:
begin
puts ' [*] Waiting for messages. To exit press CTRL+C'
queue.subscribe(block: true) do |_delivery_info, _properties, body|
puts " [x] Received #{body}"
end
rescue Interrupt => _
connection.close
exit(0)
end
It's also shown in literally every other example in the tutorial, but I figured Ruby would be a closer base to bring up since talking about Crystal.
However subscribing to the queue with this package instantly closes before any messages can be received from anywhere outside of the local block.
Am I crazy for thinking it's weird that I can't seem to pull this off?
Is there something super fundamental I'm missing?
In lib/amq-protocol/src/amq/protocol/table.cr:226:32
226 | private def skip_field : Int32
^
Error: method must return Int32 but it is returning Int64
I didn't see a better place so just wanted to drop a thank you "issue" for all your hard work creating the AMQP client lib for Crystal, we are using them for our production environment as part of the service we give to customers, so thanks and keep up the good work ๐
amqp-client.cr/src/amqp-client/channel.cr
Line 315 in ffcd36c
form -> from
Connection#update_secret
should be implemented, just like in https://github.com/cloudamqp/amqp-client.js/pull/77/files
Hi there, not sure if this is the best place for a "how do I" question. I apologize in advance if you have a forum for that that I missed.
I wanted to check my logic on combining the basic HTTP server in the crystal docs with a basic AMQP server:
require "http/server"
require "amqp-client"
server = HTTP::Server.new do |context|
context.response.content_type = "text/plain"
context.response.print "Hello world!"
end
spawn do
AMQP::Client.start("amqp://guest:guest@localhost") do |connection|
connection.channel do |ch|
q = ch.queue("hello")
puts " [*] Waiting for messages."
q.subscribe(block: true) do |msg|
puts " [x] Received #{msg.body_io.to_s}"
end
end
end
end
address = server.bind_tcp 8080
puts "Listening on http://#{address}"
server.listen
The experience seems right (both servers are responsive), but I'm not sure if I should be using block: true
in a fiber like this, or if it's intended to normally use block: false
plus my own fiber handling like a loop that consumes and yields or something of that nature. Suggestions?
Especially the new Log class (Logger is deprecated) and exhaustive cases.
Do you want a push request with fixes (for the new Crystal logging)?
My code:
my_several_queues_array.each do |q| # iterate some queue info and lets bind & subscribe more queues
AMQP::Client.start "some url" do |c|
c.channel do |ch|
ch.queue_declare...
ch.bind...
q = ch.queue "some queue name"
q.subscribe(no_ack: false block: false) do |msg|
pp msg
ch.basic_ack...
end
end
end
end
[some another code here]
sleep # is this root sleep OK?
Now queues are subscribed, but not receiving new messages from rabbit after start.
When I use block: true
on the last queue subscribe, I am receiving messages from all queues, but I don't want to call block: true
on the last subscribe, because [some another code here]
is not called and it's weird a little.
Is there any solution? Thanks!
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.