mqttjs / async-mqtt
Promise wrapper over MQTT.js
License: MIT License
Hi,
I am trying to connect to my broker running at _mqtt._tcp.iot.domain.com
but it is failing with
(node:296427) UnhandledPromiseRejectionWarning: Error: getaddrinfo ENOTFOUND _mqtt._tcp.iot.domain.com
I verified with a dig that I am able to get the SRV record.
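A note on the error: `_mqtt._tcp.iot.domain.com` is the name of an SRV record, not of an address record, so a plain `getaddrinfo` lookup on it fails with ENOTFOUND. As far as I know MQTT.js does not resolve SRV records itself (this is an assumption, not something confirmed in this thread). One possible workaround is to resolve the SRV record first with Node's `dns.promises.resolveSrv` and connect to the host and port it returns. `pickSrvTarget` and `srvToUrl` below are hypothetical helper names.

```javascript
const dns = require('dns');

// Hypothetical helper: choose the SRV record with the lowest priority value,
// breaking ties by the highest weight (a simplification of RFC 2782, which
// asks for weighted random selection among equal priorities).
function pickSrvTarget(records) {
  if (!records.length) throw new Error('no SRV records found');
  return [...records].sort(
    (a, b) => a.priority - b.priority || b.weight - a.weight
  )[0];
}

// Hypothetical helper: resolve an SRV name and build a broker URL from it.
async function srvToUrl(srvName, scheme = 'mqtt') {
  const records = await dns.promises.resolveSrv(srvName);
  const { name, port } = pickSrvTarget(records);
  return `${scheme}://${name}:${port}`;
}

// Usage (not run here, needs network access and the async-mqtt package):
//   const url = await srvToUrl('_mqtt._tcp.iot.domain.com');
//   const client = await require('async-mqtt').connectAsync(url);
```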
Hey,
I tried to check whether the connection is established. In the synchronous MQTT client there is a way to check the stream:
const mqtt = require('async-mqtt');
const client = mqtt.connect(address);
client.stream.on('error', /* error handler */);
But when I try the same with the async client, I only get an error that the stream is undefined.
Hi! how can I get message from mqtt in asynchronous way?
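Incoming messages are delivered through the client's `message` event, with `(topic, payload, packet)` arguments as in MQTT.js. A minimal sketch of running an async handler for every message follows; `listen` is a hypothetical helper, and `client` is assumed to be an async-mqtt client (any object with a promise-returning `subscribe()` and a `message` event would do).

```javascript
// Hypothetical helper: subscribe to a topic and run an async handler for
// every incoming message. The listener is registered before subscribing so
// that messages delivered right after the subscription are not missed.
async function listen(client, topic, handler) {
  client.on('message', (receivedTopic, payload) => {
    // Catch handler failures so they do not become unhandled rejections.
    Promise.resolve()
      .then(() => handler(receivedTopic, payload))
      .catch((err) => console.error('message handler failed:', err));
  });
  await client.subscribe(topic);
}

// Usage (not run here):
//   await listen(client, 'some/topic', async (topic, payload) => {
//     console.log(topic, payload.toString());
//   });
```

Note that the `message` event fires for every subscription on the client, so the handler may want to check the topic it receives.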
Does this library work with authentication?
For example, https://www.npmjs.com/package/mqtt lets you login to an MQTT broker in the connection statement.
We are setting up permissions for all of our MQTT users and want to test whether a user really can't access certain queues without the correct permission. But when I used client.publish(topic, payload, {qos: 1}), it just keeps trying to send it. When I changed it to {qos: 0}, it fired and forgot. I also tried wrapping it in a try-catch statement, but I'm not catching anything.
Please advise.
Is it possible to pass the certificates' contents instead of passing the file paths?
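As far as I understand, MQTT.js hands its TLS options to Node's `tls.connect`, whose `ca`, `cert` and `key` options accept PEM contents as strings or Buffers rather than paths. Under that assumption, a sketch could look like this; `tlsOptionsFromPem` is a hypothetical helper and the broker URL in the usage comment is made up.

```javascript
// Hypothetical helper: build connection options from PEM contents held in
// memory (for example read from environment variables or a secrets store)
// instead of file paths. ca/cert/key are the option names of tls.connect.
function tlsOptionsFromPem({ ca, cert, key }) {
  return {
    ca: Buffer.from(ca),
    cert: Buffer.from(cert),
    key: Buffer.from(key),
  };
}

// Usage (not run here): pass the result as the options argument, e.g.
//   mqtt.connectAsync('mqtts://broker.example.com:8883', tlsOptionsFromPem(pems));
```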
Am I crazy, or is this file missing?
What do you think of converting the connect method to a promise too?
For example, connecting would become something like this:
let mqtt = await MQTT.connect('tcp://somehost.com:1883')
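One way such a promise could be built on top of the existing events is sketched below. This is only an illustration of the idea, not necessarily how the library does or would implement it; `whenConnected` is a hypothetical helper that works with any EventEmitter-style client.

```javascript
// Hypothetical helper: resolve once the client emits 'connect', reject if it
// emits 'error' first. Whichever listener fires removes the other one.
function whenConnected(client) {
  return new Promise((resolve, reject) => {
    const onConnect = () => {
      client.removeListener('error', onError);
      resolve(client);
    };
    const onError = (err) => {
      client.removeListener('connect', onConnect);
      reject(err);
    };
    client.once('connect', onConnect);
    client.once('error', onError);
  });
}

// Usage (not run here):
//   const client = await whenConnected(MQTT.connect('tcp://somehost.com:1883'));
```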
Issuing an await client.publish immediately followed by await client.end() causes the client.on('error') handler to be triggered, which reports:
Error: read ECONNRESET
at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20)
at TLSWrap.callbackTrampoline (node:internal/async_hooks:130:17)
If I revert back to non tls using port 1883 then it works fine to completion.
Running Windows 10 Enterprise
Node.js 16.15.0
mosquitto 2.0.14
In my webapp I am building an mqtt adapter that gets initialized during the app startup process in the base route lifecycle function:
class ApplicationRoute {
  async routeLifeCycleBegin() {
    await this.mqttAdapter.connect();
  }
}
import mqtt from 'async-mqtt';
export default class mqttAdapter {
async connect() {
...
const connection = await mqtt.connectAsync('wss://iot-address/mqtt', options);
}
}
For some reason I keep getting TypeError: WS is not a constructor UNLESS I remove the await from the above await this.mqttAdapter.connect() in the route lifecycle function. Why would this be? I wanted to resolve the connection before proceeding and loading the app, which is why I used await this.mqttAdapter.connect(). I have installed the package with npm install -D async-mqtt.
TypeError: WS is not a constructor
at WebSocketStream (webpack://__ember_auto_import__/./node_modules/websocket-stream/stream.js?:63:16)
at createWebSocket (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/ws.js?:55:10)
at Object.buildBuilderBrowser (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/ws.js?:83:10)
at MqttClient.wrapper [as streamBuilder] (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/index.js?:144:36)
at MqttClient._setupStream (webpack://__ember_auto_import__/./node_modules/mqtt/lib/client.js?:271:22)
at new MqttClient (webpack://__ember_auto_import__/./node_modules/mqtt/lib/client.js?:251:8)
at Function.connect (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/index.js?:147:10)
at Object.connectAsync (webpack://__ember_auto_import__/./node_modules/async-mqtt/index.js?:142:25)
I connected to an MQTT server and subscribed to a topic. The client hangs if I don't call mqtt.end(), but whether or not I call mqtt.end(), the client only receives a message once. Is it possible to make the client keep listening and keep receiving messages from the server? I would appreciate any help. Thank you in advance!
Hi everyone,
I made a small wrapper around this client to support a clean request/response syntax with promise support.
It gives every request a unique id (using ULID) and uses it to associate request messages with response messages via one-shot topics.
This is the syntax:
// request topic will be "my/topic/request/123456"
sendRequest("my/topic", "Hello!").then((res) => {
  /*
  res = {
    topic: "my/topic/response/123456",
    payload: Buffer()
  }
  */
}, (err) => console.error(err))
Example topics:
Would it be interesting to include this in the repository? Otherwise I'll just make a wrapper on my own, but I don't want to segment the userbase just for a single feature like this.
If interested, I'd love to hear your suggestions or any changes you'd make!
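For discussion, the one-shot topic scheme could be sketched roughly as follows. This is not the wrapper described in the post: it uses `crypto.randomUUID()` in place of ULID to stay dependency-free, the timeout parameter is an addition, and `client` is assumed to expose promise-returning `subscribe`, `unsubscribe` and `publish` plus a `message` event, like an async-mqtt client.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical sketch: publish on "<base>/request/<id>" and wait for a
// single reply on "<base>/response/<id>".
async function sendRequest(client, baseTopic, payload, timeoutMs = 5000) {
  const id = randomUUID();
  const responseTopic = `${baseTopic}/response/${id}`;
  await client.subscribe(responseTopic);

  let onMessage;
  let timer;
  const response = new Promise((resolve, reject) => {
    onMessage = (topic, body) => {
      if (topic === responseTopic) resolve({ topic, payload: body });
    };
    client.on('message', onMessage);
    timer = setTimeout(() => reject(new Error('request timed out')), timeoutMs);
  });
  // Avoid an unhandled rejection if the timer fires before we await below.
  response.catch(() => {});

  try {
    await client.publish(`${baseTopic}/request/${id}`, payload);
    return await response;
  } finally {
    // Always clean up the timer, the listener and the one-shot subscription.
    clearTimeout(timer);
    client.removeListener('message', onMessage);
    await client.unsubscribe(responseTopic);
  }
}
```

Subscribing before publishing matters here: otherwise a fast responder could reply before the response topic is being listened to.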
MQTT.js v5 added support for promises, so I don't know whether this library will be needed anymore.
Hi! I'm using async-mqtt.
I've noticed that the current implementation is not working with React apps.
The solution for this is described in issue #1412.
Could we implement this also in async-mqtt?
The method connectAsync is missing in the index.d.ts file. Could you please add it?
How do I get the connection status of a client? The following returns undefined.
const MQTT = require('async-mqtt')
const uuidv1 = require('uuid/v1')
//
var client = MQTT.connect('tcp://localhost:1883', { clientId: uuidv1() })
client.on('connect', () => {
console.log(client.connected)
})
client.on('reconnect', () => {
console.log('Reconnected.')
})
client.on('close', () => {
console.log('Closed.')
})
client._client.connected returns true, however.
client/index.js
const mqtt = require('async-mqtt')
//
function mqttClient(config, callbacks) {
let protocol = config.protocol
let host = config.host
let port = config.port
let connOpts = config.connOpts
let brokerUrl = protocol + '://' + host + ':' + port
//
try {
// Connect is not async - https://github.com/mqttjs/async-mqtt/issues/4
let mqttClient = mqtt.connect(brokerUrl, connOpts)
let handlers = Object.keys(callbacks)
handlers.forEach((h, i) => {
mqttClient.on(h, callbacks[h])
})
} catch (e) {
}
}
index.js
return new Promise((resolve, reject) => {
// hndlr required from another file that has implementations of various events.
const callbacks = {
connect: hndlr.connHandler,
reconnect: hndlr.reconnHandler,
close: hndlr.closeHandler,
offline: hndlr.offlineHandler,
error: hndlr.connErrorHandler,
end: hndlr.endHandler
}
//
try {
client.mqttClient(mqttOpts, callbacks)
resolve()
} catch (e) {
reject(e)
}
})
When I run index.js, I keep getting reconnect and close messages. Firstly, I am not sure why a reconnect is even attempted; I don't need it in my case. Secondly, why don't I get a callback on the connect event?
Hello,
I've encountered an issue with losing the internet connection while publishing data.
Even if there is no internet connection, the publish method does not throw or block; it simply acts as if the data was sent successfully.
Is this behavior intentional?
If so, is there a way to check whether the publish was sent to the server?
I would like to store this data and send it once the internet connection returns.
Below you will find my code for testing:
const mqtt = require("async-mqtt");
const credentials = require("./credentials.json");
const connectionURL = "presiot.mciotextension.eu1.mindsphere.io";
const connectionOptions = {
port: 1883,
clientId: "mqttjs_87654324",
username: "presiot/" + credentials.username,
password: credentials.password,
device_name: "mqttjs_87654324",
tenant: "presiot",
protocol: "mqtt/tcp",
host: "mciotextension.eu1.mindsphere.io",
};
const snooze = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
/**
* @description Method for generating command with random value
*/
let generateCommandWithRandValue = () => {
//Generating value and timestamp to send
let value = Math.round(Math.random() * 1000000) / 100;
let timestamp = Date.now();
return [
200,
"variableGroup",
"variableName",
value.toFixed(2),
"V",
new Date(timestamp).toISOString(),
].join(",");
};
let exec = async () => {
//Connecting
let client = await mqtt.connectAsync(connectionURL, connectionOptions);
while (true) {
try {
//Waiting 1s
await snooze(1000);
//Generating command to send
let command = generateCommandWithRandValue();
//BREAKING INTERNET CONNECTION HERE
//Sending value to mqtt broker
await client.publish("s/us", command);
//EVEN IF THERE IS NO INTERNET, PUBLISH DOES NOT THROW OR BLOCK - COMMAND IS TREATED AS SENT PROPERLY
console.log(`Command sent properly: ${command}`);
} catch (err) {
console.log(err);
}
}
};
exec();
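As far as I understand, a QoS 0 publish has no acknowledgement at the protocol level, so its promise can resolve as soon as the packet is handed to the socket, while a QoS 1 publish only resolves after the broker's PUBACK. Under that assumption, one way to detect an undelivered message is to publish with QoS 1 and give up waiting after a timeout. `withTimeout` and `publishConfirmed` are hypothetical helpers and the 5000 ms default is arbitrary.

```javascript
// Hypothetical helper: reject if `promise` does not settle within `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`no acknowledgement after ${ms} ms`)),
      ms
    );
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical helper: publish with QoS 1 so the promise waits for the
// broker's acknowledgement, and report whether it arrived in time.
async function publishConfirmed(client, topic, payload, ms = 5000) {
  try {
    await withTimeout(client.publish(topic, payload, { qos: 1 }), ms);
    return true;
  } catch (err) {
    return false;
  }
}

// Usage (not run here):
//   if (!(await publishConfirmed(client, 's/us', command))) backlog.push(command);
```

One caveat: the timeout does not cancel the publish, and MQTT.js may still deliver the QoS 1 message after reconnecting, so re-sending stored data can produce duplicates.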
As of now, async-mqtt only transforms the sync client to async.
The mqtt.js package has a Store class that relies on callbacks. In my opinion, this would also be in the scope of this package.
Do you have any plans on implementing this? I can give a hand, if needed.
I would like to set a hook for shutdown, something like this:
process.on('exit', () => { mqttConnection.end(true).then(()=>{}).catch(()=>{}); });
This... never works. process.exit(1) does; the process isn't hung, it just never gets the idea that the Node event loop is empty. Is there a way to let the event loop empty while an MQTT connection is open? It may already be possible, I just don't know how.
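Two things are likely at play here. Node's documentation states that `'exit'` listeners may only perform synchronous work, so a promise started there never gets to run; and an open connection keeps the event loop alive until `end()` is called. A sketch of a shutdown hook that reacts to termination signals instead follows; `closeOnSignal` is a hypothetical helper, and the signal list is an assumption about how the process gets stopped.

```javascript
// Hypothetical helper: close the client when the process is asked to stop.
// `proc` defaults to the real process object and is injectable for testing.
function closeOnSignal(client, proc = process, signals = ['SIGINT', 'SIGTERM']) {
  for (const signal of signals) {
    proc.once(signal, async () => {
      try {
        await client.end(true); // true = force close, as in the snippet above
      } catch (err) {
        console.error('error while closing MQTT connection:', err);
      } finally {
        proc.exit(0);
      }
    });
  }
}

// Usage (not run here):
//   closeOnSignal(mqttConnection);
```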
Currently async-mqtt supports mqtt 2+.
"dependencies": {
"mqtt": "^2.3.1"
},
mqtt 3+ has been out for a month. Is anything in the works for async-mqtt to support that release? I haven't looked, but it may not need anything more than bumping the version in package.json.
Hey just wondering if we can get a new release with the latest v4.1.0 of mqtt? The latest release is behind and using 2.5.0, it'd be nice if they were in sync.
Thanks!
Since Async Iteration is on the cusp of being available everywhere, the following dream code could become a reality:
async function run(client) {
await client.subscribe("some/+/topic");
for await (let [topic, payload] of client) {
await saveToDB(topic, payload);
}
}
I propose we detect whether the environment has Symbol.asyncIterator defined, and if it does, define a method for making the client async-iterable.
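In current Node versions something close to this can be approximated without changing the client, because `events.on()` turns any event into an async iterator of argument arrays. A minimal sketch, where `messages` is a hypothetical helper and `client` may be any EventEmitter that emits `message` with `(topic, payload, ...)`:

```javascript
const { on } = require('events');

// Hypothetical helper: expose incoming messages as an async iterator of
// [topic, payload] pairs. The optional `options` object is passed through
// to events.on(), e.g. { signal } to stop iterating via an AbortController.
async function* messages(client, options) {
  for await (const [topic, payload] of on(client, 'message', options)) {
    yield [topic, payload];
  }
}

// Usage (not run here):
//   await client.subscribe('some/+/topic');
//   for await (const [topic, payload] of messages(client)) {
//     await saveToDB(topic, payload);
//   }
```

The listener is only attached when iteration starts, so messages emitted before the first loop step are not seen; breaking out of the loop removes the listener again.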
Hello Fellas!
In my current project, I'm using this call:
await this.mqtt_client.publish(topic,JSON.stringify(message),{
qos: 2,
retain: false,
dup: false,
});
But the message received on the broker has QoS 1. What is wrong with my usage?
Hello,
I used your package in one of my apps and noticed that after a while I get an ECONNRESET error, but it is not caught by the client.on('error', ...) handler. As I found out, the mqttjs package already emits the error event for the following TLS errors:
Could you please implement this in this package too?
Thanks,
When you execute connectAsync(), the removePromiseResolutionListeners() function tries to execute:
client.off(eventName, promiseResolutionListeners[eventName]);
This client.off function doesn't exist on MqttClient.prototype, so connectAsync crashes.
Shouldn't it be removeListener instead of off?
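For context, `EventEmitter.prototype.off` was added in Node v10 as an alias for `removeListener`, so older Node versions (and possibly older browser shims of `events`) lack it. A defensive sketch that works with both follows; `detach` is a hypothetical helper name.

```javascript
// Hypothetical helper: remove a listener with off() where available and
// fall back to removeListener(), which every EventEmitter version provides.
function detach(emitter, eventName, listener) {
  const remove =
    typeof emitter.off === 'function' ? emitter.off : emitter.removeListener;
  return remove.call(emitter, eventName, listener);
}
```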
Async is extending
publish *
subscribe
unsubscribe
end
But only publish seems to have an example in the readme. Would it be possible to show examples of the other 3 as well? It's not so easy to immediately jump on the wagon first time...
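Until the readme covers them, here is a rough sketch of how the four methods might be used together. It assumes `client` is an already connected async-mqtt client; the `presence` topic and the `demo` function name are made up for illustration.

```javascript
// Hypothetical walkthrough of the four promise-returning methods.
async function demo(client) {
  // Resolves once the subscription has been processed.
  await client.subscribe('presence');
  // Resolves once the message is sent (QoS 0) or acknowledged (QoS 1/2).
  await client.publish('presence', 'hello');
  // Resolves once the unsubscription has been processed.
  await client.unsubscribe('presence');
  // Resolves once the connection has been closed.
  await client.end();
}

// Usage (not run here):
//   const client = await require('async-mqtt').connectAsync('tcp://localhost:1883');
//   await demo(client);
```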
Dependabot is throwing an error in my project from async-mqtt due to a security vulnerability in a dependency:
[email protected] requires glob-parent@^3.1.0 via a transitive dependency on [email protected]
The earliest fixed version is 5.1.2.
Hi,
The following code blocks permanently when using connectAsync while the MQTT server is not running.
The catch handler is not being triggered. How do I correctly use connectAsync so that, after a connect timeout period, an error is caught and reported to the user?
I am calling connectAsync with the following arguments:
/**
* Publish a message to the mqtt broker for a given topic
* Disposes the client connection upon send success or failure.
* @param {string} topic - topic to publish to
* @param {Object} message - message to publish
*/
async publish(topic, message) {
let client = undefined;
try {
console.log('MQTTClient awaiting connection to broker for publishing...');
client = await mqtt.connectAsync(this.#url, this.#options);
console.log('MQTTClient acquired connection, publishing...');
await client.publish(topic, message, this.#options);
console.log(`Published message to topic := ${topic}`);
} catch(e) {
console.error(`Failed to publish message to topic := ${topic}`);
console.error(`message := ${e.message}`);
console.error(`stack := ${e.stack}`);
}
finally {
console.log('MQTTClient closing connection');
if(client)
await client.end();
}
}
Edit: Ahhh, I just looked at the source code. If I pass the additional argument allowRetries as false, then the catch handler triggers. So, if the allowRetries argument is not included, the default behaviour is that the client continuously tries to connect and the code blocks until it is connected? Is my understanding correct?
Is there any option to specify the number of retries before a connection is deemed to have failed?
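I am not aware of a built-in retry count, so treat the following as a workaround sketch rather than a library feature: wrap a single failing-fast connection attempt (for example `connectAsync` with `allowRetries` set to false, as described in the edit above) in a loop. `connectWithRetries` is a hypothetical helper; the attempt count and delay are arbitrary defaults.

```javascript
// Hypothetical helper: call `connectOnce` up to `attempts` times, waiting
// `delayMs` between failures, and rethrow the last error if all fail.
async function connectWithRetries(connectOnce, attempts = 3, delayMs = 1000) {
  let lastError;
  for (let i = 0; i < attempts; i++) {
    try {
      return await connectOnce();
    } catch (err) {
      lastError = err;
      if (i < attempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Usage (not run here):
//   const client = await connectWithRetries(
//     () => mqtt.connectAsync(url, options, false), // false = allowRetries off
//     5
//   );
```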
This is the 2nd feature I would like to discuss adding to the library.
Which is also a feature request for an excellent raw websockets library.
The feature is described here
Basically, it allows promises to be used to await the next message received, rather than just relying on a callback.
Example from linked issue:
await ws.open(); while (true) { data = await ws.recv(); ws.send(data); }
See discussion in raw websockets project for more details: here
I may try to implement this, but I am not a fantastic JS or promises programmer. If someone else wants to help or try to do this, please let me know!
Thanks for this awesome async/MQTT library!
I am really excited about this library, and there is one or two features I would like to discuss with the maintainers.
The main feature is JSON based RPC, this allows request/response type messaging to be used with promises.
This isn't my idea, it comes from here:
RPC for async Websockets library
(with thanks to Vitalets for such a brilliant idea!)
JSON RPC feature found in websocket-as-promised:
websocket-as-promised provides simple request-response mechanism (JSON RPC). Method .sendRequest() sends message with unique requestId and returns promise. That promise get resolved when response message with the same requestId comes. For reading/setting requestId from/to message there are two functions defined in options attachRequestId / extractRequestId:
const wsp = new WebSocketAsPromised(wsUrl, {
  packMessage: data => JSON.stringify(data),
  unpackMessage: data => JSON.parse(data),
  attachRequestId: (data, requestId) => Object.assign({id: requestId}, data), // attach requestId to message as `id` field
  extractRequestId: data => data && data.id, // read requestId from message `id` field
});
wsp.open()
  .then(() => wsp.sendRequest({foo: 'bar'})) // actually sends {foo: 'bar', id: 'xxx'}, because `attachRequestId` defined above
  .then(response => console.log(response)); // waits server message with corresponding requestId: {id: 'xxx', ...}
By default requestId value is auto-generated, but you can set it manually:
wsp.sendRequest({foo: 'bar'}, {requestId: 42});
I will take a stab at implementing this, but I would also invite more experienced JS developers to speak up if they would like to implement it!
I started to see the following warning in the logs, but it's unclear what may cause it. Do you have any ideas?
2019-12-27 20:34:20
(node:27) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 1001 drain listeners added. Use emitter.setMaxListeners() to increase limit
at _addListener (events.js:247:17)
at TLSSocket.addListener (events.js:263:10)
at TLSSocket.Readable.on (_stream_readable.js:822:35)
at TLSSocket.once (events.js:292:8)
at sendPacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:40:19)
at MqttClient._sendPacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:841:7)
at MqttClient.publish (/home/node/app/packer/node_modules/mqtt/lib/client.js:431:12)
at Promise (/home/node/app/packer/node_modules/async-mqtt/index.js:26:31)
at new Promise (<anonymous>)
at AsyncClient.publish (/home/node/app/packer/node_modules/async-mqtt/index.js:25:12)
at MqttClient.onMessage (/home/node/app/packer/pipes/eventsToRecords.js:68:16)
at MqttClient.emit (events.js:189:13)
at MqttClient._handlePublish (/home/node/app/packer/node_modules/mqtt/lib/client.js:987:12)
at MqttClient._handlePacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:336:12)
at work (/home/node/app/packer/node_modules/mqtt/lib/client.js:292:12)
at Writable.writable._write (/home/node/app/packer/node_modules/mqtt/lib/client.js:302:5)