
amqprs's People

Contributors

alllel, andrieshiemstra, dantsz, dependabot[bot], gftea, jacobmiller22, jdelgadoalfonso, julianbraha, lmorais-dev, mcpatate, michaelklishin, overworkedcriminal, quantum-booty, robfalken, stephenmal, tetofonta, tyilo


amqprs's Issues

BasicPublishArguments: deprecate the immediate field

Support for the immediate flag was removed in RabbitMQ 3.0 because it is impossible to implement in ways that do not ruin throughput of multi-node clusters (the node handling basic.publish had to communicate with peers for every message published).

I suggest that we make the field private and thus remove it from the API.

Contributing guidelines

Hi @gftea, I'm fairly new to Rust (a developer moving from Python to Rust), and I was checking some projects related to RabbitMQ and Rust. I have to say that your project is the best one I've found so far (really well documented!).

So, are there any contributing guidelines that one must follow when contributing?

Support handling connection loss and reconnecting

I am trying to figure out how to detect a connection failure and gracefully handle it. Is there a callback method that amqprs provides for this case?

If the server is unreachable when the application starts, it's clear what to do, because we get error results from the Connection::open() function.

However, if we connect to the server and at some point the connection fails, either because the server was rebooted, or our network link had a temporary interruption, it's not apparent how to detect this with amqprs.

Robust connection reconnecting demo

I made a test program with a basic application framework. See https://github.com/cdbennett/amqprs-reconnect-example

The core part that sets up the RabbitMQ connection is this:

/// RabbitMQ client task. Returns an error result if the connection is lost.
async fn rabbit_connection_process(cfg: Arc<Config>) -> anyhow::Result<()> {
    debug!("starting RabbitMQ task");

    let connection = Connection::open(
        &OpenConnectionArguments::new(&cfg.host, cfg.port, &cfg.username, &cfg.password)
            .virtual_host(&cfg.virtual_host),
    )
    .await
    .with_context(|| {
        format!(
            "can't connect to RabbitMQ server at {}:{}",
            cfg.host, cfg.port
        )
    })?;

    // Add simple connection callback, it just logs diagnostics.
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .context("registering connection callback failed")?;

    let channel = connection
        .open_channel(None)
        .await
        .context("opening channel failed")?;
    channel
        .register_callback(DefaultChannelCallback)
        .await
        .context("registering channel callback failed")?;

    // Declare our receive queue.
    let (queue_name, _, _) = channel
        .queue_declare(QueueDeclareArguments::default().exclusive(true).finish())
        .await
        .context("failed to declare queue")?
        .expect("when no_wait is false (default) then we should have a value");
    debug!("declared queue '{queue_name}'");

    let exchange_name = "amq.topic";
    debug!("binding exchange {exchange_name} -> queue {queue_name}");
    channel
        .queue_bind(QueueBindArguments::new(&queue_name, exchange_name, ""))
        .await
        .context("queue binding failed")?;

    let consume_args = BasicConsumeArguments::new(&queue_name, "amqprs_reconnect");
    let consumer = MyConsumer::new(consume_args.no_ack);
    let consumer_tag = channel
        .basic_consume(consumer, consume_args)
        .await
        .context("failed basic_consume")?;
    trace!("consumer tag: {consumer_tag}");

    // Wait forever ... TODO: somehow we would like to have a signal
    // to terminate this task if the RabbitMQ connection is lost.
    loop {
        sleep(Duration::from_secs(60)).await;
    }
}

The output when the RabbitMQ server connection is lost looks like this:

2023-02-14T00:33:31.528550Z TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 }, ConsumeOk { consumer_tag: ShortStr(16, "amqprs_reconnect") })
2023-02-14T00:33:31.528580Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 251836804403847 }
2023-02-14T00:33:31.528787Z TRACE amqprs_reconnect: consumer tag: amqprs_reconnect
2023-02-14T00:33:31.528854Z TRACE amqprs::api::channel::basic: starts task for async consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:31.528883Z  INFO amqprs::api::channel::dispatcher: register consumer amqprs_reconnect

[ server connection lost here ]


2023-02-14T00:33:51.566232Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 251856842625757 }
2023-02-14T00:33:51.566281Z ERROR amqprs::net::reader_handler: socket will be closed due to failure of reading frame, cause: peer shutdown
2023-02-14T00:33:51.566376Z  INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566410Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566424Z  INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [open] of connection 'AMQPRS000@localhost:7771den [open]'
2023-02-14T00:33:51.566472Z DEBUG amqprs::api::channel::basic: exit task of async consumer amqprs_reconnect
2023-02-14T00:33:51.566610Z TRACE mio::poll: deregistering event source from poller    

In this case, you can see there is some amqprs library code that is run when the connection is lost, but I don't see any of the callbacks having their methods called. We should see something in the DefaultConnectionCallback or DefaultChannelCallback, I would hope.

Is there some other callback we can use to detect connection loss?
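
For illustration only, here is a minimal sketch of the supervision side of this problem: once a task like rabbit_connection_process can return an error on connection loss, something has to restart it with a delay. Everything here is hypothetical and uses only the standard library: `supervise`, `backoff_delay`, the 100 ms base delay and the 30 s cap are assumptions, not amqprs API, and the `session` closure stands in for the real connection task.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (0-based): `base` doubled per
/// attempt and capped at `max`. Hypothetical helper, not part of amqprs.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt.min(16));
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Runs `session` until it returns Ok, restarting it after each error.
/// Gives up and returns the last error once `max_attempts` sessions failed.
/// `wait` receives the delay to apply (e.g. a sleep); it is injected so the
/// sketch stays testable. Returns the number of restarts that were needed.
pub fn supervise<E>(
    mut session: impl FnMut() -> Result<(), E>,
    max_attempts: u32,
    mut wait: impl FnMut(Duration),
) -> Result<u32, E> {
    let mut attempt = 0;
    loop {
        match session() {
            Ok(()) => return Ok(attempt),
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                // Assumed policy: 100 ms base delay, 30 s cap.
                wait(backoff_delay(
                    attempt,
                    Duration::from_millis(100),
                    Duration::from_secs(30),
                ));
                attempt += 1;
            }
        }
    }
}
```

In an async application the same loop shape would apply, with the injected `wait` replaced by an awaited sleep. This does not answer how to detect the loss inside amqprs; it only shows what the caller could do once the task reports it.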

Consumer immediately gets closed

Hello all, I am trying to create a basic consumer in my application. I placed two breakpoints.

The first pause is in the None match arm. I cannot figure out why consumer_tx is getting dropped. Maybe there is something I am doing wrong? Have any of you seen this before? Another interesting thing I see is that when I restart the container everything seems to be fine. The consumer_tx is not dropped and my consumer impl gets messages as I would expect.

Test combination of features in regression_test.sh

Should we be testing all possible combinations of feature flags in the regression_test.sh script (or at least somewhere)? I don't think at the moment there are any feature attributes that depend on more than one feature, but it seems as if it should be tested before the release at the very least. The downside of testing all possible combinations is that the tests will take considerably longer to run. At the very least the urispec feature needs to be added to regression_test.sh. I will make a PR for whichever route is preferred.

Address (most) clippy warnings

A lot of cargo clippy warnings are unused variables and trivial things like that. Should I submit a PR that squashes as many of them as possible within a certain time box (say, a couple of hours)?

Feature request: Support for external authentication mechanism

I'm trying to implement mutual TLS authentication, but there's no way to set up the client to use EXTERNAL as the auth mechanism, as defined in the rabbitmq_auth_mechanism_ssl documentation.

Did I miss something in the documentation, or is this feature missing even though TlsAdaptor::with_client_auth exists?

improvement: amqp_serde design and APIs

Tracking remaining work of #64

  • for MapSerializer, add test for !self.is_len_known #71

  • for the types.rs, add tests for len of FieldValue variants A F x - #76

  • improvements: - #77 merged into branch release_v2

    • Create a new PR to:
      1. Remove insert/remove functions.
      2. Make impl TryFrom.
      3. Remove length attribute from ByteArray and FieldArray, FieldTable and impl len function instead.
      4. Modify/Add tests.

Not able to run tests on Mac M1

Hi,

I've been willing to take on #62 and before I want to be able to run the integration tests.

However I'm facing two main issues:

  1. In start_rabbitmq.sh the permissions do not apply on my macOS installation: UID=1001 is not valid, which makes the process throw errors, although it does not stop.
  2. The bitnami rabbitmq image does not boot. I just get an ERROR ==> Couldn't start RabbitMQ in background. I've tried running it in isolation (a simple docker compose) with the same result. The official rabbitmq image works though.

Any recommendations?

Alternative default: queue declaration arguments

QueueDeclareArguments uses the builder pattern and that's nice but it would be even better
to also provide a few constructor/factory methods for common cases:

  • Durable (and thus non-exclusive, non-autodelete) queue
  • Transient exclusive queue
  • Transient auto-delete queue

Clients such as Bunny provide such helpers.

I'd be happy to submit a PR if it'd be accepted.
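
To make the proposal concrete, here is a rough sketch of what such factory methods could look like. `QueueOptions` and its constructor names are hypothetical stand-ins written for this example; they are not the actual QueueDeclareArguments type or any existing amqprs API.

```rust
/// Hypothetical stand-in for a queue declaration argument struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueOptions {
    pub name: String,
    pub durable: bool,
    pub exclusive: bool,
    pub auto_delete: bool,
}

impl QueueOptions {
    /// Durable queue: survives broker restarts, hence neither exclusive
    /// nor auto-delete.
    pub fn durable(name: &str) -> Self {
        QueueOptions { name: name.to_string(), durable: true, exclusive: false, auto_delete: false }
    }

    /// Transient queue that only the declaring connection may use.
    pub fn transient_exclusive(name: &str) -> Self {
        QueueOptions { name: name.to_string(), durable: false, exclusive: true, auto_delete: false }
    }

    /// Transient queue that is removed once its last consumer is gone.
    pub fn transient_autodelete(name: &str) -> Self {
        QueueOptions { name: name.to_string(), durable: false, exclusive: false, auto_delete: true }
    }
}
```

The existing builder methods could still be chained after any of these constructors, so the helpers would only pick the starting point.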

ChannelCallback::cancel() not called when server deletes queue

Hi,

I'm trying to detect unexpected consumer cancellations.
Based on https://www.rabbitmq.com/consumer-cancel.html
ChannelCallback::cancel() seems to be the best for this, but I can't make it work.

I tried doing something like this:

  1. open connection
  2. register connection callback
  3. open channel
  4. register channel callback
  5. declare queue
  6. channel.basic_consume(...)
  7. remove queue using rabbitmqctl

But ChannelCallback::cancel() was not called.

Did I miss some configuration?

Probable cause

I've read here https://www.rabbitmq.com/connections.html#capabilities
that this extension (consumer_cancel_notify) needs to be present in client capabilities, otherwise RabbitMQ does not send any notification.

I modified Connection::open() to check if it would help, and it did.
amqprs missing client capabilities

versions:
Rust: 1.73
amqprs: 1.5.0

Error running the publisher subscriber example.

Hi, I am building an adapter to your library, but for that I need to fully understand your project. So, I was running your examples, but encountered a problem with the publisher subscriber example.

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: NetworkError("network io error: Connection reset by peer (os error 54)")'

I am running Rust 1.68 on macOS. Also, I am using the following command to start my local RabbitMQ server: zsh rabbitmq-server

Do I need to specify something else when starting the RabbitMQ server instance?

Thank you in advance, and have a nice day.

Support `OpenConnectionArguments::from_url`?

Currently, OpenConnectionArguments needs to specify user, password, host, port separately.

I think it would be good to create such arguments from url, so applications can use less config items.

let arguments = OpenConnectionArguments::from_url("amqp://user:password@localhost:port/virtual_host");
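
As a rough illustration of what a from_url constructor would have to extract, here is a standalone sketch using only the standard library. `AmqpUri` and `parse_amqp_uri` are hypothetical names, not amqprs API. The sketch assumes the conventional default ports (5672 for amqp, 5671 for amqps), assumes guest/guest and the "/" virtual host when those parts are absent, and deliberately skips percent-decoding, query parameters and IPv6 literals.

```rust
/// Parts of an AMQP URI (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub struct AmqpUri {
    pub tls: bool,
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub virtual_host: String,
}

/// Parses `amqp://user:pass@host:port/vhost` or the `amqps://` variant.
/// Returns None for an unknown scheme, an empty host or a bad port.
pub fn parse_amqp_uri(uri: &str) -> Option<AmqpUri> {
    let (tls, rest) = if let Some(r) = uri.strip_prefix("amqps://") {
        (true, r)
    } else if let Some(r) = uri.strip_prefix("amqp://") {
        (false, r)
    } else {
        return None;
    };

    // Everything after the first '/' is the virtual host (taken as-is).
    let (authority, vhost) = rest.split_once('/').unwrap_or((rest, "/"));

    // Optional "user:password@" prefix; assumed defaults otherwise.
    let (userinfo, hostport) = match authority.rsplit_once('@') {
        Some((u, h)) => (Some(u), h),
        None => (None, authority),
    };
    let (username, password) = match userinfo {
        Some(u) => u.split_once(':').unwrap_or((u, "")),
        None => ("guest", "guest"),
    };

    // Optional ":port" suffix; the default depends on the scheme.
    let default_port: u16 = if tls { 5671 } else { 5672 };
    let (host, port) = match hostport.rsplit_once(':') {
        Some((h, p)) => (h, p.parse::<u16>().ok()?),
        None => (hostport, default_port),
    };
    if host.is_empty() {
        return None;
    }

    Some(AmqpUri {
        tls,
        host: host.to_string(),
        port,
        username: username.to_string(),
        password: password.to_string(),
        virtual_host: vhost.to_string(),
    })
}
```

With something like this in place, an application would need a single config item (the URI) instead of separate host, port, user and password entries.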

Typo in Compliance Assert feature

The compliance_assert feature is listed as compliance_assert in amqprs/Cargo.toml, but the code toggles it with #[cfg(feature = "compilance_assert")]. Note the transposition error: compliance vs compilance. Is this intentional? Also, the module is named compilance_assert.rs. This may be correct, but it seems like this mismatch would mean the feature flag never enables the code?

Consumers crash if they panic

If a consumer panics, the library keeps trying to deliver future messages to it, but the consumer seems to have crashed or hung up, and it never receives the messages.

Expected panic handling?

What is the expected behavior if a consumer panics? Should the consumer be removed entirely? Or should it continue to function normally?

It seems that panics are being caught somehow, since the app does not abort and exit. Maybe the Tokio runtime is catching it? The result is that we are somewhere in between aborting the app, and catching the panic, a place where the app will continue to run but the consumer just stops receiving any messages.

I have experienced web server frameworks like Axum and Actix Web, which catch and recover from panics in user handlers gracefully, so should we aim for similar behavior in amqprs?
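
To show what "catch and recover" could mean here, below is a small standard-library sketch. `deliver_isolated` and `Delivery` are hypothetical names invented for this example; this is not how amqprs dispatches messages. It relies on std::panic::catch_unwind, which only works when panics unwind (the default) rather than abort. For comparison, Tokio catches a panic inside a spawned task and reports it through the task's JoinHandle, which would explain why the process keeps running.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Outcome of handing one message to a user-supplied handler.
#[derive(Debug, PartialEq, Eq)]
pub enum Delivery {
    Handled,
    HandlerPanicked,
}

/// Calls `handler` with `body` and converts a panic into a value, so the
/// caller can decide what to do next (log it, nack the message, drop the
/// consumer, ...) instead of silently losing the consumer.
pub fn deliver_isolated<F: FnMut(&[u8])>(handler: &mut F, body: &[u8]) -> Delivery {
    // AssertUnwindSafe: we accept that the handler's own state may be
    // inconsistent after a panic; deciding whether to reuse it is a policy
    // question this sketch leaves open.
    match panic::catch_unwind(AssertUnwindSafe(|| handler(body))) {
        Ok(()) => Delivery::Handled,
        Err(_) => Delivery::HandlerPanicked,
    }
}
```

Whether the library should keep delivering to a handler that has panicked, or unregister it and surface the failure, is exactly the design question raised above; the sketch only shows that the panic can be observed at the dispatch point.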

Example log

In this test, we have a consumer app that subscribes to messages, and intentionally panics when the 4th message is received.

$ RUST_LOG=TRACE cargo run --bin consumer
...
// started

2023-03-07T20:58:34.592109Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:58:34.592169Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 1, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:58:34.592198Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214405864 }
2023-03-07T20:58:34.592237Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:58:34.592442Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214648260 }
2023-03-07T20:58:34.592466Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:58:34.592482Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851418214691050 }
2023-03-07T20:58:34.592561Z  INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 1, redelivered = false, exchange = amq.topic, routing_key =  on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13
2023-03-07T20:58:34.592579Z  INFO consumer: panic countdown: 3
2023-03-07T20:58:34.592587Z  INFO consumer: ack to delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 1, redelivered = false, exchange = amq.topic, routing_key =  on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:58:34.592626Z TRACE amqprs::net::split_connection: SENT on channel 1: Ack(MethodHeader { class_id: 60, method_id: 80 }, Ack { delivery_tag: 1, mutiple: false })
2023-03-07T20:58:34.592675Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:13300/ [open]' heartbeat deadline is updated to Instant { t: 851388214882059 }

// ^^ sent 1 (ok)

...

// ^^ sent 2 (ok)

2023-03-07T20:58:48.353648Z TRACE amqprs::net::split_connection: 8 bytes read from network
2023-03-07T20:58:48.353712Z TRACE amqprs::net::split_connection: RECV on channel 0: HeartBeat(HeartBeat)
2023-03-07T20:58:48.353746Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851431976260929 }
2023-03-07T20:58:48.353777Z DEBUG amqprs::net::reader_handler: received heartbeat on connection 'AMQPRS000@localhost:13300/ [open]'


2023-03-07T20:58:53.282089Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:58:53.282138Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 3, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:58:53.282166Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904794243 }
2023-03-07T20:58:53.282197Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:58:53.282273Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904900711 }
2023-03-07T20:58:53.282294Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:58:53.282309Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851436904938418 }
2023-03-07T20:58:53.282373Z  INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 3, redelivered = false, exchange = amq.topic, routing_key =  on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13
2023-03-07T20:58:53.282392Z  INFO consumer: panic countdown: 1
2023-03-07T20:58:53.282400Z  INFO consumer: ack to delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 3, redelivered = false, exchange = amq.topic, routing_key =  on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:58:53.282436Z TRACE amqprs::net::split_connection: SENT on channel 1: Ack(MethodHeader { class_id: 60, method_id: 80 }, Ack { delivery_tag: 3, mutiple: false })
2023-03-07T20:58:53.282479Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:13300/ [open]' heartbeat deadline is updated to Instant { t: 851406905106624 }

// ^^ sent 3 (ok)
// Next received message will trigger our intentional panic for testing:

2023-03-07T20:59:01.952081Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:01.952133Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 4, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:01.952162Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445574985087 }
2023-03-07T20:59:01.952193Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:01.952468Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445575289391 }
2023-03-07T20:59:01.952494Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:01.952510Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851445575334271 }
2023-03-07T20:59:01.952575Z  INFO consumer: consume delivery Deliver: consumer_tag = amqprs_reconnect, delivery_tag = 4, redelivered = false, exchange = amq.topic, routing_key =  on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]', content size: 13

2023-03-07T20:59:01.952597Z  INFO consumer: panic time!
thread 'tokio-runtime-worker' panicked at 'testing consumer handling of panics', src/bin/consumer.rs:229:17
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

// ^^ panicked


2023-03-07T20:59:20.136793Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:20.136853Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 5, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:20.136892Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760119248 }
2023-03-07T20:59:20.136937Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:20.137036Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760264706 }
2023-03-07T20:59:20.137071Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:20.137090Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851463760323153 }
2023-03-07T20:59:20.137159Z ERROR amqprs::api::channel::dispatcher: failed to dispatch message to consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'
2023-03-07T20:59:23.283800Z TRACE amqprs::net::split_connection: SENT on channel 0: HeartBeat(HeartBeat)
2023-03-07T20:59:23.283980Z DEBUG amqprs::net::writer_handler: sent heartbeat over connection 'AMQPRS000@localhost:13300/ [open]'

// ^^ sent 4 (failed to dispatch)

2023-03-07T20:59:29.469691Z TRACE amqprs::net::split_connection: 92 bytes read from network
2023-03-07T20:59:29.469741Z TRACE amqprs::net::split_connection: RECV on channel 1: Deliver(MethodHeader { class_id: 60, method_id: 60 }, Deliver { consumer_tag: ShortStr(16, "amqprs_reconnect"), delivery_tag: 6, redelivered: false, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(0, "") })
2023-03-07T20:59:29.469769Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093210483 }
2023-03-07T20:59:29.469800Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentHeader(ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 13 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } })
2023-03-07T20:59:29.469870Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093311935 }
2023-03-07T20:59:29.469888Z TRACE amqprs::net::split_connection: RECV on channel 1: ContentBody(ContentBody { inner: [72, 101, 108, 108, 111, 44, 32, 87, 111, 114, 108, 100, 33] })
2023-03-07T20:59:29.469903Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { t: 851473093346081 }
2023-03-07T20:59:29.469953Z ERROR amqprs::api::channel::dispatcher: failed to dispatch message to consumer amqprs_reconnect on channel 1 [open] of connection 'AMQPRS000@localhost:13300/ [open]'


// ^^ sent 5 (failed to dispatch)

feature request: long-lived Consumer example

Hello 👋🏻

I'm trying to create a consumer that will be running until an error happens on the rabbitmq connection (so it should technically never stop).

I'm currently having an issue where the connection is dropped and the way I found around that was to loop on the basic_get call. Looking at the code this feels inefficient compared to basic_consume (and also it does say in the amqp reference that it's not efficient).

I've tried going about this issue with the following code:

loop {
    // Note: `sleep` returns a future; it does nothing unless awaited.
    tokio::time::sleep(Duration::new(2, 0)).await;
}

But it seems the connection object I have still gets dropped for some reason.

Happy to share more code and/or discuss it but I'd gladly take a working example if you have one ready !

Best practices for opening channel, and prevent it from being dropped.

I'm building a wrapper to your library, and recently came across a problem to prevent a channel connection from being dropped.
So, the question is, should each queue have their own channel or is it better to have one channel, and bind multiple queues to it?

Which one is considered bad practice?

For example:

Approach: one

struct MyConsumer {
    connection: Connection,
    channels: Vec<Channel>,
}

impl MyConsumer {
    fn create_channel(&mut self, queue: String) -> Channel {
        /* logic to create, and bind a channel goes here, pushes to vector */
        todo!()
    }
}

Approach: two

struct MyConsumer {
    connection: Connection,
    channel: Channel,
}

impl MyConsumer {
    fn create_channel(&mut self, queue: String) {
        /* creates queue and binds it to one channel */
        todo!()
    }
}

My problem with my current approach is that when binding a queue to a channel, I'm unable to prevent the channel from being dropped.

Any advice is much appreciated.

TLS connection error: connection reset by peer

Hi,

I am trying to connect to CloudAMQP by using the TLS connection (amqps).

let mut args = OpenConnectionArguments::new(
    "host.cloudamqp.com",
    5671,
    "user",
    "super_secret",
);
args.virtual_host("/my_vh");

let connection = Connection::open(&args).await?;

Given that the TLS example involves file certificates I was thinking that this is the way to do it merely as a client (as many other libraries do). Unfortunately I can't connect:

Error: AMQP network error: network io error: Connection reset by peer (os error 54)

I'll appreciate any help.

Only require &self for Connection::close

I am currently trying to implement a connection pool for amqprs using deadpool, which only provides a &mut Connection for the recycle method, so I would greatly appreciate if close only requires &self, or provide a way to manually set is_open.

`impl Drop` behavior for `Channel` & `Connection`

๐Ÿ‘‹๐Ÿป

I've noticed that when cloning the original instance of a Connection, it can happen that the original instance that was used to clone (with master == true) goes out of scope. Thus closing the underlying connection and making the clone unusable.

I don't yet have an idea on how to properly fix this behavior given that there is no AsyncDrop.

I'm submitting this issue to get the conversation going because this poses a problem in some code I use and the only workaround I have found at the moment is to wrap my connection in an Arc.
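
One possible direction, sketched here with only the standard library, is to move the close-on-drop logic into a shared inner value so that it runs when the last clone goes away rather than when the original does. `Handle` and `Inner` are hypothetical types for this example, not the amqprs Connection, and the synchronous flag stands in for whatever real (async) close would be needed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Shared state; its Drop runs exactly once, when the last Handle is gone.
struct Inner {
    closed: Arc<AtomicBool>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        // Stand-in for closing the underlying connection.
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Cloneable handle; every clone is equivalent, there is no "master".
#[derive(Clone)]
pub struct Handle {
    inner: Arc<Inner>,
}

impl Handle {
    /// `closed` is an externally observable flag, used here for testing.
    pub fn open(closed: Arc<AtomicBool>) -> Self {
        Handle { inner: Arc::new(Inner { closed }) }
    }

    /// Number of live handles sharing this connection.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}
```

This is essentially the Arc workaround mentioned above, moved inside the type. It does not solve the lack of AsyncDrop: a real implementation would still need a way to perform the async close from a synchronous drop.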

after cancelling a consumer, if messages are still buffered, clean them up after a timer

problem description

In ChannelDispatcher, the ConsumerResource will be created for buffering incoming messages if no consumer is registered.
If a consumer is cancelled by the client or the server, due to the asynchronous nature of the protocol, there can still be messages coming into the dispatcher for that consumer.

proposed solution

Start a timer when starting to buffer messages; if the timer expires, remove the ConsumerResource.
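
A minimal sketch of the proposed expiry logic, using only the standard library. `BufferedConsumers` is a hypothetical structure made up for this example and does not mirror the real ChannelDispatcher or ConsumerResource; the current time is passed in explicitly so the behaviour is easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Buffers messages per consumer tag and remembers when buffering started.
pub struct BufferedConsumers {
    ttl: Duration,
    buffers: HashMap<String, (Instant, Vec<Vec<u8>>)>,
}

impl BufferedConsumers {
    pub fn new(ttl: Duration) -> Self {
        BufferedConsumers { ttl, buffers: HashMap::new() }
    }

    /// Buffers `msg` for `tag`; the timer starts with the first message.
    pub fn buffer(&mut self, tag: &str, msg: Vec<u8>, now: Instant) {
        self.buffers
            .entry(tag.to_string())
            .or_insert_with(|| (now, Vec::new()))
            .1
            .push(msg);
    }

    /// Removes every buffer whose timer has expired; returns how many.
    pub fn purge_expired(&mut self, now: Instant) -> usize {
        let before = self.buffers.len();
        let ttl = self.ttl;
        self.buffers
            .retain(|_, (started, _)| now.duration_since(*started) < ttl);
        before - self.buffers.len()
    }

    pub fn contains(&self, tag: &str) -> bool {
        self.buffers.contains_key(tag)
    }
}
```

In the dispatcher, purge_expired could be driven by a periodic tick; registering a consumer for a tag would take its buffer out before the timer fires.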

feat: consumer APIs improvements

API breaking changes

  1. basic_consume and basic_consume_blocking should return the task JoinHandle to the user, so that the user can check for task failure
  2. currently basic_consume_rx returns the ConsumerMessage type, which has an unnecessary Option wrapper for its fields. We should define a new message type for users, and keep the internal message buffering type hidden

make channel cloneable like connection

The current design disables cloning of channels in order to protect them from interleaving AMQP channel messages when the same channel is used concurrently, but it decreases the flexibility of the library.
Instead, it would be better for users to simply be aware of this limitation of the AMQP protocol, while the library still allows channels to be cloned.

Connection already supports clone, and channel can follow the same design.

Performance test tools in repo.

I think it would be valuable to have some infrastructure in place to allow for performance testing. This would let us see where this library compares to other RabbitMQ clients and will help quantify performance increases (and potential decreases) version to version or while developing. I don't have any specific tools in mind and will begin researching if this idea is given the green light. Do you think this idea is worth adding? This idea was inspired by your change that reduces the number of syscalls (#33) and I am curious how much of a performance increase was achieved by the reduction of syscalls.

support blocking consumer

At present, the consumer runs in an async task by default. A consumer may be CPU-bound, so add an option to run it in a blocking context.

Ignore Local RabbitMQ Secrets

Hi, I wanted to present an issue and propose a potential solution that will make the dev experience better. Currently, the local secrets for the RabbitMQ configuration are being tracked by git, and they differ across machines. This makes every contributor's rabbitmq_conf directory different from @gftea's. Since the certs get generated by start_rabbitmq.sh, we can add the rabbitmq_conf directory to .gitignore and modify the start_rabbitmq.sh script to create the rabbitmq_conf directory in case it doesn't exist (which will happen on a fresh clone). Without this, contributors' local repositories will never be up to date with main: see the snippet of my git status below. Additionally, I had to change the permissions of these files when I first cloned the repository in order to execute the scripts.

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   rabbitmq_conf/client/ca_certificate.pem
        modified:   rabbitmq_conf/client/ca_key.pem
        modified:   rabbitmq_conf/client/client_AMQPRS_TEST_certificate.pem
        modified:   rabbitmq_conf/client/client_AMQPRS_TEST_key.p12
        modified:   rabbitmq_conf/client/client_AMQPRS_TEST_key.pem
        modified:   rabbitmq_conf/server/ca_certificate.pem
        modified:   rabbitmq_conf/server/ca_key.pem
        modified:   rabbitmq_conf/server/server_AMQPRS_TEST_certificate.pem
        modified:   rabbitmq_conf/server/server_AMQPRS_TEST_key.p12
        modified:   rabbitmq_conf/server/server_AMQPRS_TEST_key.pem

Untracked files:
  (use "git add <file>..." to include in what will be committed)
        rabbitmq_conf/client/client_AMQPRS_TEST.p12
        rabbitmq_conf/server/server_AMQPRS_TEST.p12

no changes added to commit (use "git add" and/or "git commit -a")

Is there any downside? If not, I would be happy to begin working on this solution.

FieldTable does not accept integer type

Hi, I was trying to create a FieldTable with an integer type, such as:

  let mut ft = FieldTable::new();
  ft.insert(
      "x-message-ttl".try_into().unwrap(),
      60000.try_into().unwrap(),
  );

and the compiler tells me that "required for {integer} to implement TryInto<amqp_serde::types::FieldValue>".
I've also found that both FieldName and FieldValue are private, so I cannot create such a FieldValue from 60000.
Would you like me to create a PR to fix this small issue? I'm glad to help.
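
For illustration, a minimal sketch of the kind of conversion that would make the snippet above compile. The `FieldValue` and `FieldTable` types below are hypothetical stand-ins written for this example, not the real `amqp_serde` definitions, and the variant names are assumptions. The point is only that a `From<i32>` impl also provides `TryInto` through the standard library's blanket impls.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a field value; not the amqp_serde type.
#[derive(Debug, Clone, PartialEq)]
enum FieldValue {
    LongInt(i32),
    LongStr(String),
}

impl From<i32> for FieldValue {
    fn from(v: i32) -> Self {
        FieldValue::LongInt(v)
    }
}

impl From<&str> for FieldValue {
    fn from(v: &str) -> Self {
        FieldValue::LongStr(v.to_owned())
    }
}

/// Hypothetical stand-in for a field table keyed by plain strings.
#[derive(Debug, Default)]
struct FieldTable(HashMap<String, FieldValue>);

impl FieldTable {
    fn new() -> Self {
        Self::default()
    }

    /// Accepts anything convertible into a `FieldValue`.
    fn insert(&mut self, name: &str, value: impl Into<FieldValue>) -> Option<FieldValue> {
        self.0.insert(name.to_owned(), value.into())
    }

    fn get(&self, name: &str) -> Option<&FieldValue> {
        self.0.get(name)
    }
}

/// Builds queue arguments carrying a message TTL in milliseconds.
fn ttl_arguments(ttl_ms: i32) -> FieldTable {
    let mut ft = FieldTable::new();
    ft.insert("x-message-ttl", ttl_ms);
    ft
}
```

With such impls in place, a caller can pass an integer directly instead of constructing the value type by hand.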

Suggestion: Do not use "amqp" in the library name

Nice work.

As far as I can see this library doesn't implement the ratified AMQP version (1.0) but instead the legacy version (0.9.1) that RabbitMQ uses. I suggest renaming the library not to include a mention of AMQP to avoid confusion.

RabbitMQ is pretty much the only message broker that still uses the legacy version of the protocol which means it can pretty much be considered a proprietary protocol of RabbitMQ.

provide a default TLS configuration using system default root

Currently, TLS supports:

  1. a user-provided root CA
  2. a fallback to webpki_roots if no root CA is provided by the user

Why provide a new default config

It seems quite common that users want to use the system default roots with username/password authentication.

Possible solutions

We provide a default TlsAdaptor when the user uses an amqps URI; the TLS config will use the system default roots and username/password auth.
New dependency: https://crates.io/crates/rustls-native-certs

Implementing Prefetch for a given Queue using AsyncConsumer

Hi there, it's been quite fun to work with your library. Recently, I started wondering whether it's possible to use prefetch for a queue that is consumed via basic_consume.

I've tried something along these lines:

  my_channel.basic_qos(
      BasicQosArguments::new(
          0, 0, false
      )
  );
  my_channel.channel
      .basic_consume(callback, args)
      .await
      .unwrap();

Do I need to implement the blocking consumer for this instead?

Kind regards,

Dave.
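
As background for the question above, here is a toy model of what a prefetch count means in AMQP 0-9-1: the broker stops delivering once the number of unacknowledged messages reaches the limit, and a prefetch count of 0 means no limit. The `PrefetchWindow` type is hypothetical and written only for this sketch; it is not amqprs code and does not talk to a broker. If the second argument of `BasicQosArguments::new` is the prefetch count, as the protocol's field order (prefetch-size, prefetch-count, global) suggests, then the `0` in the snippet above requests no limit.

```rust
use std::collections::VecDeque;

/// Toy model of a queue delivering to one consumer under a prefetch limit.
struct PrefetchWindow {
    prefetch_count: u16, // 0 means "no limit", as in basic.qos
    ready: VecDeque<u64>, // delivery tags waiting to be delivered
    unacked: Vec<u64>,    // delivered but not yet acknowledged
}

impl PrefetchWindow {
    fn new(prefetch_count: u16, messages: impl IntoIterator<Item = u64>) -> Self {
        Self {
            prefetch_count,
            ready: messages.into_iter().collect(),
            unacked: Vec::new(),
        }
    }

    /// Delivers as many messages as the prefetch window allows.
    fn deliver(&mut self) -> Vec<u64> {
        let mut delivered = Vec::new();
        while self.prefetch_count == 0
            || self.unacked.len() < usize::from(self.prefetch_count)
        {
            match self.ready.pop_front() {
                Some(tag) => {
                    self.unacked.push(tag);
                    delivered.push(tag);
                }
                None => break,
            }
        }
        delivered
    }

    /// Acknowledges one delivery, freeing a slot in the window.
    /// Returns false if the tag was not outstanding.
    fn ack(&mut self, tag: u64) -> bool {
        match self.unacked.iter().position(|t| *t == tag) {
            Some(pos) => {
                self.unacked.remove(pos);
                true
            }
            None => false,
        }
    }
}
```

In this model, a window of 2 over five messages delivers two, then nothing until one is acknowledged, then one more. The limit only has an effect when the consumer acknowledges manually.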

UNEXPECTED_FRAME - expected method frame, got non method frame instead

Having a hard time tracking down the exact cause for reproducibility, but it happens 100% of the time on my end:

Brandons-MacBook-Air:j2534-0500-poc brandonros 2023-02-04 18:49:02 $ cargo build && RUST_LOG=trace cargo run --bin diag_test
2023-02-04T23:49:07.742795Z  INFO diag_test: calling...    
2023-02-04T23:49:07.847053Z TRACE mio::poll: registering event source with poller: token=Token(2147483649), interests=READABLE    
2023-02-04T23:49:07.848800Z TRACE mio::poll: registering event source with poller: token=Token(0), interests=READABLE | WRITABLE    
2023-02-04T23:49:07.856483Z TRACE amqprs::net::split_connection: 510 bytes read from network
2023-02-04T23:49:07.856902Z TRACE amqprs::net::split_connection: RECV on channel 0: Start(MethodHeader { class_id: 10, method_id: 10 }, Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortStr(12, "capabilities"): F(FieldTable({ShortStr(18, "connection.blocked"): t(true), ShortStr(18, "publisher_confirms"): t(true), ShortStr(26, "exchange_exchange_bindings"): t(true), ShortStr(22, "consumer_cancel_notify"): t(true), ShortStr(15, "direct_reply_to"): t(true), ShortStr(10, "basic.nack"): t(true), ShortStr(19, "consumer_priorities"): t(true), ShortStr(28, "authentication_failure_close"): t(true), ShortStr(16, "per_consumer_qos"): t(true)})), ShortStr(12, "cluster_name"): S(LongStr(15, "rabbit@rabbitmq")), ShortStr(9, "copyright"): S(LongStr(55, "Copyright (c) 2007-2023 VMware, Inc. or its affiliates.")), ShortStr(11, "information"): S(LongStr(57, "Licensed under the MPL 2.0. Website: https://rabbitmq.com")), ShortStr(7, "version"): S(LongStr(6, "3.11.8")), ShortStr(8, "platform"): S(LongStr(17, "Erlang/OTP 25.2.2")), ShortStr(7, "product"): S(LongStr(8, "RabbitMQ"))}), mechanisms: LongStr(14, "PLAIN AMQPLAIN"), locales: LongStr(5, "en_US") })
2023-02-04T23:49:07.857103Z TRACE amqprs::net::split_connection: SENT on channel 0: StartOk(MethodHeader { class_id: 10, method_id: 11 }, StartOk { client_properties: FieldTable({ShortStr(8, "platform"): S(LongStr(4, "Rust")), ShortStr(7, "product"): S(LongStr(6, "AMQPRS")), ShortStr(15, "connection_name"): S(LongStr(25, "AMQPRS000@localhost:5672/")), ShortStr(7, "version"): S(LongStr(3, "0.1"))}), machanisms: ShortStr(5, "PLAIN"), response: LongStr(12, "\0guest\0guest"), locale: ShortStr(5, "en_US") })
2023-02-04T23:49:07.858683Z TRACE amqprs::net::split_connection: 20 bytes read from network
2023-02-04T23:49:07.858707Z TRACE amqprs::net::split_connection: RECV on channel 0: Tune(MethodHeader { class_id: 10, method_id: 30 }, Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2023-02-04T23:49:07.858727Z TRACE amqprs::net::split_connection: SENT on channel 0: TuneOk(MethodHeader { class_id: 10, method_id: 31 }, TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2023-02-04T23:49:07.858769Z TRACE amqprs::net::split_connection: SENT on channel 0: Open(MethodHeader { class_id: 10, method_id: 40 }, Open { virtual_host: ShortStr(1, "/"), capabilities: ShortStr(0, ""), insist: 0 })
2023-02-04T23:49:07.860036Z TRACE amqprs::net::split_connection: 13 bytes read from network
2023-02-04T23:49:07.860061Z TRACE amqprs::net::split_connection: RECV on channel 0: OpenOk(MethodHeader { class_id: 10, method_id: 41 }, OpenOk { know_hosts: ShortStr(0, "") })
2023-02-04T23:49:07.860420Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860492Z  INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2023-02-04T23:49:07.860530Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860554Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.860640Z TRACE amqprs::net::split_connection: SENT on channel 1: OpenChannel(MethodHeader { class_id: 20, method_id: 10 }, OpenChannel { out_of_band: ShortStr(0, "") })
2023-02-04T23:49:07.860688Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 153721375 }
2023-02-04T23:49:07.863449Z TRACE amqprs::net::split_connection: 16 bytes read from network
2023-02-04T23:49:07.863492Z TRACE amqprs::net::split_connection: RECV on channel 1: OpenChannelOk(MethodHeader { class_id: 20, method_id: 11 }, OpenChannelOk { channel_id: LongStr(0, "") })
2023-02-04T23:49:07.863525Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 156553958 }
2023-02-04T23:49:07.863609Z  INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.863659Z TRACE amqprs::api::channel::dispatcher: starts up dispatcher task of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.864928Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.865035Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(5, "q.req"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.865155Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 158190625 }
2023-02-04T23:49:07.866775Z TRACE amqprs::net::split_connection: 26 bytes read from network
2023-02-04T23:49:07.866797Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(5, "q.req"), message_count: 0, consumer_count: 1 })
2023-02-04T23:49:07.866809Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 159846833 }
2023-02-04T23:49:07.866881Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(7, "q.reply"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.866905Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 159942500 }
2023-02-04T23:49:07.867657Z TRACE amqprs::net::split_connection: 28 bytes read from network
2023-02-04T23:49:07.867680Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(7, "q.reply"), message_count: 0, consumer_count: 0 })
2023-02-04T23:49:07.867700Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 160737416 }
2023-02-04T23:49:07.867786Z TRACE amqprs::net::split_connection: SENT on channel 1: BindQueue(MethodHeader { class_id: 50, method_id: 20 }, BindQueue { ticket: 0, queue: ShortStr(5, "q.req"), exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(5, "q.req"), nowait: false, arguments: FieldTable({}) })
2023-02-04T23:49:07.867830Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 160866750 }
2023-02-04T23:49:07.869036Z TRACE amqprs::net::split_connection: 12 bytes read from network
2023-02-04T23:49:07.869052Z TRACE amqprs::net::split_connection: RECV on channel 1: BindQueueOk(MethodHeader { class_id: 50, method_id: 21 }, BindQueueOk)
2023-02-04T23:49:07.869067Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 162104000 }
2023-02-04T23:49:07.869137Z TRACE amqprs::net::split_connection: SENT on channel 1: BindQueue(MethodHeader { class_id: 50, method_id: 20 }, BindQueue { ticket: 0, queue: ShortStr(7, "q.reply"), exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(7, "q.reply"), nowait: false, arguments: FieldTable({}) })
2023-02-04T23:49:07.869176Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 162212958 }
2023-02-04T23:49:07.870072Z TRACE amqprs::net::split_connection: 12 bytes read from network
2023-02-04T23:49:07.870088Z TRACE amqprs::net::split_connection: RECV on channel 1: BindQueueOk(MethodHeader { class_id: 50, method_id: 21 }, BindQueueOk)
2023-02-04T23:49:07.870103Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 163140125 }
2023-02-04T23:49:07.870175Z TRACE amqprs::net::split_connection: SENT on channel 1: Consume(MethodHeader { class_id: 60, method_id: 20 }, Consume { ticket: 0, queue: ShortStr(7, "q.reply"), consumer_tag: ShortStr(14, "reply_consumer"), bits: 0, arguments: FieldTable({}) })
2023-02-04T23:49:07.870213Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 163249583 }
2023-02-04T23:49:07.870937Z TRACE amqprs::net::split_connection: 27 bytes read from network
2023-02-04T23:49:07.870959Z TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 }, ConsumeOk { consumer_tag: ShortStr(14, "reply_consumer") })
2023-02-04T23:49:07.870976Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 164013041 }
2023-02-04T23:49:07.871100Z TRACE amqprs::api::channel::basic: starts task for async consumer reply_consumer on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2023-02-04T23:49:07.871123Z  INFO amqprs::api::channel::dispatcher: register consumer reply_consumer
2023-02-04T23:49:07.871150Z TRACE amqprs::net::split_connection: SENT on channel 1: PublishCombo(Publish { ticket: 0, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(5, "q.req"), bits: 0 }, ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 0 }, basic_properties: BasicProperties { property_flags: [6, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: Some(ShortStr(36, "f931eeb4-bcb1-4032-98c4-065bed630ef3")), reply_to: Some(ShortStr(7, "q.reply")), expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } }, ContentBody { inner: [] })
2023-02-04T23:49:07.871208Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 164244875 }
2023-02-04T23:49:07.873390Z TRACE amqprs::net::split_connection: 89 bytes read from network
2023-02-04T23:49:07.873402Z TRACE amqprs::net::split_connection: RECV on channel 0: Close(MethodHeader { class_id: 10, method_id: 50 }, Close { reply_code: 505, reply_text: ShortStr(70, "UNEXPECTED_FRAME - expected method frame, got non method frame instead"), class_id: 0, method_id: 0 })
2023-02-04T23:49:07.873413Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 530233, tv_nsec: 166451291 }
2023-02-04T23:49:07.873425Z ERROR amqprs::api::callbacks: handle close request for connection 'AMQPRS000@localhost:5672/ [open]', cause: '505: UNEXPECTED_FRAME - expected method frame, got non method frame instead', (class_id = 0, method_id = 0)
2023-02-04T23:49:07.873436Z  INFO amqprs::net::reader_handler: server requests to shutdown connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873449Z TRACE amqprs::net::split_connection: SENT on channel 0: CloseOk(MethodHeader { class_id: 10, method_id: 51 }, CloseOk)
2023-02-04T23:49:07.873466Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [closed]' heartbeat deadline is updated to Instant { tv_sec: 530203, tv_nsec: 166503541 }
2023-02-04T23:49:07.873501Z  INFO amqprs::net::reader_handler: connection 'AMQPRS000@localhost:5672/ [closed]' is closed, shutting down socket I/O handlers
2023-02-04T23:49:07.873523Z DEBUG amqprs::api::channel::dispatcher: dispatcher mpsc channel closed, channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873532Z  INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873552Z  INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:5672/ [closed]'
2023-02-04T23:49:07.873565Z DEBUG amqprs::api::channel::basic: exit task of async consumer reply_consumer
2023-02-04T23:49:07.873607Z TRACE mio::poll: deregistering event source from poller    

BasicConsumeArguments: rename no_ack since it is confusing

While no_ack is indeed a protocol-level field name, it is confusing. For basic.consume in the API, other clients have generally adopted "auto ack" for the name (or "manual ack", which I like even more).

So I suggest that the field is renamed to auto_ack and a convenience method (default) is introduced for both modes.

Happy to work on a PR if it would be accepted.
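
A minimal sketch of what the proposal could look like, assuming one constructor per acknowledgement mode. `ConsumeArgs` and its methods are hypothetical names chosen for this example; they are not the existing amqprs `BasicConsumeArguments` API. The protocol-level `no-ack` bit is derived from the friendlier `auto_ack` field.

```rust
/// Hypothetical arguments type; not the actual amqprs API.
#[derive(Debug, Clone, PartialEq)]
struct ConsumeArgs {
    queue: String,
    consumer_tag: String,
    auto_ack: bool,
}

impl ConsumeArgs {
    /// Manual acknowledgements: the consumer must ack each delivery.
    fn manual_ack(queue: &str, consumer_tag: &str) -> Self {
        Self {
            queue: queue.to_owned(),
            consumer_tag: consumer_tag.to_owned(),
            auto_ack: false,
        }
    }

    /// Automatic acknowledgements: deliveries count as acked once sent.
    fn auto_ack(queue: &str, consumer_tag: &str) -> Self {
        Self {
            queue: queue.to_owned(),
            consumer_tag: consumer_tag.to_owned(),
            auto_ack: true,
        }
    }

    /// Value of the protocol-level `no-ack` bit for basic.consume.
    fn no_ack_bit(&self) -> bool {
        self.auto_ack
    }
}
```

Naming the mode at the call site avoids the double negative of setting `no_ack` to `false`.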
