Git Product home page Git Product logo

mqttrs's Introduction

Rust Mqtt Encoding & Decoding Crates.io Docs.rs

Mqttrs is a Rust crate (library) to write MQTT protocol clients and servers.

It is a codec-only library with very few dependencies and a straightforward and composable API, usable with rust's standard library or with async frameworks like tokio. It is strict when decoding (e.g. returns an error when encountering reserved values) and encoding (the API makes it impossible to generate an illegal packet).

Mqttrs currently requires Rust >= 1.39 and supports MQTT 3.1.1. Support for MQTT 5 is planned for a future version.

Usage

Add mqttrs = "0.4" and bytes = "1.0" to your Cargo.toml.

use mqttrs::*;
use bytes::BytesMut;

// Allocate write buffer.
let mut buf = BytesMut::with_capacity(1024);

// Encode an MQTT Connect packet.
let pkt = Packet::Connect(Connect { protocol: Protocol::MQTT311,
                                    keep_alive: 30,
                                    client_id: "doc_client".into(),
                                    clean_session: true,
                                    last_will: None,
                                    username: None,
                                    password: None });
assert!(encode(&pkt, &mut buf).is_ok());
assert_eq!(&buf[14..], "doc_client".as_bytes());
let mut encoded = buf.clone();

// Decode one packet. The buffer will advance to the next packet.
assert_eq!(Ok(Some(pkt)), decode(&mut buf));

// Example decode failures.
let mut incomplete = encoded.split_to(10);
assert_eq!(Ok(None), decode(&mut incomplete));
let mut garbage = BytesMut::from(&[0u8,0,0,0] as &[u8]);
assert_eq!(Err(Error::InvalidHeader), decode(&mut garbage));

Optional serde support.

Use mqttrs = { version = "0.4", features = [ "derive" ] } in your Cargo.toml.

Enabling this features adds #[derive(Deserialize, Serialize)] to some mqttrs types. This simplifies storing those structs in a database or file, typically to implement session support (qos, subscriptions...).

This doesn't add mqtt as a serde data format; you still need to use the mqttrs::{decode,encode} functions.

Optional #[no_std] support.

Use mqttrs = { version = "0.4", default-features = false } in your Cargo.toml to remove the default std feature.

Disabling this feature comes with the cost of not implementing the std::error::Error trait, as well as not supporting std::io read and write. This allows usage in embedded devices where the standard library is not available.

mqttrs's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

mqttrs's Issues

Support alloc-free operation

Split off from discussion in #16. The next step after no_std is supporting no_alloc.

I think the key ingredient will be to switch encode()/decode() to use a BufMut/Buf trait exclusively without relying on a BytesMut/Bytes implementor. That would enable no_alloc users to pass a [u8, N] instead of a Bytes, and we can let them deal with buffer_too_small errors.

I've already done a partial switch to the Buf/BufMut traits, but the final bits are tricky. Might be worth checking with upstream why some of the useful functions are implemented on the Struct instead of the trait.

Proper way to extract packet length?

With the new API decode_slice it is a bit unclear what's the best way to the extract packet length?

I guess I could use read_header but then it is called as duplicate inside decode_slice as well.

I generally I shouldn't really need it but if there are multiple packets send with Qos 0, I might end up having more than one packet waiting in my receive queue below socket-layers. In this case, I need to know where packet starts and ends. Adding traditional length: &mut usize into decode_slice (1+remaining length) would work but wanted to understand if there is already some more clever way that I've missed.

PacketIdentifier should be NonZeroU16

The spec disallows pid==0: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718025

I wouldn't be surprised if some servers/clients out there are buggy in that respect, but as one data point, mosquitto client respects this and mosquitto server will close the connection if it receives a pid of 0.

As an optimisation bonus, Option<NonZeroU16> takes only 2 bytes of memory compared to Option<u16>.

This is another API-breaking change, looks like enough to start a 0.2 branch ;)

Unable to implement tokio_util::codec::{Decoder, Encoder} with mqttrs version 0.4

I updated our app to use tokio 1.x the other day, and with that I also updated mqttrs from 0.3 to 0.4. But since Packet now requires a lifetime, I don't see how I can implement tokio_util::codec::{Decoder, Encoder} using this version.

I tried the following code:

use bytes::BytesMut;
use mqttrs::{Error, Packet};
use tokio_util::codec::{Decoder, Encoder};

pub struct MQTTCodec;

impl MQTTCodec {
   pub fn new() -> MQTTCodec {
       MQTTCodec {}
   }
}

