Comments (15)

fpagliughi commented on August 18, 2024

Done

from paho.mqtt.cpp.

fpagliughi commented on August 18, 2024

Agreed. I like that idea. I've been using a lot of AMQP/RabbitMQ lately and like how the client can do the setup and then just loop on incoming messages.

I have also been thinking of providing an easy way to receive a single message, such as the last one received for a retained value. Often I want to grab something like the last temperature value read from a sensor, and have to do all the work to create a connection, subscription, callback, and then tear it all down... just to read a single value.

fpagliughi commented on August 18, 2024

I just pushed an initial/experimental implementation to the 'develop' branch. Please let me know what you think. It would work something like this:

    cli.connect(connOpts);
    cli.subscribe(TOPIC, QOS);

    string top;
    mqtt::const_message_ptr msg;

    cli.start_consuming();

    while (true) {
        std::tie(top, msg) = cli.consume_message();
        if (!msg) break;
        cout << top << ": " << msg->to_string() << endl;
    }

    cli.stop_consuming();
    cli.disconnect();

I borrowed the "consume" terminology from AMQP. Some points:

  • The start_consuming() call sets up a callback and a queue, both of which are internal to the client.
  • The blocking receiver consume_message() returns a tuple with the topic string and a shared pointer to the message.
  • There are other receiver calls with timeouts: bool try_consume_message_for(*val, relTime), etc.
  • The stop_consuming() call removes the internal callback and queue.
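
For readers following along, here is a minimal sketch of the kind of thread-safe queue the points above describe: a callback thread puts messages in, a blocking call takes them out, and a timed variant gives up after a while. The class name consumer_queue and its methods put(), get(), and try_get_for() are hypothetical and are not the library's internal types.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the client's internal queue; not the Paho type.
template <typename T>
class consumer_queue {
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<T> items_;

public:
    // Would be called from the message-arrived callback.
    void put(T val) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            items_.push_back(std::move(val));
        }
        cv_.notify_one();
    }

    // Blocks until an item is available, in the spirit of consume_message().
    T get() {
        std::unique_lock<std::mutex> lk(mtx_);
        cv_.wait(lk, [this] { return !items_.empty(); });
        T val = std::move(items_.front());
        items_.pop_front();
        return val;
    }

    // Waits up to rel_time and returns false on timeout, in the spirit of
    // try_consume_message_for(*val, relTime).
    template <typename Rep, typename Period>
    bool try_get_for(T* val, const std::chrono::duration<Rep, Period>& rel_time) {
        assert(val != nullptr);
        std::unique_lock<std::mutex> lk(mtx_);
        if (!cv_.wait_for(lk, rel_time, [this] { return !items_.empty(); }))
            return false;
        *val = std::move(items_.front());
        items_.pop_front();
        return true;
    }
};
```

In this sketch, a start_consuming()-style call would create the queue and register a callback that calls put(), and a stop_consuming()-style call would remove both.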

This is actually implemented in, and available to users of, the async_client, and is exported by the sync_client. It might be useful to have an asynchronous publisher in one thread and a synchronous consumer in another thread, both sharing a single connection.

It's not fully thread-safe yet, and I may want to fiddle with the return value. Is a tuple the best thing? Why isn't the topic part of the meta-data in the message? In that case, returning a message would suffice.

But I think this is a good start.

fpagliughi commented on August 18, 2024

OK. I will look, but as I mentioned this is an experimental look at the API. I will settle it down over the next week (probably with the "envelope" class), and then I will harden and debug it - and make sure to fix the race conditions.

fpagliughi commented on August 18, 2024

Actually now I'm thinking that maybe the consuming queue should be the default behavior for the synchronous client. So the internal queue would be created when you connect, and removed on a disconnect. So the sync consumer would shrink further to:

    cli.connect(connOpts);
    cli.subscribe(TOPIC, QOS);

    while (true) {
        auto msg = cli.consume_message();
        if (!msg) break;
        cout << msg->get_topic() << ": " << msg->to_string() << endl;
    }

    cli.disconnect();

andre-vm commented on August 18, 2024

Nice job! I think that's a much better way to get started with paho.mqtt.cpp. I haven't had time to test it myself yet, but I intend to do so as soon as possible. I am not very proficient in C++, so I struggled a little to understand how the returned tuple works. I don't think that's a problem, but I'd rather receive the topic and the payload through pointers or references passed as arguments. Speaking of topic and payload, I find your idea of putting both of them into a single structure that represents the whole message very interesting.

Now, something that just crossed my mind... On one side, we have an asynchronous client that contains some blocking methods. On the other, we have a synchronous client that supports some callbacks. I think these two are getting a little mixed, somehow. Maybe there should be only one client that supports both ways of working. As you pointed out, it might be useful to have a single connection that can be used sometimes synchronously, sometimes asynchronously. Or maybe we could create a synchronous "mirror" of an asynchronous client, and use the latter's connection to work in a synchronous way. I know this is a whole new issue and it may have a huge impact on the project, perhaps not only on the C++ version, so don't take it very seriously.

fpagliughi commented on August 18, 2024

The tuple is fairly new in C++ and not used very much yet, but I think it will become more common. Tuples are now a basic type in many of the newer programming languages, and I'm guessing that will influence C++ practices. But really, this was an implementation issue in that internally I'm using a thread-safe queue, and each message/topic pair needed to be wrapped into a single moveable value that could be placed in the queue. A tuple served that purpose nicely.

And I'm guessing that I will just put the topic into the message, so this way will have the least change if I do so.

As for the clients, they appear to be getting mixed because they are really the same thing. The client class is a very thin wrapper around the async_client. The other original Paho libraries (C & Java) made a big distinction between the synchronous and asynchronous interfaces, including totally separate implementations for them, but I never saw the distinction other than convenience.

So having both synchronous and asynchronous interfaces to the same connection would be trivial: a single getter added to the client class:

    async_client& get_async_iface() { return cli_; }

I'll think on whether that's a good idea.
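
As a rough illustration of the thin-wrapper arrangement described above, the sketch below uses simplified, hypothetical stand-ins for both classes (they are not the real mqtt::client or mqtt::async_client, and sent_count() exists only for the example). It shows how a single getter lets both interfaces operate on the same underlying object.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical asynchronous client; stands in for mqtt::async_client.
class async_client {
    std::vector<std::string> sent_;
public:
    void publish(const std::string& payload) { sent_.push_back(payload); }
    std::size_t sent_count() const { return sent_.size(); }
};

// Thin synchronous wrapper that owns the asynchronous client.
class client {
    async_client cli_;
public:
    // Forwards to the wrapped client; a real wrapper would also wait
    // for the operation to complete.
    void publish(const std::string& payload) { cli_.publish(payload); }

    // The getter under discussion: exposes the same underlying connection.
    async_client& get_async_iface() { return cli_; }
};

// Both interfaces end up acting on one and the same object.
void shared_connection_example() {
    client sync_cli;
    sync_cli.publish("from the sync side");
    sync_cli.get_async_iface().publish("from the async side");
    assert(sync_cli.get_async_iface().sent_count() == 2);
}
```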

fpagliughi commented on August 18, 2024

Actually... Looking over at the APIs for some AMQP libraries, often the messages and meta-data are separate, especially for outgoing messages, but commonly wrapped in an "envelope" upon arrival. I think I kind of like that idea.

So sending messages would still be like:

void client::publish(topic, const_message_ptr)

But the incoming messages would arrive in an envelope, like:

const_envelope_ptr  client::consume_message();

where the envelope would be a class with members like:

class envelope {
 public:
    const_message_ptr get_message() const;
    string get_topic() const;
    int get_message_id() const;
};

Yeah. I think I like that. It's pretty simple, doesn't much change the existing API, creates a single item for the internal queue, and presents a future path for the additional meta-data that will be coming in new versions of MQTT.

Plus it solves the nagging issue I've had regarding what to do with the message ID for incoming items.
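
To make the envelope proposal concrete, here is one way the class could be filled in. Only the three getters come from the declaration above; the constructor, the private members, and the simplified message struct are assumptions made for illustration.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for mqtt::message.
struct message { std::string payload; };
using const_message_ptr = std::shared_ptr<const message>;

// Wraps an incoming message together with its delivery meta-data.
class envelope {
    std::string topic_;
    int msg_id_;
    const_message_ptr msg_;
public:
    // Assumed constructor; the proposal only lists the getters.
    envelope(std::string topic, int msg_id, const_message_ptr msg)
        : topic_(std::move(topic)), msg_id_(msg_id), msg_(std::move(msg)) {}

    const_message_ptr get_message() const { return msg_; }
    std::string get_topic() const { return topic_; }
    int get_message_id() const { return msg_id_; }
};
using const_envelope_ptr = std::shared_ptr<const envelope>;

// A single envelope pointer is all the internal queue would need to hold.
void envelope_example() {
    auto msg = std::make_shared<const message>(message{"21.5"});
    const_envelope_ptr env =
        std::make_shared<const envelope>("sensors/temp", 42, msg);
    assert(env->get_topic() == "sensors/temp");
    assert(env->get_message_id() == 42);
    assert(env->get_message()->payload == "21.5");
}
```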

andre-vm commented on August 18, 2024

I agree, I think it's better to pass the topic and payload separately when publishing. I would just change the names: message becomes payload, and envelope becomes message. But I know, that may cause some trouble for people who already use the previous versions of paho.mqtt.cpp.

I also like your idea of get_async_iface(). That way, the callbacks could be removed from the synchronous client and the blocking methods could be hidden from the asynchronous one.

About the tuples, I think they make total sense in Python, for example. But in C++, at first, I got the impression that consume_message() wouldn't have access to the top and msg variables. Then I did a little research, which reminded me of the = operator overload. :)
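
For anyone else puzzled by the same thing, here is a small standalone sketch of how std::tie receives a returned tuple. The message struct and this consume_message() are hypothetical stand-ins, not the library's versions.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <tuple>

// Hypothetical stand-ins for the library types, for illustration only.
struct message { std::string payload; };
using const_message_ptr = std::shared_ptr<const message>;

// Returns the topic and the message packed into one tuple value.
std::tuple<std::string, const_message_ptr> consume_message() {
    return std::make_tuple(
        std::string("paho/sync/test"),
        const_message_ptr(std::make_shared<message>(message{"Test message."})));
}

// std::tie(topic, msg) builds a temporary tuple of references to the two
// local variables; assigning the returned tuple to it writes each element
// through those references.
void unpack_example() {
    std::string topic;
    const_message_ptr msg;
    std::tie(topic, msg) = consume_message();
    assert(topic == "paho/sync/test");
    assert(msg && msg->payload == "Test message.");
}
```

So the function never touches the caller's variables directly; the tuple's assignment operator does the copying after the call returns.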

andre-vm commented on August 18, 2024

I just tested, and I'm getting an exception when stop_consuming() gets called. This is my test code:

#include <iostream>
#include <string>
#include <tuple>    // std::tie
#include <mqtt/client.h>


int main(void)
{
    mqtt::client client(BROKER, "SynchronousClient");

    mqtt::connect_options connOpts;
    connOpts.set_user_name(USERNAME);
    connOpts.set_password(PASSWORD);
    connOpts.set_clean_session(true);

    try {
        client.connect(connOpts);
        client.subscribe("paho/sync/test");
        client.start_consuming();

        mqtt::message_ptr pubmsg = mqtt::make_message("Test message.");
        client.publish("paho/sync/test", pubmsg);

        std::string topic;
        mqtt::const_message_ptr consmsg;
        std::tie(topic, consmsg) = client.consume_message();

        if (consmsg) std::cout << topic << ": " << consmsg->to_string() << std::endl;

        std::cout << "Stop consuming..." << std::flush;
        client.stop_consuming();
        std::cout << "OK" << std::endl;

        client.disconnect();
    }
    catch (const mqtt::exception& exc) {
        std::cerr << "Error: " << exc.what() << " [" << exc.get_reason_code() << "]" << std::endl;
        return 1;
    }

    return 0;
}

Here is the output:

paho/sync/test: Test message.
Stop consuming...Error: MQTT exception -1 [-1]

fpagliughi commented on August 18, 2024

Funny, your previous comment about renaming envelope and message has put doubt into my mind about the separate envelope idea. Already an mqtt::message is a payload and some meta-data (retain flag, QoS, dup, & msg ID). The distinction is that we're wrapping the Paho C library's MQTTAsync_message struct, which chose to keep the topic separate in the API.

But the underlying MQTT specification doesn't make that distinction. The PUBLISH message requires the topic string as much as those other fields. So if we decide that a mqtt::message object should contain everything required to put a message on the wire, then we should include the topic with the object.

But that doesn't mean that we need to change most of the publish() methods in the clients. They can still take the parameters separately and assemble a message object internally.
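
A rough sketch of that arrangement, assuming a simplified message struct and a client that just records what it would send: the convenience overload accepts the parts separately and builds the complete message object itself. The default QoS of 0, the non-retained default, and the outbox() accessor are assumptions for the example.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message holding everything needed for a PUBLISH packet.
struct message {
    std::string topic;
    std::string payload;
    int qos;
    bool retained;
};
using const_message_ptr = std::shared_ptr<const message>;

// Hypothetical client; the outbox stands in for the network connection.
class client {
    std::vector<const_message_ptr> outbox_;
public:
    // Core overload: takes a complete message object.
    void publish(const_message_ptr msg) {
        assert(msg);
        outbox_.push_back(std::move(msg));
    }

    // Convenience overload: takes the parts separately and assembles the
    // message internally. The default values here are assumptions.
    void publish(std::string topic, std::string payload,
                 int qos = 0, bool retained = false) {
        publish(std::make_shared<const message>(
            message{std::move(topic), std::move(payload), qos, retained}));
    }

    const std::vector<const_message_ptr>& outbox() const { return outbox_; }
};
```

With this shape, existing callers that pass a topic and payload keep working, while the queue of incoming or outgoing items only ever holds message pointers.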

andre-vm commented on August 18, 2024

Yeah, I think that works.

fpagliughi commented on August 18, 2024

I made the change and pushed it to the develop branch. The mqtt::message object now holds everything for a complete MQTT PUBLISH message, including the topic, so now all that is required to queue up incoming messages is message objects (via const smart pointers). This really seems to make sense, but the change reverberated through the API.

The test code above should now work without the error/exception, but consuming should be changed to:

//std::string topic;
//mqtt::const_message_ptr consmsg;
        
auto msg = client.consume_message();
if (msg) std::cout << msg->get_topic() << ": " << msg->to_string() << std::endl;

andre-vm commented on August 18, 2024

Great job! It's working nicely.

I would only suggest adding a void client::publish(string_ref top, binary_ref payload) overload, so we can publish a simple string message in just one line of code:

client.publish("any/topic", "Any message.");

Instead of:

mqtt::const_message_ptr msg = mqtt::make_message("any/topic", "Any message.");
client.publish(msg);

Also, message::to_string() is returning only the payload. Now that mqtt::message contains both the topic and payload, message::to_string() could return the two combined, like "<topic>: <payload_to_str>".

fpagliughi commented on August 18, 2024

The publish you ask for is already there in async_client.h:

	delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
		return publish(std::move(topic), std::move(payload),
					   message::DFLT_QOS, message::DFLT_RETAINED);
	}

That's a good idea for message::to_string(). I was wondering what to dump out, and whether I should include the other parameters (thus, whether it should be good for debug info, or good for practical display).

And I was considering a string conversion, in case the payload was actually binary.
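
One possible shape for such a conversion, sketched as a free function rather than as the actual message::to_string(): it renders "<topic>: <payload>" and escapes non-printable bytes as \xNN so that a binary payload stays readable. The function name and the escape format are assumptions for illustration.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Renders "<topic>: <payload>", escaping bytes outside printable ASCII
// as \xNN. The output format is an assumption, not the library's.
std::string to_display_string(const std::string& topic,
                              const std::string& payload) {
    std::string out = topic + ": ";
    for (unsigned char ch : payload) {
        if (ch >= 0x20 && ch < 0x7f) {
            out += static_cast<char>(ch);
        } else {
            char buf[5];  // backslash, 'x', two hex digits, terminator
            std::snprintf(buf, sizeof buf, "\\x%02x", static_cast<unsigned>(ch));
            out += buf;
        }
    }
    return out;
}

// A text payload passes through; a binary byte is shown as an escape.
void to_string_example() {
    assert(to_display_string("any/topic", "Any message.") ==
           "any/topic: Any message.");
    assert(to_display_string("t", std::string("a\x01", 2)) == "t: a\\x01");
}
```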
