Git Product home page Git Product logo

mqtt-router's Introduction

npm (scoped) License: LGPL v3

mqtt-router

An easy-to-use and flexible routing library for MQTT.

Overview

@pera-swarm/mqtt-router is a library for handling MQTT publish/subscribe capabilities with a straight forward routing architecture.

This is a Node.js library available on both npm registry and GitHub package registry.

Usage

1. Installation

Installation done using npm install command:

npm i --save @pera-swarm/mqtt-router

Also, you need to install mqtt library as well.

npm i --save mqtt

2. Set up Routes

Create routes for subscribe and publish:

routes.js
// Sample dynamic route list with handler functions
const SAMPLE_ROUTES = [
    {
        topic: 'v1/sample',
        allowRetained: true,
        subscribe: true,
        publish: false,
        handler: (msg) => {
            const data = JSON.parse(msg);
            console.log('Sample subscription picked up the topic', data);
        }
    }
];

module.exports = SAMPLE_ROUTES;

3. Start the Router

You should configure your own mqttOptions according to your mqtt broker and application settings.

index.js
// Configure mqttClient
const mqttClient = require('mqtt');
const mqttOptions = {
    port: 1883,
    clientId: process.env.MQTT_CLIENT,
    username: process.env.MQTT_USER || '',
    password: process.env.MQTT_PASS || ''
};
const mqtt = mqttClient.connect(process.env.MQTT_HOST, mqttOptions);

// Import MQTTRouter from mqtt-router
const { MQTTRouter } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');

var router;

// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };

// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
    console.log('sample setup fn');
};

// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
    console.log('error: mqtt');
    console.log(err);
};

router = new MQTTRouter(
    mqtt,
    routes,
    SAMPLE_OPTIONS,
    SAMPLE_SETUP_FN,
    SAMPLE_ON_ERROR_FN
);

router.start();

router.start() will listen to the subscribed routes that are specified as subscribed: true in the route specification and then if the subscriber picked up a message for the associated topic, the MQTTRouter will call the relevant handler funtion.

You can also wrap the routes using wrapper function to include additional higher level attribute to the handler function as well.

index.js
// Import MQTTRouter and wrapper from mqtt-router
const { MQTTRouter, wrapper } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');

var router;

// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };

// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
    console.log('sample setup fn');
};

// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
    console.log('error: mqtt');
    console.log(err);
};

// Sample higher level attribute for the handler function
const sampleAttrib = {
    time: Date.now()
};

router = new MQTTRouter(
    mqtt,
    wrapper(routes, sampleAttrib),
    SAMPLE_OPTIONS,
    SAMPLE_SETUP_FN,
    SAMPLE_ON_ERROR_FN
);

router.start();
routes.js
// Sample dynamic route list with handler functions.
// sampleAttrib will be added to the handler function as the second parameter.
const SAMPLE_ROUTES = [
    {
        topic: 'v1/sample',
        allowRetained: true,
        subscribe: true,
        publish: false,
        handler: (msg, attrib) => {
            const data = JSON.parse(msg);
            // console.log(attrib);
            console.log('Sample subscription picked up the topic', data);
        }
    }
];

module.exports = SAMPLE_ROUTES;

Useful

Note: You can also configure a topic prefix by configuring an environment variable MQTT_CHANNEL. (example: MQTT_CHANNEL=beta in a .env file locally)

Contribute

1. Install dependencies

Install project dependencies.

npm install

2. Testing

Note: Before running the test cases, you should configure environment variables MQTT_HOST,MQTT_USER, MQTT_PASS, and MQTT_CLIENT. Please refer sample.nodemon.json file for nodemon environment variable configuration.

Manually run the test cases.

node test/index.js

or you can use nodemon script once environment variables configured correctly.

npm run client

Documentation


Route

A route definition for handling route subscription logic. Following are the properties supported on the Route definition:

  • topic: string The Route topic

  • type: 'String' | 'JSON' Payload type (default:String)

  • allowRetained: boolean Retain allowance

  • subscribe: boolean Subscribe flag

  • publish: boolean Publish flag

  • handler: Function The default subscribe handler function, called when subscribe:true, packet.retain:true|false and allowRetained:true. Retained messages and new messages will be handled by default.

  • fallbackRetainHandler?: Function Subscribe handler function only for retained messages, but for route specific custom logic. called when subscribe:true, packet.retain:true and allowRetained:false.

Note: If specified, fallbackRetainHandler function will be called. If not specified, retained messages will be discarded.


MQTTRouter (mqttConnection, routes, options, setup, onError)