impl<'a> Decoder for MQTTCodec {
   type Item<'a> = Packet<'a>;
   type Error = Error;

   fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Packet<'a>>, Self::Error> {
       mqttrs::decode_slice(src)
   }
}

impl Encoder<Packet<'_>> for MQTTCodec {
   type Error = Error;

   fn encode(&mut self, packet: Packet, buf: &mut BytesMut) -> Result<(), Self::Error> {
       mqttrs::encode_slice(&packet, &mut buf)
   }
}

But this gives me:

error[E0658]: generic associated types are unstable
  --> iot-gateway/tests/mqttbroker/codec.rs:14:5
   |
14 |     type Item<'a> = Packet<'a>;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: see issue #44265 <https://github.com/rust-lang/rust/issues/44265> for more information

error[E0496]: lifetime name `'a` shadows a lifetime name that is already in scope
  --> iot-gateway/tests/mqttbroker/codec.rs:14:15
   |
13 | impl<'a> Decoder for MQTTCodec {
   |      -- first declared here
14 |     type Item<'a> = Packet<'a>;
   |               ^^ lifetime `'a` already in scope

Do you have any suggestions or pointers on how to make this work again with mqttrs 0.4? Thanks!

make every packet clonable.

There might be situations, where we might want to deep copy whole packet.

So, we need to implement clone() for all the packets.

Usage in no_std environments

Would it be possible to allow usage in no_std environments?

bytes already supports disabling std through features, so it seems pretty trivial to add a feature gate to allow usage in no_std?

It seems like the only usage of std is

use std::{
    error::Error as ErrorTrait,
    fmt,
    io::{Error as IoError, ErrorKind},
    num::NonZeroU16,
};

error and fmt can be imported from core instead, io::Error seems to be a convenience implementation of From, that could be just feature gated in a no_std cfg..

The NonZeroU16 I am a bit unsure of, but it should be pretty trivial?

Zero-copy encoding and decoding

It might be possible to do some zero-copy encoding or decoding, especially of the publish payload. Bytes can do some refcounting behind the scene, so if we switch publish.payload from a Vec<u8> to a Bytes we should only have to create the slice, not its content.

We should revise whether encode() and decode() should take an impl IntoBuf and impl IntoBufMut rather than a straight BytesMut.

Last but not least, this all needs to be benchmarked.

Trouble Implementing Tokio-util Decoder for MQTT Broker

