gftea / amqprs
Async & Lock-free RabbitMQ Rust Client, Easy-to-use API
License: MIT License
Support for the immediate flag was removed in RabbitMQ 3.0 because it is impossible to implement in ways that do not ruin throughput of multi-node clusters (the node handling basic.publish had to communicate with peers for every message published).
I suggest that we make the field private and thus remove it from the API.
I feel like this repo would benefit from this. If I made a mistake or didn't follow a best practice and this isn't how this should be achieved in a "production" setting, I'd love to learn.
store server properties when opening the connection
provide an API to get property values
Hi @gftea, I'm fairly new to rust (dev. moving from python to rust), and I was just checking some projects related to rabbitMQ, and rust. I have to say that your project is the best one I've found so far (Really well documented!).
So, I wonder are there any contributing guidelines that one must follow when contributing?
I am trying to figure out how to detect a connection failure and gracefully handle it. Is there a callback method that amqprs provides for this case?
If the server is unreachable when the application starts, it's clear what to do, because we get error results from the Connection::open() function.
However, if we connect to the server and at some point the connection fails, either because the server was rebooted, or our network link had a temporary interruption, it's not apparent how to detect this with amqprs.
I made a test program with a basic application framework. See https://github.com/cdbennett/amqprs-reconnect-example
The core part that sets up the RabbitMQ connection is this:
/// RabbitMQ client task. Returns an error result if the connection is lost.
async fn rabbit_connection_process(cfg: Arc<Config>) -> anyhow::Result<()> {
    debug!("starting RabbitMQ task");
    let connection = Connection::open(
        &OpenConnectionArguments::new(&cfg.host, cfg.port, &cfg.username, &cfg.password)
            .virtual_host(&cfg.virtual_host),
    )
    .await
    .with_context(|| {
        format!(
            "can't connect to RabbitMQ server at {}:{}",
            cfg.host, cfg.port
        )
    })?;
    // Add simple connection callback, it just logs diagnostics.
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .context("registering connection callback failed")?;
    let channel = connection
        .open_channel(None)
        .await
        .context("opening channel failed")?;
    channel
        .register_callback(DefaultChannelCallback)
        .await
        .context("registering channel callback failed")?;
    // Declare our receive queue.
    let (queue_name, _, _) = channel
        .queue_declare(QueueDeclareArguments::default().exclusive(true).finish())
        .await
        .context("failed to declare queue")?
        .expect("when no_wait is false (default) then we should have a value");
    debug!("declared queue '{queue_name}'");
    let exchange_name = "amq.topic";
    debug!("binding exchange {exchange_name} -> queue {queue_name}");
    channel
        .queue_bind(QueueBindArguments::new(&queue_name, exchange_name, ""))
        .await
        .context("queue binding failed")?;
    let consume_args = BasicConsumeArguments::new(&queue_name, "amqprs_reconnect");
    let consumer = MyConsumer::new(consume_args.no_ack);
    let consumer_tag = channel
        .basic_consume(consumer, consume_args)
        .await
        .context("failed basic_consume")?;
    trace!("consumer tag: {consumer_tag}");
    // Wait forever ... TODO: somehow we would like to have a signal
    // to terminate this task if the RabbitMQ connection is lost.
    loop {
        sleep(Duration::from_secs(60)).await;
    }
}
The output when the RabbitMQ server connection is lost looks like this:
2023-02-14T00:33:31.528550Z TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 }, ConsumeOk { consumer_tag: ShortStr(16, "amqprs_reconnect") })
2023-02-14T00:33:31.528580Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 251836804403847 }
2023-02-14T00:33:31.528787Z TRACE amqprs_reconnect: consumer tag: amqprs_reconnect
2023-02-14T00:33:31.528854Z TRACE amqprs::api::channel::basic: starts task for async consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:31.528883Z INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect
[ server connection lost here ]
2023-02-14T00:33:51.566232Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 251856842625757 }
2023-02-14T00:33:51.566281Z ERROR amqprs::net::reader_handler: socket will be closed due to failure of reading frame, cause: peer shutdown
2023-02-14T00:33:51.566376Z INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566410Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566424Z INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566472Z DEBUG amqprs::api::channel::basic: exit task of async consumer amqprs_reconnect
2023-02-14T00:33:51.566610Z TRACE mio::poll: deregistering event source from poller
In this case, you can see there is some amqprs library code that is run when the connection is lost, but I don't see any of the callbacks having their methods called. We should see something in the DefaultConnectionCallback or DefaultChannelCallback, I would hope.
Is there some other callback we can use to detect connection loss?
use bits instead of vec to manage channel id
2048 / 8 = 256 bytes only
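A minimal sketch of the bitmap idea, assuming 2048 channel IDs as in the arithmetic above. The type name `ChannelIdBitmap` and its methods are hypothetical and are not taken from amqprs; the only protocol fact relied on is that channel 0 is reserved for connection-level frames in AMQP 0-9-1.

```rust
/// Hypothetical fixed-size bitmap tracking which channel IDs are in use.
/// 2048 IDs are packed into 2048 / 8 = 256 bytes.
pub struct ChannelIdBitmap {
    bits: [u8; 256],
}

impl ChannelIdBitmap {
    pub const CAPACITY: u16 = 2048;

    pub fn new() -> Self {
        let mut bits = [0u8; 256];
        // Channel 0 carries connection-level frames, so mark it as taken.
        bits[0] |= 1u8;
        Self { bits }
    }

    /// Reserve the lowest free ID, or return None if every ID is taken.
    pub fn allocate(&mut self) -> Option<u16> {
        for (byte_idx, byte) in self.bits.iter_mut().enumerate() {
            if *byte != u8::MAX {
                // trailing_ones() is the index of the first zero bit.
                let bit = byte.trailing_ones() as usize;
                *byte |= 1u8 << bit;
                return Some((byte_idx * 8 + bit) as u16);
            }
        }
        None
    }

    /// Reserve a specific ID; returns false if out of range or already used.
    pub fn reserve(&mut self, id: u16) -> bool {
        if id >= Self::CAPACITY || self.is_used(id) {
            return false;
        }
        self.bits[(id / 8) as usize] |= 1u8 << (id % 8);
        true
    }

    /// Free an ID so it can be handed out again. Channel 0 stays reserved.
    pub fn release(&mut self, id: u16) {
        if id != 0 && id < Self::CAPACITY {
            self.bits[(id / 8) as usize] &= !(1u8 << (id % 8));
        }
    }

    pub fn is_used(&self, id: u16) -> bool {
        id < Self::CAPACITY && (self.bits[(id / 8) as usize] & (1u8 << (id % 8))) != 0
    }
}
```

Compared with a Vec of IDs, lookup and release are constant time and the whole structure stays at a fixed 256 bytes.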
Hello all, I am trying to create a basic consumer in my application. I placed two breakpoints.
The first pause is in the None match arm. I cannot figure out why consumer_tx is getting dropped. Maybe there is something I am doing wrong? Have any of you seen this before? Another interesting thing I see is that when I restart the container everything seems to be fine. The consumer_tx is not dropped and my consumer impl gets messages like I would expect it to.
Should we be testing all possible combinations of feature flags in the regression_test.sh script (or at least somewhere)? I don't think at the moment there are any feature attributes that depend on more than one feature, but it seems as if it should be tested before the release at the very least. The downside of all possible combinations is that these tests will take considerably longer to run. At the very least the urispec feature needs to be added to regression_test.sh. I will make a PR to go the route desired.
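To give a feel for how quickly the test matrix grows, here is a small sketch that enumerates every subset of a feature list and prints the matching cargo invocation. The helper name `feature_matrix` is hypothetical; the feature names passed in are whatever the crate declares, and `--no-default-features` and `--features` are standard cargo flags. With n features this yields 2^n commands, which is the run-time cost mentioned above.

```rust
/// Build one `cargo test` command line per subset of `features`.
/// Assumes fewer than 32 features, since the subset is encoded in a u32 mask.
pub fn feature_matrix(features: &[&str]) -> Vec<String> {
    (0u32..(1u32 << features.len()))
        .map(|mask| {
            // Keep the features whose bit is set in this mask.
            let chosen: Vec<&str> = features
                .iter()
                .enumerate()
                .filter(|(i, _)| (mask & (1u32 << *i)) != 0)
                .map(|(_, f)| *f)
                .collect();
            if chosen.is_empty() {
                "cargo test --no-default-features".to_string()
            } else {
                format!(
                    "cargo test --no-default-features --features {}",
                    chosen.join(",")
                )
            }
        })
        .collect()
}
```

A regression script could loop over the returned lines, or a cheaper variant could test only each feature alone plus all features together.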
A lot of cargo clippy warnings are unused variables and trivial things like that. Should I submit a PR that squashes as many of them as possible within a certain time box (say, a couple of hours)?
I am trying to implement mutual TLS authentication, but there's no way to set up the client to use EXTERNAL as the auth mechanism, as defined in the rabbitmq_auth_mechanism_ssl documentation.
Am I bad at searching the documentation, or is this feature missing even though TlsAdaptor::with_client_auth does exist?
tracking remaining work of #64
for MapSerializer, add test for !self.is_len_known #71
for the types.rs, add tests for len of FieldValue variants A F x - #76
improvements: - #77 merged into branch release_v2
Hi,
I'd like to take on #62, but first I want to be able to run the integration tests.
However, I'm facing two main issues:
start_rabbitmq.sh: the permissions do not apply on my macOS installation. UID=1001 is not valid, which makes the process throw errors, although it does not stop. ERROR ==> Couldn't start RabbitMQ in background.
I've been trying to run it in isolation (a simple docker compose setup) without any different result. The official rabbitmq image works, though. Any recommendations?
QueueDeclareArguments uses the builder pattern, and that's nice, but it would be even better to also provide a few constructor/factory methods for common cases:
Clients such as Bunny provide such helpers.
I'd be happy to submit a PR if it'd be accepted.
Hi,
I'm trying to detect unexpected consumer cancellations.
Based on https://www.rabbitmq.com/consumer-cancel.html
ChannelCallback::cancel() seems to be the best for this, but I can't make it work.
I tried doing something like this:
But ChannelCallback::cancel() was not called.
Did I miss some configuration?
I've read here https://www.rabbitmq.com/connections.html#capabilities
that this extension (consumer_cancel_notify) needs to be present in client capabilities, otherwise RabbitMQ does not send any notification.
I modified Connection::open() to check if it would help, and it did.
versions:
Rust: 1.73
amqprs: 1.5.0
Hi, I am building an adapter to your library, but for that I need to fully understand your project. So, I was running your examples, but encountered a problem with the publisher subscriber example.
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: NetworkError("network io error: Connection reset by peer (os error 54)")'
I am running Rust 1.68 on macOS. Also, I am using the following command to start my local RabbitMQ server: zsh rabbitmq-server
Do I need to specify something else when starting the RabbitMQ server instance?
Thank you in advance, and have a nice day.
It's fairly unusual for users to think of how many entries there are in their map. I think FieldTable needs one more constructor method.
If a message's size exceeds frame-max, the library needs to split the message into multiple body frames.
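A rough sketch of the splitting step, not the library's implementation. It assumes frame-max counts the whole frame, so each content-body frame can carry at most frame-max minus 8 bytes of payload (7 bytes of frame header plus the 1-byte frame-end marker, per AMQP 0-9-1). The function name `split_body` is hypothetical.

```rust
/// Bytes of framing around each body payload in AMQP 0-9-1:
/// type (1) + channel (2) + size (4) + frame-end (1).
const FRAME_OVERHEAD: usize = 8;

/// Split `payload` into chunks that each fit in one content-body frame.
/// Returns None if `frame_max` leaves no room for any payload.
/// An empty payload yields no chunks, i.e. no body frames at all.
pub fn split_body(payload: &[u8], frame_max: usize) -> Option<Vec<&[u8]>> {
    let max_chunk = frame_max
        .checked_sub(FRAME_OVERHEAD)
        .filter(|n| *n > 0)?;
    Some(payload.chunks(max_chunk).collect())
}
```

The content header still announces the full body size once; only the body is spread across frames, and the receiver reassembles until it has read that many bytes.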
Currently, OpenConnectionArguments requires the user, password, host, and port to be specified separately.
I think it would be good to create such arguments from a URL, so applications need fewer config items.
let arguments = OpenConnectionArguments::from_url("amqp://user:password@localhost:port/virtual_host")
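To make the proposal concrete, here is a std-only sketch of the parsing such a constructor would need. `AmqpUri` and `parse_amqp_uri` are hypothetical names, not amqprs API. The sketch assumes no percent-encoding and no IPv6 literals, and it falls back to the conventional ports 5672 (amqp) and 5671 (amqps) when none is given.

```rust
#[derive(Debug, PartialEq)]
pub struct AmqpUri {
    pub tls: bool,
    pub username: Option<String>,
    pub password: Option<String>,
    pub host: String,
    pub port: u16,
    pub virtual_host: Option<String>,
}

/// Parse "amqp[s]://[user[:password]@]host[:port][/vhost]".
pub fn parse_amqp_uri(uri: &str) -> Result<AmqpUri, String> {
    let (scheme, rest) = uri.split_once("://").ok_or("missing scheme")?;
    let tls = match scheme {
        "amqp" => false,
        "amqps" => true,
        other => return Err(format!("unsupported scheme: {other}")),
    };
    // Everything after the first '/' is treated as the virtual host.
    let (authority, virtual_host) = match rest.split_once('/') {
        Some((a, v)) => (a, Some(v.to_string())),
        None => (rest, None),
    };
    // Optional "user:password@" prefix.
    let (userinfo, host_port) = match authority.rsplit_once('@') {
        Some((u, h)) => (Some(u), h),
        None => (None, authority),
    };
    let (username, password) = match userinfo {
        Some(u) => match u.split_once(':') {
            Some((name, pass)) => (Some(name.to_string()), Some(pass.to_string())),
            None => (Some(u.to_string()), None),
        },
        None => (None, None),
    };
    let (host, port) = match host_port.split_once(':') {
        Some((h, p)) => (h, p.parse::<u16>().map_err(|e| format!("bad port: {e}"))?),
        None => (host_port, if tls { 5671 } else { 5672 }),
    };
    if host.is_empty() {
        return Err("missing host".to_string());
    }
    Ok(AmqpUri {
        tls,
        username,
        password,
        host: host.to_string(),
        port,
        virtual_host,
    })
}
```

Leaving username, password, and virtual host as Option lets the caller decide which defaults to apply instead of baking them into the parser.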
The compliance_assert feature is listed as compliance_assert in amqprs/Cargo.toml, but in the code the feature is toggled with #[cfg(feature = "compilance_assert")]. Note the transposition error in compliance vs compilance. Is this intentional? Also, the module is named compilance_assert.rs. This may be correct, but it seems like this would cause a disconnect?
There is a small number of built-in exchange types plus any arbitrary x-* custom types.
By implementing From for strings and string slices, this can probably be a source-compatible change.
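A sketch of what this could look like, assuming the four built-in types direct, fanout, topic, and headers. The enum and the `describe_exchange` stand-in are hypothetical and only illustrate why call sites that pass strings would keep compiling.

```rust
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExchangeType {
    Direct,
    Fanout,
    Topic,
    Headers,
    /// Any other type name, such as a plugin-provided x-* type.
    Custom(String),
}

impl From<&str> for ExchangeType {
    fn from(s: &str) -> Self {
        match s {
            "direct" => Self::Direct,
            "fanout" => Self::Fanout,
            "topic" => Self::Topic,
            "headers" => Self::Headers,
            other => Self::Custom(other.to_string()),
        }
    }
}

impl From<String> for ExchangeType {
    fn from(s: String) -> Self {
        Self::from(s.as_str())
    }
}

impl fmt::Display for ExchangeType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Direct => "direct",
            Self::Fanout => "fanout",
            Self::Topic => "topic",
            Self::Headers => "headers",
            Self::Custom(name) => name.as_str(),
        };
        f.write_str(name)
    }
}

/// Stand-in for an API that used to take the type as a string.
/// Accepting `impl Into<ExchangeType>` keeps string callers working.
pub fn describe_exchange(name: &str, kind: impl Into<ExchangeType>) -> String {
    format!("{name} ({})", kind.into())
}
```

Both `describe_exchange("logs", "fanout")` and `describe_exchange("logs", ExchangeType::Fanout)` compile, which is the source-compatibility argument made above.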
If a consumer panics, the library keeps trying to deliver future messages to it, but the consumer seems to have crashed or hung up, and it never receives the messages.
What is the expected behavior if a consumer panics? Should the consumer be removed entirely? Or should it continue to function normally?
It seems that panics are being caught somehow, since the app does not abort and exit. Maybe the Tokio runtime is catching it? The result is that we are somewhere in between aborting the app, and catching the panic, a place where the app will continue to run but the consumer just stops receiving any messages.
I have experienced web server frameworks like Axum and Actix Web, which catch and recover from panics in user handlers gracefully, so should we aim for similar behavior in amqprs?
In this test, we have a consumer app that subscribes to messages, and intentionally panics when the 4th message is received.
$ RUST_LOG=TRACE cargo run --bin consumer
...
// started
2023-03-07T20:58:34.592109Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:58:34.592169Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 1, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:58:34.592198Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214405864 }
2023-03-07T20:58:34.592237Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:58:34.592442Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214648260 }
2023-03-07T20:58:34.592466Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:58:34.592482Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214691050 }
2023-03-07T20:58:34.592561Z INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 1, redelivered = false, exchange = amq.topic, routing_key = on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13
2023-03-07T20:58:34.592579Z INFO consumer: panic countdown: 3
2023-03-07T20:58:34.592587Z INFO consumer: ack to delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 1, redelivered = false, exchange = amq.topic, routing_key = on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:58:34.592626Z TRACE amqprs::net::split_connection: SENT on channel 1: Ack(MethodHeader { class_id: 60, method_id: 80 }, Ack { delivery_tag: 1, mutiple: false })
2023-03-07T20:58:34.592675Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:13300/ [open]' heartbeat deadline is updated to Instant { t: 851388214882059 }
// ^^ sent 1 (ok)
...
// ^^ sent 2 (ok)
2023-03-07T20:58:48.353648Z TRACE amqprs::net::split_connection: 8 bytes read from network
2023-03-07T20:58:48.353712Z TRACE amqprs::net::split_connection: RECV on channel 0: HeartBeat(HeartBeat)
2023-03-07T20:58:48.353746Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851431976260929 }
2023-03-07T20:58:48.353777Z DEBUG amqprs::net::reader_handler: received heartbeat on connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:58:53.282089Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:58:53.282138Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 3, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:58:53.282166Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904794243 }
2023-03-07T20:58:53.282197Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:58:53.282273Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904900711 }
2023-03-07T20:58:53.282294Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:58:53.282309Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904938418 }
2023-03-07T20:58:53.282373Z INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 3, redelivered = false, exchange = amq.topic, routing_key = on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13
2023-03-07T20:58:53.282392Z INFO consumer: panic countdown: 1
2023-03-07T20:58:53.282400Z INFO consumer: ack to delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 3, redelivered = false, exchange = amq.topic, routing_key = on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:58:53.282436Z TRACE amqprs::net::split_connection: SENT on channel 1: Ack(MethodHeader { class_id: 60, method_id: 80 }, Ack { delivery_tag: 3, mutiple: false })
2023-03-07T20:58:53.282479Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:13300/ [open]' heartbeat deadline is updated to Instant { t: 851406905106624 }
// ^^ sent 3 (ok)
// Next received message will trigger our intentional panic for testing:
2023-03-07T20:59:01.952081Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:01.952133Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 4, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:01.952162Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445574985087 }
2023-03-07T20:59:01.952193Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:01.952468Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445575289391 }
2023-03-07T20:59:01.952494Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:01.952510Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445575334271 }
2023-03-07T20:59:01.952575Z INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 4, redelivered = false, exchange = amq.topic, routing_key = on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13
2023-03-07T20:59:01.952597Z INFO consumer: panic time!
thread 'tokio-runtime-worker' panicked at 'testing consumer handling of panics', src/bin/consumer.rs:229:17
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
// ^^ panicked
2023-03-07T20:59:20.136793Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:20.136853Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 5, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:20.136892Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760119248 }
2023-03-07T20:59:20.136937Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:20.137036Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760264706 }
2023-03-07T20:59:20.137071Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:20.137090Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760323153 }
2023-03-07T20:59:20.137159Z ERROR amqprs::api::channel::dispatcher: failed to dispatch message to consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:59:23.283800Z TRACE amqprs::net::split_connection: SENT on channel 0: HeartBeat(HeartBeat)
2023-03-07T20:59:23.283980Z DEBUG amqprs::net::writer_handler: sent heartbeat over connection 'AMQPRS000@localhost:13300/ [open]'
// ^^ sent 4 (failed to dispatch)
2023-03-07T20:59:29.469691Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:29.469741Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 6, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:29.469769Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093210483 }
2023-03-07T20:59:29.469800Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:29.469870Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093311935 }
2023-03-07T20:59:29.469888Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:29.469903Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093346081 }
2023-03-07T20:59:29.469953Z ERROR amqprs::api::channel::dispatcher: failed to dispatch message to consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
// ^^ sent 5 (failed to dispatch)
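The pattern in the log (deliveries handled, then a panic, then "failed to dispatch" for every later delivery) can be reproduced without amqprs. The sketch below is only an analogy: std threads and std::sync::mpsc channels stand in for the Tokio consumer task and the dispatcher's channel, and `run_panicking_consumer` is a hypothetical name. The panic unwinds only the consumer, its receiver is dropped, and from then on every send fails while the rest of the program keeps running. Tokio behaves similarly for spawned tasks: the panic is caught and reported through the task's JoinHandle rather than aborting the process.

```rust
use std::sync::mpsc;
use std::thread;

/// Feed messages 1..=total to a consumer thread that panics on `panic_at`.
/// Returns (handled, failed_to_dispatch, consumer_panicked).
pub fn run_panicking_consumer(total: u32, panic_at: u32) -> (u32, u32, bool) {
    let (tx, rx) = mpsc::channel::<u32>();
    let (ack_tx, ack_rx) = mpsc::channel::<u32>();

    let mut consumer = Some(thread::spawn(move || {
        for n in rx {
            if n == panic_at {
                panic!("testing consumer handling of panics");
            }
            ack_tx.send(n).unwrap();
        }
    }));

    let (mut handled, mut failed, mut panicked) = (0, 0, false);
    for n in 1..=total {
        // Sending fails once the consumer's receiver has been dropped.
        if tx.send(n).is_err() {
            failed += 1;
            continue;
        }
        match ack_rx.recv() {
            Ok(_) => handled += 1,
            Err(_) => {
                // The consumer went away without acking; join to see why.
                if let Some(handle) = consumer.take() {
                    panicked = handle.join().is_err();
                }
            }
        }
    }

    // Close the channel so a healthy consumer leaves its loop, then join it.
    drop(tx);
    if let Some(handle) = consumer.take() {
        panicked = handle.join().is_err();
    }
    (handled, failed, panicked)
}
```

With six messages and a panic on the fourth, three are handled, the fourth is lost in the panic, and the last two fail to dispatch. Joining the handle is what turns the silent failure into something the application can react to, for example by re-registering the consumer.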
Hello,
I'm trying to create a consumer that will be running until an error happens on the rabbitmq connection (so it should technically never stop).
I'm currently having an issue where the connection is dropped, and the way I found around that was to loop on the basic_get call. Looking at the code, this feels inefficient compared to basic_consume (and the AMQP reference also says that it's not efficient).
I've tried going about this issue with the following code:
loop {
    tokio::time::sleep(Duration::new(2, 0));
}
But it seems the connection object I have still gets dropped for some reason.
Happy to share more code and/or discuss it but I'd gladly take a working example if you have one ready !
I'm building a wrapper to your library, and recently came across a problem to prevent a channel connection from being dropped.
So, the question is, should each queue have their own channel or is it better to have one channel, and bind multiple queues to it?
Which one is considered bad practice?
For example:
Approach: one
struct MyConsumer {
    connection: Connection,
    channels: Vec<Channel>,
}
impl MyConsumer {
    fn create_channel(&mut self, queue: String) -> Channel {
        /* logic to create, and bind a channel goes here, pushes to vector */
    }
}
Approach: two
struct MyConsumer {
    connection: Connection,
    channel: Channel,
}
impl MyConsumer {
    fn create_channel(&mut self, queue: String) {
        /* creates queue and binds it to one channel */
    }
}
My problem with my current approach is that when binding a queue to a channel, I'm unable to prevent the channel from being dropped.
Any advice is much appreciated.
use clippy for msrv linting
A message can carry no payload, but it will always have delivery metadata (e.g. delivery tag) and its own metadata (basic properties). So it does not make sense to use Option<T> for them.
Hi,
I am trying to connect to CloudAMQP by using the TLS connection (amqps).
let mut args = OpenConnectionArguments::new(
    "host.cloudamqp.com",
    5671,
    "user",
    "super_secret",
);
args.virtual_host("/my_vh");
let connection = Connection::open(&args)
    .await?;
Given that the TLS example involves file certificates I was thinking that this is the way to do it merely as a client (as many other libraries do). Unfortunately I can't connect:
Error: AMQP network error: network io error: Connection reset by peer (os error 54)
I'll appreciate any help.
I am currently trying to implement a connection pool for amqprs using deadpool, which only provides a &mut Connection for the recycle method, so I would greatly appreciate it if close only required &self, or if there were a way to manually set is_open.
I've noticed that when cloning the original instance of a Connection, it can happen that the original instance that was used to clone (with master == true) goes out of scope, thus closing the underlying connection and making the clone unusable.
I don't yet have an idea of how to properly fix this behavior, given that there is no AsyncDrop.
I'm submitting this issue to get the conversation going, because this poses a problem in some code I use and the only workaround I have found at the moment is to wrap my connection in an Arc.
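The behavior and the Arc workaround can be modeled in a few lines. This is a toy model, not amqprs code: `Handle` is a hypothetical stand-in whose "socket" is a shared flag, and the only thing it borrows from the description above is that the master instance closes the connection when dropped, even if clones are still alive.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a connection handle.
pub struct Handle {
    open: Arc<AtomicBool>,
    master: bool,
}

impl Handle {
    pub fn open() -> Self {
        Self { open: Arc::new(AtomicBool::new(true)), master: true }
    }

    pub fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }
}

impl Clone for Handle {
    // Clones share the socket but are never the master.
    fn clone(&self) -> Self {
        Self { open: Arc::clone(&self.open), master: false }
    }
}

impl Drop for Handle {
    // Only the master closes the socket, regardless of live clones.
    fn drop(&mut self) {
        if self.master {
            self.open.store(false, Ordering::SeqCst);
        }
    }
}

/// The problem: dropping the original closes the clone's connection.
pub fn clone_outlives_master() -> bool {
    let original = Handle::open();
    let copy = original.clone();
    drop(original);
    copy.is_open()
}

/// The workaround: share one handle through Arc, so Drop runs only
/// when the last owner goes away.
pub fn arc_outlives_first_owner() -> bool {
    let first = Arc::new(Handle::open());
    let second = Arc::clone(&first);
    drop(first);
    second.is_open()
}
```

In this model `clone_outlives_master()` returns false and `arc_outlives_first_owner()` returns true. A library-side fix along the same lines would tie closing to the last live handle rather than to the master flag.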
In ChannelDispatcher, the ConsumerResource will be created for buffering incoming messages if no consumer is registered.
If a consumer is cancelled by the client or the server, then due to the asynchronous nature of the protocol, there can still be messages coming into the dispatcher for that consumer.
Start a timer when starting to buffer messages; if the timer expires, remove the ConsumerResource.
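One way to picture the proposed timer, as a sketch under stated assumptions: `PendingBuffers` is a hypothetical structure, not the dispatcher's real ConsumerResource, and instead of a background timer it records when buffering started and purges on demand. The caller passes in the current time, which keeps the logic easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Messages buffered for a consumer tag that has no registered consumer.
struct Pending {
    since: Instant,
    messages: Vec<Vec<u8>>,
}

pub struct PendingBuffers {
    ttl: Duration,
    by_tag: HashMap<String, Pending>,
}

impl PendingBuffers {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, by_tag: HashMap::new() }
    }

    /// Buffer a message. The timer starts with the first message for a tag.
    pub fn buffer(&mut self, tag: &str, body: Vec<u8>, now: Instant) {
        self.by_tag
            .entry(tag.to_string())
            .or_insert_with(|| Pending { since: now, messages: Vec::new() })
            .messages
            .push(body);
    }

    /// A consumer registered for `tag`: hand over whatever was buffered.
    pub fn take(&mut self, tag: &str) -> Vec<Vec<u8>> {
        self.by_tag
            .remove(tag)
            .map(|p| p.messages)
            .unwrap_or_default()
    }

    /// Remove buffers whose timer has expired; returns how many were dropped.
    pub fn purge_expired(&mut self, now: Instant) -> usize {
        let before = self.by_tag.len();
        let ttl = self.ttl;
        self.by_tag.retain(|_, p| now.duration_since(p.since) < ttl);
        before - self.by_tag.len()
    }
}
```

Calling `purge_expired` from the dispatcher loop bounds how long messages for a cancelled consumer can linger in memory.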
basic_consume and basic_consume_blocking to return the task JoinHandle to the user, so that the user can check for task failure
basic_consume_rx returns a ConsumerMessage type which has an unnecessary Option wrapper for its fields. We should define a new message type for users, and keep the internal message buffering type hidden
compliance check according to the AMQP spec.
The current design disables cloning of channels in order to protect users from interleaving AMQP channel messages when the same channel is used concurrently, but this decreases the flexibility of the library.
Instead, it would be better for users simply to be aware of this limitation of the AMQP protocol, while the library still allows channels to be cloned.
Connection already supports clone, and Channel can follow the same design.
Right now it is a u8 or something like that. Other clients have long moved to expose a boolean (or an enum).
I think it would be valuable to have some infrastructure in place to allow for performance testing. This would let us see where this library compares to other RabbitMQ clients and will help quantify performance increases (and potential decreases) version to version or while developing. I don't have any specific tools in mind and will begin researching if this idea is given the green light. Do you think this idea is worth adding? This idea was inspired by your change that reduces the number of syscalls (#33) and I am curious how much of a performance increase was achieved by the reduction of syscalls.
At present, a consumer runs in an async task by default. Since a consumer may be CPU bound, add an option to run it in a blocking context.
Hi, I wanted to present an issue and propose a potential solution that will make the dev experience better. Currently, the local secrets for the RabbitMQ configuration are tracked by git, yet they differ across machines. This makes every contributor's rabbitmq_conf directory different from @gftea's. Since the certs are generated by start_rabbitmq.sh, we can add the rabbitmq_conf directory to .gitignore and modify the start_rabbitmq.sh script to create the rabbitmq_conf directory if it doesn't exist (which will happen on a fresh clone). Without this, contributors' local repositories will never be up to date with main: see the snippet of my git status below. Additionally, I had to change the permissions of these files when I first cloned the repository in order to execute the scripts.
Changes not staged for commit:
(use "git add <file>..." to update what will be committed)
(use "git restore <file>..." to discard changes in working directory)
modified: rabbitmq_conf/client/ca_certificate.pem
modified: rabbitmq_conf/client/ca_key.pem
modified: rabbitmq_conf/client/client_AMQPRS_TEST_certificate.pem
modified: rabbitmq_conf/client/client_AMQPRS_TEST_key.p12
modified: rabbitmq_conf/client/client_AMQPRS_TEST_key.pem
modified: rabbitmq_conf/server/ca_certificate.pem
modified: rabbitmq_conf/server/ca_key.pem
modified: rabbitmq_conf/server/server_AMQPRS_TEST_certificate.pem
modified: rabbitmq_conf/server/server_AMQPRS_TEST_key.p12
modified: rabbitmq_conf/server/server_AMQPRS_TEST_key.pem
Untracked files:
(use "git add <file>..." to include in what will be committed)
rabbitmq_conf/client/client_AMQPRS_TEST.p12
rabbitmq_conf/server/server_AMQPRS_TEST.p12
no changes added to commit (use "git add" and/or "git commit -a")
Is there any downside? If not, I would be happy to begin working on this solution.
Hi, I was trying to create a FieldTable with an integer value, such as:
let mut ft = FieldTable::new();
ft.insert(
    "x-message-ttl".try_into().unwrap(),
    60000.try_into().unwrap(),
);
and the compiler tells me that "required for {integer} to implement TryInto<amqp_serde::types::FieldValue>".
I've also found that both FieldName and FieldValue are private, so I cannot create such a FieldValue from 60000.
Would you want me to create a PR to fix this small issue? I'm glad to help.
https://www.baeldung.com/java-rabbitmq-exchanges-queues-bindings
I could be wrong but I would've expected the ability to declare queue name instead of getting it passed back.
let queue_declare_args = amqprs::channel::QueueDeclareArguments::default();
let (queue_name, _, _) = channel.queue_declare(queue_declare_args).await?.unwrap();
following guidelines https://rust-lang.github.io/api-guidelines/documentation.html
Nice work.
As far as I can see this library doesn't implement the ratified AMQP version (1.0) but instead the legacy version (0.9.1) that RabbitMQ uses. I suggest renaming the library not to include a mention of AMQP to avoid confusion.
RabbitMQ is pretty much the only message broker that still uses the legacy version of the protocol which means it can pretty much be considered a proprietary protocol of RabbitMQ.
webpki_roots if no root CA is provided by the user
It seems quite common that users want to use the system default roots with username/password authentication.
We provide a default TlsAdaptor when the user uses an amqps URI; the TLS config will use the system default roots and username/password auth.
new dependency: https://crates.io/crates/rustls-native-certs
receive heartbeat from server
send heartbeat to server
Hi there, it's been quite fun to work with your library. Currently, I started wondering if it's possible to use prefetch for a queue that uses basic_consume.
I've tried something along these lines:
my_channel.basic_qos(
    BasicQosArguments::new(
        0, 0, false
    )
);
my_channel.channel
    .basic_consume(callback, args)
    .await
    .unwrap();
For this do I need to implement the Blocking consumer instead?
Kind regards,
Dave.
I'm having a hard time tracking down the exact issue for reproducibility, but it happens 100% of the time on my end:
Brandons-MacBook-Air:j2534-0500-poc brandonros 2023-02-04 18:49:02 $ cargo build && RUST_LOG=trace cargo run --bin diag_test
2023-02-04T23:49:07.742795Z INFO diag_test: calling...
2023-02-04T23:49:07.847053Z TRACE mio::poll: registering event source with poller: token=Token(2147483649), interests=READABLE
2023-02-04T23:49:07.848800Z TRACE mio::poll: registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
2023-02-04T23:49:07.856483Z TRACE amqprs::net::split_connection: 510 bytes read from network
2023-02-04T23:49:07.856902Z TRACE amqprs::net::split_connection: RECV on channel 0: Start(MethodHeader { class_id: 10, method_id: 10 }, Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortStr(12, "capabilities"): F(FieldTable({ShortStr(18, "connection.blocked"): t(true), ShortStr(18, "publisher_confirms"): t(true), ShortStr(26, "exchange_exchange_bindings"): t(true), ShortStr(22, "consumer_cancel_notify"): t(true), ShortStr(15, "direct_reply_to"): t(true), ShortStr(10, "basic.nack"): t(true), ShortStr(19, "consumer_priorities"): t(true), ShortStr(28, "authentication_failure_close"): t(true), ShortStr(16, "per_consumer_qos"): t(true)})), ShortStr(12, "cluster_name"): S(LongStr(15, "rabbit@rabbitmq")), ShortStr(9, "copyright"): S(LongStr(55, "Copyright (c) 2007-2023 VMware, Inc. or its affiliates.")), ShortStr(11, "information"): S(LongStr(57, "Licensed under the MPL 2.0. Website: https://rabbitmq.com")), ShortStr(7, "version"): S(LongStr(6, "3.11.8")), ShortStr(8, "platform"): S(LongStr(17, "Erlang/OTP 25.2.2")), ShortStr(7, "product"): S(LongStr(8, "RabbitMQ"))}), mechanisms: LongStr(14, "PLAIN AMQPLAIN"), locales: LongStr(5, "en_US") })
2023-02-04T23:49:07.857103Z TRACE amqprs::net::split_connection: SENT on channel 0: StartOk(MethodHeader { class_id: 10, method_id: 11 }, StartOk { client_properties: FieldTable({ShortStr(8, "platform"): S(LongStr(4, "Rust")), ShortStr(7, "product"): S(LongStr(6, "AMQPRS")), ShortStr(15, "connection_name"): S(LongStr(25, "AMQPRS000@localhost:5672/")), ShortStr(7, "version"): S(LongStr(3, "0.1"))}), machanisms: ShortStr(5, "PLAIN"), response: LongStr(12, "\0guest\0guest"), locale: ShortStr(5, "en_US") })
2023-02-04T23:49:07.858683Z TRACE amqprs::net::split_connection: 20 bytes read from network
2023-02-04T23:49:07.858707Z TRACE amqprs::net::split_connection: RECV on channel 0: Tune(MethodHeader { class_id: 10, method_id: 30 }, Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2023-02-04T23:49:07.858727Z TRACE amqprs::net::split_connection: SENT on channel 0: TuneOk(MethodHeader { class_id: 10, method_id: 31 }, TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2023-02-04T23:49:07.858769Z TRACE amqprs::net::split_connection: SENT on channel 0: Open(MethodHeader { class_id: 10, method_id: 40 }, Open { virtual_host: ShortStr(1, "/"), capabilities: ShortStr(0, ""), insist: 0 })
2023-02-04T23:49:07.860036Z TRACE amqprs::net::split_connection: 13 bytes read from network
2023-02-04T23:49:07.860061Z TRACE amqprs::net::split_connection: RECV on channel 0: OpenOk(MethodHeader { class_id: 10, method_id: 41 }, OpenOk { know_hosts: ShortStr(0, "") })
2023-02-04T23:49:07.860420Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860492Z INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2023-02-04T23:49:07.860530Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860554Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860640Z TRACE amqprs::net::split_connection: SENT on channel 1: OpenChannel(MethodHeader { class_id: 20, method_id: 10 }, OpenChannel { out_of_band: ShortStr(0, "") })
2023-02-04T23:49:07.860688Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 153721375 }
2023-02-04T23:49:07.863449Z TRACE amqprs::net::split_connection: 16 bytes read from network
2023-02-04T23:49:07.863492Z TRACE amqprs::net::split_connection: RECV on channel 1: OpenChannelOk(MethodHeader { class_id: 20, method_id: 11 }, OpenChannelOk { channel_id: LongStr(0, "") })
2023-02-04T23:49:07.863525Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 156553958 }
2023-02-04T23:49:07.863609Z INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.863659Z TRACE amqprs::api::channel::dispatcher: starts up dispatcher task of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.864928Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.865035Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(5, "q.req"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.865155Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 158190625 }
2023-02-04T23:49:07.866775Z TRACE amqprs::net::split_connection: 26 bytes read from network
2023-02-04T23:49:07.866797Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(5, "q.req"), message_count: 0, consumer_count: 1 })
2023-02-04T23:49:07.866809Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 159846833 }
2023-02-04T23:49:07.866881Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(7, "q.reply"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.866905Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 159942500 }
2023-02-04T23:49:07.867657Z TRACE amqprs::net::split_connection: 28 bytes read from network
2023-02-04T23:49:07.867680Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(7, "q.reply"), message_count: 0, consumer_count: 0 })
2023-02-04T23:49:07.867700Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 160737416 }
2023-02-04T23:49:07.867786Z TRACE amqprs::net::split_connection: SENT on channel 1: BindQueue(MethodHeader { class_id: 50, method_id: 20 }, BindQueue { ticket: 0, queue: ShortStr(5, "q.req"), exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(5, "q.req"), nowait: false, arguments: FieldTable({}) })
2023-02-04T23:49:07.867830Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 160866750 }
2023-02-04T23:49:07.869036Z TRACE amqprs::net::split_connection: 12 bytes read from network
2023-02-04T23:49:07.869052Z TRACE amqprs::net::split_connection: RECV on channel 1: BindQueueOk(MethodHeader { class_id: 50, method_id: 21 }, BindQueueOk)
2023-02-04T23:49:07.869067Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 162104000 }
2023-02-04T23:49:07.869137Z TRACE amqprs::net::split_connection: SENT on channel 1: BindQueue(MethodHeader { class_id: 50, method_id: 20 }, BindQueue { ticket: 0, queue: ShortStr(7, "q.reply"), exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(7, "q.reply"), nowait: false, arguments: FieldTable({}) })
2023-02-04T23:49:07.869176Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 162212958 }
2023-02-04T23:49:07.870072Z TRACE amqprs::net::split_connection: 12 bytes read from network
2023-02-04T23:49:07.870088Z TRACE amqprs::net::split_connection: RECV on channel 1: BindQueueOk(MethodHeader { class_id: 50, method_id: 21 }, BindQueueOk)
2023-02-04T23:49:07.870103Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 163140125 }
2023-02-04T23:49:07.870175Z TRACE amqprs::net::split_connection: SENT on channel 1: Consume(MethodHeader { class_id: 60, method_id: 20 }, Consume { ticket: 0, queue: ShortStr(7, "q.reply"), consumer_tag: ShortStr(14, "reply_consumer"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.870213Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 163249583 }
2023-02-04T23:49:07.870937Z TRACE amqprs::net::split_connection: 27 bytes read from network
2023-02-04T23:49:07.870959Z TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 }, ConsumeOk { consumer_tag: ShortStr(14, "reply_consumer") })
2023-02-04T23:49:07.870976Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 164013041 }
2023-02-04T23:49:07.871100Z TRACE amqprs::api::channel::basic: starts task for async consumer reply_consumer on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.871123Z INFO amqprs::api::channel::dispatcher: register consumer reply_consumer
2023-02-04T23:49:07.871150Z TRACE amqprs::net::split_connection: SENT on channel 1: PublishCombo(Publish { ticket: 0, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(5, "q.req"), bits: 0 }, ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 0 }, basic_properties: BasicProperties { property_flags: [6, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: Some(ShortStr(36, "f931eeb4-bcb1-4032-98c4-065bed630ef3")), reply_to: Some(ShortStr(7, "q.reply")), expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } }, ContentBody { inner: [] })
2023-02-04T23:49:07.871208Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 164244875 }
2023-02-04T23:49:07.873390Z TRACE amqprs::net::split_connection: 89 bytes read from network
2023-02-04T23:49:07.873402Z TRACE amqprs::net::split_connection: RECV on channel 0: Close(MethodHeader { class_id: 10, method_id: 50 }, Close { reply_code: 505, reply_text: ShortStr(70, "UNEXPECTED_FRAME - expected method frame, got non method frame instead"), class_id: 0, method_id: 0 })
2023-02-04T23:49:07.873413Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 166451291 }
2023-02-04T23:49:07.873425Z ERROR amqprs::api::callbacks: handle close request for connection 'AMQPRS000@localhost:5672/ [open]', cause: '505: UNEXPECTED_FRAME - expected method frame, got non method frame instead', (class_id = 0, method_id = 0)
2023-02-04T23:49:07.873436Z INFO amqprs::net::reader_handler: server requests to shutdown connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873449Z TRACE amqprs::net::split_connection: SENT on channel 0: CloseOk(MethodHeader { class_id: 10, method_id: 51 }, CloseOk)
2023-02-04T23:49:07.873466Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [closed]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 166503541 }
2023-02-04T23:49:07.873501Z INFO amqprs::net::reader_handler: connection 'AMQPRS000@localhost:5672/ [closed]' is closed, shutting down socket I/O handlers
2023-02-04T23:49:07.873523Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873532Z INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873552Z INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873565Z DEBUG amqprs::api::channel::basic: exit task of async consumer reply_consumer
2023-02-04T23:49:07.873607Z TRACE mio::poll: deregistering event source from poller
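One possible reading of this trace, offered as an assumption rather than a confirmed diagnosis: the publish carries body_size: 0 together with an empty ContentBody. In AMQP 0-9-1 a content header that announces zero bytes is not followed by any body frame, so an empty body frame would arrive where the broker expects the next method frame. The sketch below shows body framing that emits no frame at all for an empty body; `body_frames` is a hypothetical helper, not an amqprs function.

```rust
/// AMQP 0-9-1 frame type for content body frames.
const FRAME_BODY: u8 = 3;
/// Every frame ends with this octet.
const FRAME_END: u8 = 0xCE;
/// Type (1) + channel (2) + size (4) + frame-end (1).
const FRAME_OVERHEAD: usize = 8;

/// Split a message body into body frames that respect `frame_max`.
/// An empty body yields no frames, matching a header with body_size 0.
fn body_frames(channel: u16, body: &[u8], frame_max: usize) -> Vec<Vec<u8>> {
    assert!(frame_max > FRAME_OVERHEAD, "frame_max too small for any payload");
    let max_payload = frame_max - FRAME_OVERHEAD;
    body.chunks(max_payload)
        .map(|chunk| {
            let mut frame = Vec::with_capacity(chunk.len() + FRAME_OVERHEAD);
            frame.push(FRAME_BODY);
            frame.extend_from_slice(&channel.to_be_bytes());
            frame.extend_from_slice(&(chunk.len() as u32).to_be_bytes());
            frame.extend_from_slice(chunk);
            frame.push(FRAME_END);
            frame
        })
        .collect()
}
```

With the frame_max of 131072 negotiated in the trace, each body frame in this sketch carries at most 131064 payload bytes.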
While no_ack is indeed a protocol-level field name, it is confusing. For basic.consume in the API, other clients have generally adopted "auto ack" for the name (or "manual ack", which I like even more).
So I suggest that the field is renamed to auto_ack and a convenience method (default) is introduced for both modes.
Happy to work on a PR if it would be accepted.
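A rough sketch of what such an API could look like, to make the proposal concrete. Everything here is hypothetical (`ConsumeArgs`, `with_manual_ack`, `with_auto_ack`, `wire_no_ack`); it is not the existing amqprs argument type, only an illustration of renaming the field while keeping the protocol bit intact.

```rust
/// Hypothetical consume arguments using the friendlier field name.
#[derive(Debug, Clone, PartialEq)]
struct ConsumeArgs {
    queue: String,
    consumer_tag: String,
    /// True when the broker should consider messages acknowledged on delivery.
    auto_ack: bool,
}

impl ConsumeArgs {
    /// Consumer must acknowledge each delivery explicitly.
    fn with_manual_ack(queue: &str, consumer_tag: &str) -> Self {
        Self {
            queue: queue.to_owned(),
            consumer_tag: consumer_tag.to_owned(),
            auto_ack: false,
        }
    }

    /// Broker treats deliveries as acknowledged as soon as they are sent.
    fn with_auto_ack(queue: &str, consumer_tag: &str) -> Self {
        Self {
            auto_ack: true,
            ..Self::with_manual_ack(queue, consumer_tag)
        }
    }

    /// Value of the protocol-level `no-ack` bit in basic.consume.
    fn wire_no_ack(&self) -> bool {
        self.auto_ack
    }
}
```

The protocol-level name stays confined to the method that produces the wire value, so callers only ever see "auto ack" or "manual ack".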