Comments (15)
Done
from paho.mqtt.cpp.
Agreed. I like that idea. I've been using a lot of AMQP/RabbitMQ lately and like how the client can do the setup and then just loop on incoming messages.
I have also been thinking of providing an easy way to receive a single message, such as the last one received for a retained value. Often I want to grab something like the last temperature value read from a sensor, and have to do all the work to create a connection, subscription, callback, and then tear it all down... just to read a single value.
from paho.mqtt.cpp.
I just pushed an initial/experimental implementation to the 'develop' branch. Please let me know what you think. It would work something like this:
    cli.connect(connOpts);
    cli.subscribe(TOPIC, QOS);
    string top;
    mqtt::const_message_ptr msg;
    cli.start_consuming();
    while (true) {
        std::tie(top, msg) = cli.consume_message();
        if (!msg) break;
        cout << top << ": " << msg->to_string() << endl;
    }
    cli.stop_consuming();
    cli.disconnect();
I borrowed the "consume" terminology from AMQP. Some points:
- The start_consuming() call sets up a callback and a queue, both of which are internal to the client.
- The blocking receiver consume_message() returns a tuple with the topic string and a shared pointer to the message.
- There are other receiver calls with timeouts: bool try_consume_message_for(*val, relTime), etc.
- The stop_consuming() call removes the internal callback and queue.
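To make the blocking and timed receivers concrete, here is a minimal sketch of the kind of thread-safe queue that could sit behind them. The class name consumer_queue and its methods put(), get() and try_get_for() are hypothetical and are not the library's internal implementation; only the overall shape (a callback pushing into a queue, a blocking read, a read with a relative timeout) comes from the points above.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for the client's internal queue (not the library's class).
template <typename T>
class consumer_queue {
    std::mutex mtx_;
    std::condition_variable cond_;
    std::queue<T> que_;

public:
    // Would be called from the internal message-arrived callback.
    void put(T val) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            que_.push(std::move(val));
        }
        cond_.notify_one();
    }

    // Blocks until an item is available, in the spirit of consume_message().
    T get() {
        std::unique_lock<std::mutex> lk(mtx_);
        cond_.wait(lk, [this] { return !que_.empty(); });
        T val = std::move(que_.front());
        que_.pop();
        return val;
    }

    // Waits up to relTime and returns false on timeout, in the spirit of
    // try_consume_message_for(*val, relTime).
    template <typename Rep, typename Period>
    bool try_get_for(T* val, const std::chrono::duration<Rep, Period>& relTime) {
        std::unique_lock<std::mutex> lk(mtx_);
        if (!cond_.wait_for(lk, relTime, [this] { return !que_.empty(); }))
            return false;
        *val = std::move(que_.front());
        que_.pop();
        return true;
    }
};
```

With a queue like this, one thread can publish asynchronously while another blocks in get(), which is the sharing pattern described in this comment.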
This is actually implemented in, and available to users of, the async_client, and exported by the sync_client. It might be useful to have an asynchronous publisher in one thread and a synchronous consumer in another thread, both sharing a single connection.
It's not fully thread-safe yet, and I may want to fiddle with the return value. Is a tuple the best thing? Why isn't the topic part of the meta-data in the message? In that case, returning a message would suffice.
But I think this is a good start.
from paho.mqtt.cpp.
OK. I will look, but as I mentioned, this is an experimental look at the API. I will settle it down over the next week (probably with the "envelope" class), and then I will harden and debug it, and make sure to fix the race conditions.
from paho.mqtt.cpp.
Actually now I'm thinking that maybe the consuming queue should be the default behavior for the synchronous client. So the internal queue would be created when you connect, and removed on a disconnect. So the sync consumer would shrink further to:
    cli.connect(connOpts);
    cli.subscribe(TOPIC, QOS);
    while (true) {
        auto msg = cli.consume_message();
        if (!msg) break;
        cout << msg->get_topic() << ": " << msg->to_string() << endl;
    }
    cli.disconnect();
from paho.mqtt.cpp.
Nice job! I think that's a much better way to get started with paho.mqtt.cpp. I haven't had time to test it myself yet, but I intend to do so as soon as possible. I am not very proficient in C++, so I struggled a little to understand how the returned tuple works. I don't think that's a problem, but I'd rather receive the topic and the payload through pointers or references passed as arguments. Speaking of topic and payload, I think your idea of putting both of them into a single structure that represents the whole message is very interesting.
Now, something that just crossed my mind... On one side, we have an asynchronous client that contains some blocking methods. On the other, we have a synchronous client that supports some callbacks. I think these two are getting a little mixed, somehow. Maybe there should be only one client that supports both ways of working. As you pointed out, it might be useful to have a single connection that can be used sometimes synchronously, sometimes asynchronously. Or maybe we could create a synchronous "mirror" of an asynchronous client, and use the same connection as the latter to work in a synchronous way. I know this is a whole new issue and it may have a huge impact on the project, perhaps not only on the C++ version, so don't take it very seriously.
from paho.mqtt.cpp.
The tuple is fairly new in C++ and not used very much yet, but I think it will become more common. Tuples are now a basic type in many of the newer programming languages, and I'm guessing that will influence C++ practices. But really, this was an implementation issue in that internally I'm using a thread-safe queue, and each message/topic pair needed to be wrapped into a single moveable value that could be placed in the queue. A tuple served that purpose nicely.
And I'm guessing that I will just put the topic into the message, so this way will have the least change if I do so.
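For anyone unfamiliar with unpacking a tuple, here is a small sketch of what std::tie does with a returned topic/message pair. The function fake_consume() and the type aliases are hypothetical stand-ins so the example compiles without the library; a plain std::string plays the role of the message.

```cpp
#include <memory>
#include <string>
#include <tuple>

// Hypothetical stand-ins: a string plays the role of the message.
using msg_ptr = std::shared_ptr<const std::string>;
using item = std::tuple<std::string, msg_ptr>;

// Pretends to be a consumer call returning a topic/message pair.
item fake_consume() {
    msg_ptr msg = std::make_shared<std::string>("21.5");
    return item(std::string("sensors/temp"), msg);
}

// Unpacks the returned tuple into two existing variables.
std::string consume_and_format() {
    std::string top;
    msg_ptr msg;
    // std::tie builds a tuple of references to top and msg; assigning the
    // returned tuple to it writes each element into the referenced variable.
    std::tie(top, msg) = fake_consume();
    return msg ? top + ": " + *msg : std::string();
}
```

The callee never sees top or msg. It just returns a value, and the assignment on the caller's side fills the two variables.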
As for the clients, they appear to be getting mixed because they are really the same thing. The client class is a very thin wrapper around the async_client. The other original Paho libraries (C & Java) made a big distinction between the synchronous and asynchronous interfaces, including totally separate implementations for them, but I never saw the distinction other than convenience.
So having both synchronous and asynchronous interfaces to the same connection would be trivial: a single getter added to the client class:
async_client& get_async_iface() { return cli_; }
I'll think on whether that's a good idea.
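As a rough illustration of the "thin wrapper" relationship, the sketch below uses stand-in classes in a hypothetical sketch namespace. The stand-in async_client only records what was published, and sent_count() is invented for the example. The only part taken from the discussion is the get_async_iface() getter.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical stand-in for the asynchronous client; it only records
// what was published so the example stays self-contained.
class async_client {
    std::vector<std::string> sent_;
public:
    void publish(std::string payload) { sent_.push_back(std::move(payload)); }
    std::size_t sent_count() const { return sent_.size(); }
};

// Thin synchronous wrapper that owns the asynchronous client.
class client {
    async_client cli_;
public:
    // Forwards to the wrapped client; a real wrapper would also wait here.
    void publish(const std::string& payload) { cli_.publish(payload); }
    // The single getter from the discussion: same connection, async interface.
    async_client& get_async_iface() { return cli_; }
};

} // namespace sketch
```

Both interfaces act on the same underlying object, so a publish through either one goes over the same connection.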
from paho.mqtt.cpp.
Actually... Looking over the APIs for some AMQP libraries, the messages and meta-data are often separate, especially for outgoing messages, but commonly wrapped in an "envelope" upon arrival. I think I kind of like that idea.
So sending messages would still be like:
void client::publish(topic, const_message_ptr)
But the incoming messages would arrive in an envelope, like:
const_envelope_ptr client::consume_message();
where the envelope would be a class with members like:
    class envelope {
    public:
        const_message_ptr get_message() const;
        string get_topic() const;
        int get_message_id() const;
    };
Yeah. I think I like that. It's pretty simple, doesn't much change the existing API, creates a single item for the internal queue, and presents a future path for the additional meta-data that will be coming in new versions of MQTT.
Plus it solves the nagging issue I've had regarding what to do with the message ID for incoming items.
from paho.mqtt.cpp.
I agree, I think it's better to pass the topic and payload separately when publishing. I would just change the names: message becomes payload, and envelope becomes message. But I know, that may cause some trouble for people who already use the previous versions of paho.mqtt.cpp.
I also like your idea of get_async_iface(). That way, the callbacks could be removed from the synchronous client and the blocking methods could be hidden from the asynchronous one.
About the tuples, I think they make total sense in Python, for example. But in C++, at first, I got the impression that consume_message() wouldn't have access to the top and msg variables. Then I did a little research, which reminded me about the = operator overload. :)
from paho.mqtt.cpp.
I just tested, and I'm getting an exception when stop_consuming() gets called. This is my test code:
    #include <iostream>
    #include <string>
    #include <mqtt/client.h>

    int main(void)
    {
        mqtt::client client(BROKER, "SynchronousClient");
        mqtt::connect_options connOpts;
        connOpts.set_user_name(USERNAME);
        connOpts.set_password(PASSWORD);
        connOpts.set_clean_session(true);
        try {
            client.connect(connOpts);
            client.subscribe("paho/sync/test");
            client.start_consuming();
            mqtt::message_ptr pubmsg = mqtt::make_message("Test message.");
            client.publish("paho/sync/test", pubmsg);
            std::string topic;
            mqtt::const_message_ptr consmsg;
            std::tie(topic, consmsg) = client.consume_message();
            if (consmsg) std::cout << topic << ": " << consmsg->to_string() << std::endl;
            std::cout << "Stop consuming..." << std::flush;
            client.stop_consuming();
            std::cout << "OK" << std::endl;
            client.disconnect();
        }
        catch (const mqtt::exception& exc) {
            std::cerr << "Error: " << exc.what() << " [" << exc.get_reason_code() << "]" << std::endl;
            return 1;
        }
        return 0;
    }
Here is the output:
paho/sync/test: Test message.
Stop consuming...Error: MQTT exception -1 [-1]
from paho.mqtt.cpp.
Funny, your previous comment about renaming envelope and message has put doubt into my mind about the separate envelope idea. An mqtt::message is already a payload plus some meta-data (retain flag, QoS, dup, & msg ID). The distinction is that we're wrapping the Paho C library's MQTTAsync_message struct, which chose to keep the topic separate in the API.
But the underlying MQTT specification doesn't make that distinction. The PUBLISH message requires the topic string as much as those other fields. So if we decide that an mqtt::message object should contain everything required to put a message on the wire, then we should include the topic with the object.
But that doesn't mean that we need to change most of the publish() methods in the clients. They can still take the parameters separately and assemble a message object internally.
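A minimal sketch of that arrangement follows, with stand-in classes in a hypothetical sketch namespace: a message that carries its own topic, and a publish() overload that takes the parts separately and assembles the message internally. The outbox() accessor, the plain std::string payload, and the default QoS of 0 are assumptions made for the example, not the library's API.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical message that holds everything needed for a PUBLISH.
class message {
    std::string topic_;
    std::string payload_;
    int qos_;
    bool retained_;
public:
    message(std::string topic, std::string payload, int qos, bool retained)
        : topic_(std::move(topic)), payload_(std::move(payload)),
          qos_(qos), retained_(retained) {}
    const std::string& get_topic() const { return topic_; }
    const std::string& get_payload() const { return payload_; }
    int get_qos() const { return qos_; }
    bool is_retained() const { return retained_; }
};
using const_message_ptr = std::shared_ptr<const message>;

// Hypothetical client that queues outgoing messages instead of sending them.
class client {
    std::vector<const_message_ptr> outbox_;
public:
    // Publishes a fully formed message.
    void publish(const_message_ptr msg) { outbox_.push_back(std::move(msg)); }
    // Convenience overload: takes the parts and builds the message internally.
    // The defaults (QoS 0, not retained) are assumptions for this sketch.
    void publish(std::string topic, std::string payload,
                 int qos = 0, bool retained = false) {
        publish(std::make_shared<message>(std::move(topic), std::move(payload),
                                          qos, retained));
    }
    const std::vector<const_message_ptr>& outbox() const { return outbox_; }
};

} // namespace sketch
```

Callers who only have a topic and a payload keep a one-line publish, while the queue and the consumer side deal with a single message object.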
from paho.mqtt.cpp.
Yeah, I think that works.
from paho.mqtt.cpp.
I made the change and pushed it to the develop branch. The mqtt::message object now holds everything for a complete MQTT PUBLISH message, including the topic, so now all that is required to queue up incoming messages is message objects (via const smart pointers). This really seems to make sense, but the change reverberated through the API.
The test code above should now work without the error/exception, but consuming should be changed to:
//std::string topic;
//mqtt::const_message_ptr consmsg;
auto msg = client.consume_message();
if (msg) std::cout << msg->get_topic() << ": " << msg->to_string() << std::endl;
from paho.mqtt.cpp.
Great job! It's working nicely.
I would only suggest adding a void client::publish(string_ref top, binary_ref payload) overload, so we can publish a simple string message in just one line of code:
client.publish("any/topic", "Any message.");
Instead of:
mqtt::const_message_ptr msg = mqtt::make_message("any/topic", "Any message.");
client.publish(msg);
Also, message::to_string() is returning only the payload. Now that mqtt::message contains both the topic and payload, message::to_string() could return the two combined, like "<topic>: <payload_to_str>".
from paho.mqtt.cpp.
The publish you ask for is already there in async_client.h:
    delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
        return publish(std::move(topic), std::move(payload),
                       message::DFLT_QOS, message::DFLT_RETAINED);
    }
That's a good idea for message::to_string(). I was wondering what to dump out, and whether I should include the other parameters (thus, whether it should be good for debug info, or good for practical display).
And I was considering a string conversion, in case the payload was actually binary.
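One possible shape for such a conversion is sketched below. The free function describe() is hypothetical and not part of the library. It combines the topic and payload as suggested, and escapes non-printable bytes as \xNN so a binary payload still produces a readable line.

```cpp
#include <cstdio>
#include <string>

// Hypothetical helper (not a library function): renders "<topic>: <payload>",
// escaping non-printable bytes as \xNN so binary payloads stay readable.
std::string describe(const std::string& topic, const std::string& payload) {
    std::string out = topic + ": ";
    for (unsigned char ch : payload) {
        if (ch >= 0x20 && ch < 0x7F) {
            // Printable ASCII is copied through unchanged.
            out += static_cast<char>(ch);
        } else {
            // Four characters plus the terminating null.
            char buf[5];
            std::snprintf(buf, sizeof(buf), "\\x%02X", static_cast<unsigned>(ch));
            out += buf;
        }
    }
    return out;
}
```

Whether the other fields (QoS, retain flag) belong in the same string is the open question raised above; this sketch leaves them out.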
from paho.mqtt.cpp.