I am pretty new to Rust and MQTT, and I was following this series of articles (https://hassamuddin.com/blog/rust-mqtt/overview/) on writing an async MQTT broker in Rust. As this article is a little dated, I was attempting to update it to use the most recent version of tokio-util (v0.6.8) and mqttrs (v0.4.1).

I think I have been able to update most of the code in that example located here (https://github.com/Heasummn/rust-mqtt), but I have run into some trouble trying to implement tokio_util::codec::Decoder. It seems that my problem comes from the lifetime attached to the Packet enum.

I was wondering if there was a potential workaround for this problem. Something to make the Decoder happy so I can see if everything else is working as it should be. Please let me know if you need any more information from me.

Decoding an incomplete packet can still consume some bytes of the buffer.

Correct me if I'm wrong, but mqttrs::decoder::decode() follows the tokio FramedRead API so that you can write

use tokio::codec::Decoder;
pub struct Codec;
impl Decoder for Codec {
    type Item = Packet;
    type Error = Error;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        mqttrs::decoder::decode(src)
    }
}

which means that if the packet is incomplete it should return Ok(None) and not touch the buffer.

This is not currently the case, as the buffer advances if there's at least a header in it.

The fix is pretty trivial and you can cherry-pick it from vincentdephily@af590ce

Cheers.

`mqttrs::PacketIdentifier` should `#[derive(Hash)]`

It's very natural to implement QoS by using PacketIdentifier as an std::collections::HashMap key, but for that to work the type must implement Hash.

Workarounds are simple enough (use a BTreeMap, or use a u16 key and insert/get/remove PacketIdentifier.0), but having Hash would improve the quality of life.

Serde feature enabled by default (through `std` feature)

The serde feature is active by default because the std feature depends on it:

mqttrs/Cargo.toml

Lines 17 to 22 in 17cc76e

[features]
default = ["std"]
# Implements serde::{Serialize,Deserialize} on mqttrs::Pid.
derive = ["serde"]
std = ["bytes/std", "serde/std"]

This makes it impossible to build the library with only the std feature, but not the serde feature. Also, the Readme states that you have to add the derive feature in order to use serde, which is not required.

Ideally, we would want to enable the serde/std feature only if both the std and derive features are active, but I don't know if that's possible.

(This is not a real problem for me at the moment, I just though that this looks like a bug, so I decided to report it. Thanks for creating this library, it looks very promising from a first look!)

Add builder APIs

Currently, Packet construction can only be done by filling every field, which is neat but a bit wordy:

let pkt = Connect { protocol: Protocol::MQTT311,
                        keep_alive: 60,
                        client_id: "foo".into(),
                        clean_session: true,
                        last_will: None,
                        username: None,
                        password: None };

It'd be nice to have [#derive(Default)] or impl fn new() -> Self and a full set of fn set_clientid(self, String) -> Self.

Just need to make sure that packets have reasonable defaults (provide a new() with arguments otherwise) and that using these API don't cause extra allocations.

It might also be a good time to get a closer look at MQTT5 to see how mqttrs will transistion to supporting both.

Example in README file does not work, cannot find encode/decode in this scope

Apologies if I am missing something obvious. I am a rust newbie and and am trying to get a simple mqtt POC working.

I have this in my Cargo.toml file

[dependencies]
mqttrs = { version = "0.4", features = [ "derive" ] }
bytes = "1.0"

and this is my main file (copied verbatim from the README.md file)

use bytes::BytesMut;
use mqttrs::*;

fn main() {
    // Allocate write buffer.
    let mut buf = BytesMut::with_capacity(1024);

    // Encode an MQTT Connect packet.
    let pkt = Packet::Connect(Connect {
        protocol: Protocol::MQTT311,
        keep_alive: 30,
        client_id: "doc_client".into(),
        clean_session: true,
        last_will: None,
        username: None,
        password: None,
    });
    assert!(encode(&pkt, &mut buf).is_ok());
    assert_eq!(&buf[14..], "doc_client".as_bytes());
    let mut encoded = buf.clone();

    // Decode one packet. The buffer will advance to the next packet.
    assert_eq!(Ok(Some(pkt)), decode(&mut buf));

    // Example decode failures.
    let mut incomplete = encoded.split_to(10);
    assert_eq!(Ok(None), decode(&mut incomplete));
    let mut garbage = BytesMut::from(&[0u8, 0, 0, 0] as &[u8]);
    assert_eq!(Err(Error::InvalidHeader), decode(&mut garbage));
}

And the compiler complains that the encode and decode functions do not exist...

error[E0425]: cannot find function `encode` in this scope
  --> src/main.rs:18:13
   |
18 |     assert!(encode(&pkt, &mut buf).is_ok());
   |             ^^^^^^ not found in this scope

error[E0425]: cannot find function `decode` in this scope
  --> src/main.rs:23:31
   |
23 |     assert_eq!(Ok(Some(pkt)), decode(&mut buf));
   |                               ^^^^^^ not found in this scope
   |
help: consider importing this function
   |
1  | use core::num::flt2dec::decode;
   |

error[E0425]: cannot find function `decode` in this scope
  --> src/main.rs:27:26
   |
27 |     assert_eq!(Ok(None), decode(&mut incomplete));
   |                          ^^^^^^ not found in this scope
   |
help: consider importing this function
   |
1  | use core::num::flt2dec::decode;
   |

error[E0425]: cannot find function `decode` in this scope
  --> src/main.rs:29:43
   |
29 |     assert_eq!(Err(Error::InvalidHeader), decode(&mut garbage));
   |                                           ^^^^^^ not found in this scope
   |
help: consider importing this function
   |
1  | use core::num::flt2dec::decode;
   |

error: aborting due to 4 previous errors

I have tried importing those functions directly in the use statement, but they do not seem to exist (compiler suggested encoder and decoder, however they were private so that provided more errors). I have also tried removing the serde feature in the Cargo.toml file to no effect.

Thanks.

Refactor using a combined `QosPid` struct.

Related to bug #6, but deserves to be treated separately as it is an API change instead of a bugfix:

In my own code I've found it useful to have both a

pub enum Qos {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

and a

pub enum QosPid {
    AtMostOnce,
    AtLeastOnce(Pid),
    ExactlyOnce(Pid),
}

The QosPid struct would have made bug #6 more obvious, and would prevent similar bugs in user code (where it's easy to not have qos and pid in sync).

I suggest to do the same in this lib, replace the two fields qos:QoS, pid:Option<PacketIdentifier> with a single qospid: QosPid as defined above. This goes towards the "hard to misuse" goal of good APi design (and makes the struct smaller as a bonus).

Like #10, this is a breaking change. Hopefully we can lump a few together.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.