The main entrypoint of mqtt-router that defines the router logic. You have to import the MQTTRouter class and instantiate with a mqtt client. Parameters supported in the constructor:

  • mqttConnection {MqttClient} : mqtt connection
  • routes {Route[]} : routes with mqtt topic, handler and allowRetained properties
  • options {IClientSubscribeOptions} : mqtt message options
  • setup {Function} : setup function that runs on successful connection
  • onError {Function} : error handler function

mqttRouter.start()

The method for starting the mqtt router.

You must call this method once, a MQTTRouter object instantiated.


mqttRouter.pushToPublishQueue(topic, data)

Add a message to the publish queue that to be scheduled to publish. Parameters supported:

  • topic {string} : message topic
  • data {string|Buffer} : message data

mqttRouter.addRoute(route)

Add a route to the subscriber routes list. Parameter supported:

  • route {Route} : route object to be added to the subscriber list

mqttRouter.removeRoute(topic)

Remove a route from the subscriber routes list. Parameter supported:

  • topic {string} : route topic

wrapper(routes, property)

Wrap an array of Route objects with a higher order property (ex: property can bethis from the callee class) or a separate attribute to the handler function as a second parameter, in each route object. Parameters supported:

  • routes {Route[]} : routes array
  • property {any} : property that required to be wrapped with

Queue

A Queue implementation for the mqtt-router with a scheduler that acts as a "Publish Queue". Parameters supported in the constructor:

  • mqttClient {MqttClient} : mqtt connection
  • options {IClientSubscribeOptions} : mqtt message options
  • number {number} : interval

queue.begin()

Begin the queue processing (scheduler).

Note: You must call this method once, a Queue object instantiated.


queue.add(topic, data)

Add a message to the publish queue. This message will be published by the scheduler. Parameters supported:

  • topic {string} : message topic
  • data {string|Buffer} : message data

queue.remove(topic)

Remove a message in the queue by a given topic. Parameter supported:

  • topic {string} : message topic

queue.findByTopic(topic)

Find a message with the given topic in the queue. Returns -1 if not found on the queue. Parameter supported:

  • topic {string} : message topic

subscribeToTopic(mqtt, topic, options)

Subscribe to a given topic with options. Parameters supported:

  • mqtt {MqttClient} : mqtt connection
  • topic {string} : message topic
  • options {IClientSubscribeOptions} : mqtt message options

publishToTopic(mqtt, topic, message, options, callback)

Publish a message to a given message topic with options and a callback function. Parameters supported:

  • mqtt {MqttClient} : mqtt connection
  • topic {string} : message topic
  • message {string} : message data
  • options {IClientSubscribeOptions} : mqtt message options
  • callback {Function} : callback function

secondsInterval(interval)

Generates a cron interval called in given seconds Parameter supported:

  • interval {number} : interval in seconds

minutesInterval(interval)

Generates a cron interval called in given minutes Parameter supported:

  • interval {number} : interval in minutes

To-Do

  • Fix duplicate topic support for routing.

Licence

This project is licensed under LGPL-2.1 Licence.

mqtt-router's People

Contributors

luk3sky avatar nuwanj avatar

Stargazers

 avatar  avatar

Watchers

 avatar

mqtt-router's Issues

Flexible routing

Summary

We need to implement the functionality to unsubscribe from certain routes declared in the routes array and add other subscriptions and routes after starting the router. Also we need to consider the publishers as well.

Limit one MQTT Router instance for one MQTT Channel

Summary

Need to implement a locking mechanism for registering only one router instance for a particular MQTT Channel. Find more information on the protocol documentation.

Checklist:

  • /service/discovery route handler function
  • /service/terminate/{UUID} route handler function
  • Swarm server time-out and exit event upon terminate request

Add multiple routes at once

Following should be added to router.ts & update the documentation.

    /**
     * method for adding multiple routes to the list
     * @param {Route[]} route[] list of route objects to be added to the subscriber list
     */
    addRoutes = (routes: Route[]) => {
        for (let i = 0; i < routes.length; i++) {
            //console.log(routes[i].topic);
            this.addRoute(routes[i]);
        }
    };

Fault tolerancy

Summary

Need to idiot proof the library. Don't want to crash the entire applications for TypeErrors. Certain Errors would be okay to crash the whole application.

MQTT Publish Queue options support

Need to have support for options for each message in mqtt-router publish queue.

Like this:
mqttRouter.pushToPublishQueue(topic, message, options)

MQTT Publish Queue

Requirements

  • Need to implement a queue for message publication.
  • MQTT class (MQTT router + this implementation) should frequently check the queue and publish the message in the queue.

Separate handler logics based on the retain allowance

Summary

We need to implement a way to handle the logics for both allowedRetain: true and allowedRetain: false as separately. We may also need to configure the route handler function in this implementation as well.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.