async-mqtt's People

Contributors

bdchauvette, fernando-almeida, flodan, golopot, matteodisabatino, mcollina, miguelrodlav, odino, okofish, palatinb, pcowgill, rangermauve, tabrizian, tomotoes

async-mqtt's Issues

How to use a broker SRV record to connect?

Hi,

I am trying to connect to my broker running at _mqtt._tcp.iot.domain.com, but it fails with:

(node:296427) UnhandledPromiseRejectionWarning: Error: getaddrinfo ENOTFOUND _mqtt._tcp.iot.domain.com

I verified with dig that I can retrieve the SRV record.
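
The ENOTFOUND error comes from getaddrinfo, which looks up plain hostnames and does not follow SRV records. One possible workaround is to resolve the SRV record first with Node's dns.promises.resolveSrv and build the broker URL from the result. The sketch below assumes this approach; pickSrvTarget and brokerUrlFromSrv are hypothetical helpers (not part of async-mqtt), and the mqtt:// scheme is an assumption.

```javascript
const dns = require('dns');

// Pick the preferred SRV record: lowest priority first, then highest weight.
// Simplification: RFC 2782 asks for weighted random selection among records
// of equal priority; this sketch is deterministic instead.
function pickSrvTarget(records) {
  if (records.length === 0) {
    throw new Error('No SRV records found');
  }
  return [...records].sort(
    (a, b) => a.priority - b.priority || b.weight - a.weight
  )[0];
}

// Resolve an SRV name and build a broker URL from the chosen record.
// `resolveSrv` is injectable so the function can be exercised without DNS.
async function brokerUrlFromSrv(
  srvName,
  resolveSrv = (name) => dns.promises.resolveSrv(name)
) {
  const records = await resolveSrv(srvName);
  const { name, port } = pickSrvTarget(records);
  return `mqtt://${name}:${port}`; // scheme is an assumption
}
```

The resulting URL could then be passed to the connect call in place of the SRV name.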

Client.stream is undefined

Hey,
I tried to check whether the connection is established. In the synchronous MQTT client there is a way to do that:

const mqtt = require('async-mqtt');
const client = mqtt.connect(address);
client.stream.on('error', /* error handler */);

This checks the stream. But when I try the same with the async client, I only get an error that the stream is undefined.

How to catch errors like `access_refused` when using `client.publish`

We are setting up permissions for all of our MQTT users and want to verify that a user cannot access certain queues without the correct permission. But when I use client.publish(topic, payload, {qos: 1}), it just keeps trying to send the message.

When I changed it to {qos: 0}, it fired and forgot. I also tried wrapping it in a try-catch statement, but I'm not catching anything.

Please advise.

publish options not working

Hi,

It seems like the publish options aren't working:

const options = { retain: true, qos: 1, dup: false };
client.publish(topic, JSON.stringify(message), options);

This is the result in MQTT BOX:
[image]

As you can see, the message arrives with retain: false and qos: 0.

Converting connect method to promise

What do you think about converting the connect method to a promise too?
For example, connecting would become something like this:

let mqtt = await MQTT.connect('tcp://somehost.com:1883')
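
A minimal sketch of how an event-based connect could be awaited, assuming the client is an EventEmitter that emits 'connect' on success and 'error' on failure. connectAsPromise is a hypothetical helper, not the library's implementation; it relies on Node's events.once, which rejects if 'error' is emitted while waiting.

```javascript
const { once } = require('events');

// Wrap an event-based connect() so callers can await the connection.
// `connect` is any function returning an EventEmitter-style client that
// emits 'connect' on success and 'error' on failure (an assumption here).
async function connectAsPromise(connect, url, options) {
  const client = connect(url, options);
  // events.once resolves on 'connect' and rejects if 'error' fires first.
  await once(client, 'connect');
  return client;
}
```

With a helper like this, the call from the example would read `const client = await connectAsPromise(MQTT.connect, 'tcp://somehost.com:1883')`.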

MQTT with TLS enabled causes an ECONNRESET when issuing await client.end()

Issuing an
await client.publish immediately followed by
await client.end() results in
the client.on('error') handler being triggered, which reports:
Error: read ECONNRESET
    at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20)
    at TLSWrap.callbackTrampoline (node:internal/async_hooks:130:17)

If I revert back to non tls using port 1883 then it works fine to completion.

Running Windows 10 Enterprise
Node.js 16.15.0
mosquitto 2.0.14

TypeError: WS is not a constructor

In my webapp I am building an mqtt adapter that gets initialized during the app startup process in the base route lifecycle function:

class ApplicationRoute {
  async routeLifeCycleBegin() {
    await this.mqttAdapter.connect();
  }
}

import mqtt from 'async-mqtt';

export default class mqttAdapter {
  async connect() {
    ...
    const connection = await mqtt.connectAsync('wss://iot-address/mqtt', options);
  }
}

For some reason I keep getting TypeError: WS is not a constructor UNLESS I remove the await from the above await this.mqttAdapter.connect() in the route lifecycle function. Why would this be? I wanted to resolve the connection before proceeding and loading the app which is why I did await this.mqttAdapter.connect(). I have installed with npm install -D async-mqtt

TypeError: WS is not a constructor
    at WebSocketStream (webpack://__ember_auto_import__/./node_modules/websocket-stream/stream.js?:63:16)
    at createWebSocket (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/ws.js?:55:10)
    at Object.buildBuilderBrowser (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/ws.js?:83:10)
    at MqttClient.wrapper [as streamBuilder] (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/index.js?:144:36)
    at MqttClient._setupStream (webpack://__ember_auto_import__/./node_modules/mqtt/lib/client.js?:271:22)
    at new MqttClient (webpack://__ember_auto_import__/./node_modules/mqtt/lib/client.js?:251:8)
    at Function.connect (webpack://__ember_auto_import__/./node_modules/mqtt/lib/connect/index.js?:147:10)
    at Object.connectAsync (webpack://__ember_auto_import__/./node_modules/async-mqtt/index.js?:142:25)

How to make the client keep receiving messages after subscribing to a topic?

I connected to an MQTT server and subscribed to a topic. The process hangs if I don't call mqtt.end(), but whether or not I call mqtt.end(), the client only receives a message once. Is it possible to make the client keep listening and receiving messages from the server? I would appreciate any help. Thank you in advance!

Question: Request/Response syntax?

Hi everyone,
I made a small wrapper around this client to support a clean request/response syntax with promise support.
It gives every request a unique id (using ULID) and uses it to associate request messages with response messages via one-shot topics.
This is the syntax:

// request topic will be "my/topic/request/123456"
sendRequest("my/topic", "Hello!").then((res) => {
        /*
            res = {
                topic: "my/topic/response/123456",
                payload: Buffer()
            }
        */
    }, (err) => console.error(err))

Example topics:

  • Request: mynamespace/request/{id}
  • Response: mynamespace/response/{id}

Question

Would it be interesting to include this in the repository? Otherwise I'll just publish the wrapper on my own, but I don't want to fragment the user base for a single feature like this.
If you're interested, I'd love to hear your suggestions or the changes you'd make!
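
The correlation scheme described in this post (a unique id per request, with a one-shot response topic) can be sketched as follows. This is an illustration under assumptions, not the author's wrapper: createRequester is a hypothetical name, crypto.randomUUID stands in for ULID, and the client is assumed to expose promise-returning subscribe, unsubscribe and publish methods plus 'message' events.

```javascript
const { randomUUID } = require('crypto');

// Build a sendRequest() function on top of a client exposing subscribe(),
// unsubscribe(), publish() and 'message' events (assumed interface).
function createRequester(client, namespace, timeoutMs = 5000) {
  const pending = new Map(); // request id -> { resolve, timer }
  const responsePrefix = `${namespace}/response/`;

  client.on('message', (topic, payload) => {
    if (!topic.startsWith(responsePrefix)) return;
    const id = topic.slice(responsePrefix.length);
    const entry = pending.get(id);
    if (!entry) return; // late or unknown response
    pending.delete(id);
    clearTimeout(entry.timer);
    entry.resolve({ topic, payload });
  });

  return async function sendRequest(payload) {
    const id = randomUUID(); // stand-in for the ULID mentioned in the post
    const responseTopic = responsePrefix + id;
    await client.subscribe(responseTopic);
    try {
      return await new Promise((resolve, reject) => {
        const timer = setTimeout(() => {
          pending.delete(id);
          reject(new Error(`Request ${id} timed out`));
        }, timeoutMs);
        // Register the pending entry before publishing so a fast reply is not lost.
        pending.set(id, { resolve, timer });
        Promise.resolve(client.publish(`${namespace}/request/${id}`, payload))
          .catch((err) => {
            pending.delete(id);
            clearTimeout(timer);
            reject(err);
          });
      });
    } finally {
      await client.unsubscribe(responseTopic); // one-shot topic: always clean up
    }
  };
}
```

A caller would create the function once, for example `const sendRequest = createRequester(client, 'mynamespace')`, and then await `sendRequest('Hello!')`.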

MQTTjs v5

MQTTjs v5 added support for promises, so I don't know whether this library will be needed anymore.

Buffer is not defined

Hi! I'm using async-mqtt.
I've noticed that the current implementation is not working with React apps.
The solution for this is described in issue #1412.

Could we implement this also in async-mqtt?

Connected status of client?

How do I get the connection status of a client? The following returns undefined.

const MQTT = require('async-mqtt')
const uuidv1 = require('uuid/v1')
//
var client = MQTT.connect('tcp://localhost:1883', { clientId: uuidv1() })
client.on('connect', () => {
    console.log(client.connected)
})
client.on('reconnect', () => {
    console.log('Reconnected.')
})
client.on('close', () => {
    console.log('Closed.')
})

However, client._client.connected returns true.
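
The observation is consistent with a wrapper that keeps the underlying client in _client without forwarding its properties. As an illustration only, a getter is one way such a wrapper could expose the live value. ClientWrapper below is a hypothetical class, not async-mqtt's source.

```javascript
// Hypothetical wrapper illustrating property forwarding.
class ClientWrapper {
  constructor(client) {
    this._client = client;
  }

  // Read the live value from the wrapped client on every access,
  // so the wrapper never holds a stale copy.
  get connected() {
    return this._client.connected;
  }
}
```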

Reconnect-close loop.

client/index.js

const mqtt = require('async-mqtt')
//
function mqttClient(config, callbacks) {
    let protocol = config.protocol
    let host = config.host
    let port = config.port
    let connOpts = config.connOpts
    let brokerUrl = protocol + '://' + host + ':' + port
    //
    try {
        // Connect is not async - https://github.com/mqttjs/async-mqtt/issues/4
        let mqttClient = mqtt.connect(brokerUrl, connOpts)
        let handlers = Object.keys(callbacks)
        handlers.forEach((h, i) => {
            mqttClient.on(h, callbacks[h])
        })
    } catch (e) {

    }
}

index.js

return new Promise((resolve, reject) => {
// hndlr required from another file that has implementations of various events.
        const callbacks = {
            connect: hndlr.connHandler,
            reconnect: hndlr.reconnHandler,
            close: hndlr.closeHandler,
            offline: hndlr.offlineHandler,
            error: hndlr.connErrorHandler,
            end: hndlr.endHandler
        }
        //
        try {
            client.mqttClient(mqttOpts, callbacks)
            resolve()
        } catch (e) {
            reject(e)
        }
    })

When I run index.js, I keep getting reconnect and close messages. Firstly, I am not sure why a reconnect is even attempted; I don't need it in my case. Secondly, why don't I get a callback on the connect event?

publish does not block or throw even if there is no Internet connection

Hello,

I've encountered an issue related to losing the Internet connection while publishing data.
Even if there is no Internet connection, the publish method does not throw or block; it simply acts as if the data was sent successfully.
Is this behavior intentional?

If so, is there a way to check whether a publish was actually sent to the server?
I would like to store this data and send it once the Internet connection returns.

Below you will find my code for testing:

const mqtt = require("async-mqtt");
const credentials = require("./credentials.json");
const connectionURL = "presiot.mciotextension.eu1.mindsphere.io";

const connectionOptions = {
  port: 1883,
  clientId: "mqttjs_87654324",
  username: "presiot/" + credentials.username,
  password: credentials.password,
  device_name: "mqttjs_87654324",
  tenant: "presiot",
  protocol: "mqtt/tcp",
  host: "mciotextension.eu1.mindsphere.io",
};

const snooze = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * @description Method for generating command with random value
 */
let generateCommandWithRandValue = () => {
  //Generating value and timestamp to send
  let value = Math.round(Math.random() * 1000000) / 100;
  let timestamp = Date.now();

  return [
    200,
    "variableGroup",
    "variableName",
    value.toFixed(2),
    "V",
    new Date(timestamp).toISOString(),
  ].join(",");
};

let exec = async () => {
  //Connecting
  let client = await mqtt.connectAsync(connectionURL, connectionOptions);

  while (true) {
    try {
      //Waiting 1s
      await snooze(1000);

      //Generating command to send
      let command = generateCommandWithRandValue();

      //BREAKING INTERNET CONNECTION HERE

      //Sending value to mqtt broker
      await client.publish("s/us", command);

      //EVEN IF THERE IS NO INTERNET PUBLISH DOES NOT THROW OR BLOCK - COMMAND IS TREATED AS SEND PROPERLY

      console.log(`Command send properly: ${command}`);
    } catch (err) {
      console.log(err);
    }
  }
};

exec();
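
MQTT defines no acknowledgement for QoS 0, so a publish at that level cannot confirm delivery. One possible way to keep data until it has been handed over is a small in-memory outbox, sketched below. The sketch assumes the client exposes a boolean connected property and that publish with qos: 1 returns a promise that settles once the broker has acknowledged the message. Outbox is a hypothetical helper, and both assumptions should be checked against the client version in use.

```javascript
// Keeps messages in memory until a publish succeeds.
class Outbox {
  constructor(client) {
    this.client = client;
    this.pending = [];
    this.flushing = false;
  }

  // Queue a message and try to deliver everything queued so far.
  async send(topic, payload) {
    this.pending.push({ topic, payload });
    await this.flush();
  }

  // Deliver queued messages in order; stop while the client is offline.
  async flush() {
    if (this.flushing) return; // avoid two loops publishing the same message
    this.flushing = true;
    try {
      while (this.pending.length > 0 && this.client.connected) {
        const { topic, payload } = this.pending[0];
        await this.client.publish(topic, payload, { qos: 1 });
        this.pending.shift(); // drop only after the publish resolved
      }
    } finally {
      this.flushing = false;
    }
  }
}
```

Calling flush() from the client's 'connect' handler would resend anything queued while offline.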

Make Store async

As of now, async-mqtt only transforms the sync client to async.
The mqtt.js package has a Store class that relies on callbacks. In my opinion, this would also be in the scope of this package.

Do you have any plans on implementing this? I can give a hand, if needed.

MQTT connection holds nodejs event loop open

I would like to set a hook for shutdown, something like this:

process.on('exit', () => { mqttConnection.end(true).then(()=>{}).catch(()=>{}); });

This never works. process.exit(1) does work, so the process isn't hung; Node just never decides that the event loop is empty. Is there a way to let the event loop drain while an MQTT connection is open? It may already be possible, I just don't know how.
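
Two details of Node's process model are relevant here: listeners for 'exit' may only perform synchronous work, and an open socket keeps the event loop alive, so 'exit' is not emitted on its own while the connection is open. A minimal sketch of an alternative is to close the client when a termination signal arrives and exit afterwards. installShutdownHook is a hypothetical helper; the client is assumed to have an end(force) method that returns a promise.

```javascript
// Close the client on SIGINT/SIGTERM, then exit.
// `proc` defaults to the real process; it is a parameter only for testing.
function installShutdownHook(client, proc = process) {
  let closing = null;
  const shutdown = () => {
    if (!closing) {
      closing = Promise.resolve(client.end(true))
        .catch(() => {}) // ignore close errors during shutdown
        .then(() => proc.exit(0));
    }
    return closing; // repeated signals share the same shutdown promise
  };
  proc.once('SIGINT', shutdown);
  proc.once('SIGTERM', shutdown);
  return shutdown;
}
```

For a program that should stop once its work is done, awaiting client.end() at the end of that work lets the loop empty without any hook.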

support for mqtt 3+

Currently async-mqtt supports mqtt 2+.

 "dependencies": {
    "mqtt": "^2.3.1"
},

mqtt 3+ has been out for a month. Is anything in the works for async-mqtt to support that release? I haven't looked, but it may need nothing more than a version bump in package.json.

New Release

Hey just wondering if we can get a new release with the latest v4.1.0 of mqtt? The latest release is behind and using 2.5.0, it'd be nice if they were in sync.

Thanks!

Make client Async-Iterable

Since Async Iteration is on the cusp of being available everywhere, the following dream code could become a reality:

async function saveMessages(client) {
  await client.subscribe("some/+/topic");

  for await (let [topic, payload] of client) {
    await saveToDB(topic, payload);
  }
}

I propose we detect whether the environment has Symbol.asyncIterator defined, and if it does, define a method for making the client async-iterable.
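
In current Node versions, something close to this can be sketched without changing the client, because events.on turns any EventEmitter event into an async iterator. The example below assumes the client emits 'message' events with (topic, payload) arguments; collectMessages is a hypothetical helper.

```javascript
const { on } = require('events');

// Collect the first `count` messages from an EventEmitter-style client.
async function collectMessages(client, count) {
  const received = [];
  // events.on yields the argument list of every 'message' event as an array.
  for await (const [topic, payload] of on(client, 'message')) {
    received.push({ topic, payload: payload.toString() });
    if (received.length >= count) break; // leaving the loop removes the listener
  }
  return received;
}
```

Note that events.on also throws inside the loop if the emitter emits 'error', so the loop would normally sit in a try/catch.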

Calling publish with QoS 2, but the broker receives QoS 1

Hello Fellas!

In my current project, I'm using this call:

        await this.mqtt_client.publish(topic, JSON.stringify(message), {
            qos: 2,
            retain: false,
            dup: false,
        });

But the message received on the broker has QoS 1. What is wrong with my usage?

TLS error not caught by error event

Hello,

I used your package in one of my apps and noticed that after a while I got an ECONNRESET error, but it is not caught by the client.on('error', ...) handler. As far as I can tell, the mqttjs package already emits the error event for the following TLS errors:

  • ECONNREFUSED
  • ECONNRESET
  • EADDRINUSE
  • ENOTFOUND

Could you please also implement this in this package too?

Thanks,

client.off() doesn't exist on MqttClient when calling connectAsync()

When you execute connectAsync(), the removePromiseResolutionListeners() function tries to execute:

client.off(eventName, promiseResolutionListeners[eventName]);

But this client.off function doesn't exist on MqttClient.prototype,

so connectAsync crashes.

Shouldn't it be removeListener instead of off?
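
For context, EventEmitter.prototype.off was added as an alias of removeListener in Node.js 10, so emitters on older runtimes (or custom emitter implementations) may only have removeListener. A defensive helper could be sketched like this; removeHandler is a hypothetical name, not code from the library.

```javascript
// Remove an event handler, preferring off() and falling back to
// removeListener() on emitters that predate the off() alias.
function removeHandler(emitter, eventName, handler) {
  if (typeof emitter.off === 'function') {
    emitter.off(eventName, handler);
  } else {
    emitter.removeListener(eventName, handler);
  }
}
```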

Documentation : three more examples ?

Async is extending

    publish *
    subscribe
    unsubscribe
    end

But only publish seems to have an example in the readme. Would it be possible to show examples of the other 3 as well? It's not so easy to immediately jump on the wagon first time...
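
As a starting point, the other three methods can be combined in one short sketch. It assumes subscribe, unsubscribe and end each return a promise, as the list above implies, and that the client emits 'message' events with (topic, payload) arguments. readOneMessage is a hypothetical helper, not an example from the README.

```javascript
const { once } = require('events');

// Subscribe, wait for a single message, then unsubscribe and disconnect.
async function readOneMessage(client, topic) {
  await client.subscribe(topic);
  try {
    // Resolves with the arguments of the next 'message' event.
    const [receivedTopic, payload] = await once(client, 'message');
    return { topic: receivedTopic, payload: payload.toString() };
  } finally {
    await client.unsubscribe(topic);
    await client.end();
  }
}
```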

connectAsync blocking, promise not rejecting when mqtt server is not running

Hi,

The code below blocks permanently when using connectAsync while the MQTT server is not running.

The catch handler is not triggered. How do I use connectAsync correctly so that an error is raised and reported to the user after a connect timeout?

I am calling connectAsync with the following arguments:

  • url: mqtts://user:[email protected]
  • options: {
      ca: /path/to/ca/file
      rejectUnauthorised: true
    }

    /**
     * Publish a message to the mqtt broker for a given topic
     * Disposes the client connection upon send success or failure.
     * @param {string} topic - topic to publish to 
     * @param {Object} message - message to publish
     */
    async publish(topic, message) {

        let client = undefined;

        try {
            console.log('MQTTClient awaiting connection to broker for publishing...');
            client = await mqtt.connectAsync(this.#url, this.#options);

            console.log('MQTTClient acquired connection, publishing...');
            await client.publish(topic, message, this.#options);
            console.log(`Published message to topic := ${topic}`);
        } catch(e) {
            console.error(`Failed to publish message to topic := ${topic}`);
            console.error(`message := ${e.message}`);
            console.error(`stack := ${e.stack}`);
        }
        finally {
            console.log('MQTTClient closing connection');
            if(client)
                await client.end();
        }
    }

Edit: I just looked at the source code. If I pass the additional argument allowRetries as false, the catch handler triggers. So if allowRetries is omitted, the default behaviour is that the client keeps trying to connect and the code blocks until it connects? Is my understanding correct?

Is there any option to specify the number of retries before a connection is deemed to have failed?
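
Independently of any retry setting, a caller can bound the wait with a generic timeout wrapper. The sketch below is a plain Promise.race pattern; withTimeout is a hypothetical helper and does not stop the underlying client from retrying in the background.

```javascript
// Reject if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms, message = 'Operation timed out') {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

In the publish method above, the connect line could become `client = await withTimeout(mqtt.connectAsync(this.#url, this.#options), 10000, 'connect timed out')`, where the 10 second limit is an arbitrary example value. The MQTT.js documentation also lists connectTimeout and reconnectPeriod connection options that may be worth checking for this case.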

Await/Promise for receiving next message feature discussion

This is the 2nd feature I would like to discuss adding to the library.

Which is also a feature request for an excellent raw websockets library.

The feature is described here

Basically, it allows promises to be used to await the next message received, rather than just relying on a callback.

Example from linked issue:

await ws.open();

while (true) {
  data = await ws.recv();
  ws.send(data);
}

See discussion in raw websockets project for more details: here

I may try to implement this, but I am neither a fantastic JS programmer nor a promises expert, so if someone else
wants to help or try this, please let me know!

Thanks for this awesome async/MQTT library!
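
A minimal sketch of what such a recv() could look like on top of 'message' events, assuming the client is an EventEmitter that emits (topic, payload). createReceiver is a hypothetical helper; it buffers messages that arrive while nobody is waiting, so none are dropped between calls.

```javascript
// Returns a recv() function that resolves with the next message.
function createReceiver(client) {
  const buffered = []; // messages that arrived with no pending recv()
  const waiting = [];  // resolvers of recv() calls still awaiting a message

  client.on('message', (topic, payload) => {
    const message = { topic, payload };
    const resolve = waiting.shift();
    if (resolve) {
      resolve(message);
    } else {
      buffered.push(message);
    }
  });

  return function recv() {
    if (buffered.length > 0) {
      return Promise.resolve(buffered.shift());
    }
    return new Promise((resolve) => waiting.push(resolve));
  };
}
```

The loop from the linked example would then read `const recv = createReceiver(client); while (true) { const { topic, payload } = await recv(); }`. The buffer is unbounded in this sketch, which a real implementation would probably want to limit.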

Promise based RPC feature discussion

I am really excited about this library, and there are one or two features I would like to discuss with the maintainers.

The main feature is JSON based RPC, this allows request/response type messaging to be used with promises.

This isn't my idea, it comes from here:
RPC for async Websockets library
(with thanks to Vitalets for such a brilliant idea!)

JSON RPC Feature found in

websocket-as-promised provides simple request-response mechanism (JSON RPC).
Method .sendRequest() sends message with unique requestId and returns promise.
That promise get resolved when response message with the same requestId comes.
For reading/setting requestId from/to message there are two functions defined in options attachRequestId / extractRequestId:

const wsp = new WebSocketAsPromised(wsUrl, {
  packMessage: data => JSON.stringify(data),
  unpackMessage: data => JSON.parse(data),
  attachRequestId: (data, requestId) => Object.assign({id: requestId}, data), // attach requestId to message as `id` field
  extractRequestId: data => data && data.id,                                  // read requestId from message `id` field
});

wsp.open()
 .then(() => wsp.sendRequest({foo: 'bar'})) // actually sends {foo: 'bar', id: 'xxx'}, because `attachRequestId` defined above
 .then(response => console.log(response));  // waits server message with corresponding requestId: {id: 'xxx', ...}

By default requestId value is auto-generated, but you can set it manually:

wsp.sendRequest({foo: 'bar'}, {requestId: 42});

I will take a stab at implementing this, but I would also invite more experienced JS developers to speak up if they would like to implement it!

MaxListenersExceededWarning: Possible EventEmitter memory leak detected.

I started to see the following warning in the logs, but it's unclear what may cause it. Do you have any ideas?

        2019-12-27 20:34:20
        (node:27) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 1001 drain listeners added. Use emitter.setMaxListeners() to increase limit
            at _addListener (events.js:247:17)
            at TLSSocket.addListener (events.js:263:10)
            at TLSSocket.Readable.on (_stream_readable.js:822:35)
            at TLSSocket.once (events.js:292:8)
            at sendPacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:40:19)
            at MqttClient._sendPacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:841:7)
            at MqttClient.publish (/home/node/app/packer/node_modules/mqtt/lib/client.js:431:12)
            at Promise (/home/node/app/packer/node_modules/async-mqtt/index.js:26:31)
            at new Promise (<anonymous>)
            at AsyncClient.publish (/home/node/app/packer/node_modules/async-mqtt/index.js:25:12)
            at MqttClient.onMessage (/home/node/app/packer/pipes/eventsToRecords.js:68:16)
            at MqttClient.emit (events.js:189:13)
            at MqttClient._handlePublish (/home/node/app/packer/node_modules/mqtt/lib/client.js:987:12)
            at MqttClient._handlePacket (/home/node/app/packer/node_modules/mqtt/lib/client.js:336:12)
            at work (/home/node/app/packer/node_modules/mqtt/lib/client.js:292:12)
            at Writable.writable._write (/home/node/app/packer/node_modules/mqtt/lib/client.js:302:5)
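
The trace suggests that publish is called from inside a message handler and that each call adds a 'drain' listener to the socket, so many publishes started without waiting for earlier ones could pile up listeners while the socket is busy. This is an interpretation of the trace, not a confirmed diagnosis. If it applies, one possible mitigation is to serialize publishes so only one is in flight at a time. createSerialPublisher below is a hypothetical helper that assumes publish returns a promise.

```javascript
// Wraps client.publish so that calls run one after another.
function createSerialPublisher(client) {
  let tail = Promise.resolve(); // settles when the last queued publish is done

  return function publish(topic, payload, options) {
    const result = tail.then(() => client.publish(topic, payload, options));
    // Keep the chain alive even if this publish fails.
    tail = result.catch(() => {});
    return result;
  };
}
```

The queue itself is unbounded here, so a handler that produces messages faster than the broker accepts them would still need some form of back-pressure.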
