Git Product home page Git Product logo

aedes-persistence's People

Contributors

behrad avatar davedoesdev avatar dependabot-preview[bot] avatar gavindmello avatar getlarge avatar glentiki avatar gnought avatar lgtm-com[bot] avatar linusu avatar mcollina avatar oldrich-s avatar robertslando avatar seriousme avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

aedes-persistence's Issues

Duplicate subscriptions issue

Substance to reproduce the issue

Server code :

'use strict'

var redis = require('mqemitter-redis')

var mq = redis({
  port: 6379,
  host: '127.0.0.1'
})

var persistence = require('aedes-persistence-redis')({
    port: 6379, // Redis port
    host: '127.0.0.1' // Redis host
})
var aedes = require('aedes')({
    //mq: mq,
    //persistence: persistence,
    heartbeatInterval: 1000
})
var server = require('net').createServer(aedes.handle)

var port = 1886

server.listen(port, function() {
    console.log('server listening on port', port)
})

aedes.on('clientError', function(client, err) {
    console.log('client error', client.id, err.message, err.stack)
})

aedes.on('client', function(client) {
    console.log('new client', client.id)
})

Subscriber :

var mqtt = require('mqtt')
var client = mqtt.connect({
    host: 'localhost',
    port: 1886,
    clientId: 'client1',
    clean: false
})

client.on('connect', function() {
    client.subscribe('test', { qos: 1 })
})

client.on('message', function(topic, message) {
    // message is Buffer
    console.log(message.toString())
        //client.end()
})

Publisher :

var mqtt = require('mqtt')
var client = mqtt.connect({
    host: 'localhost',
    port: 1886,
    clientId: 'client2',
    clean: false
})

client.on('connect', function() {
    for (var i = 0; i < 10; i++) {
        client.publish('test', 'Hello mqtt ' + i, { qos: 1 })
        console.log('sent')
    }
})

client.on('message', function(topic, message) {
    console.log(message.toString())
})

Whenever the subscriber reconnects, a duplicate subscription is added to Qlobber. So I'm getting a lot of enqueues since the https://github.com/mcollina/aedes/blob/master/aedes.js#L159 persistence function gets duplicate subs. Not sure if I'm doing something wrong here.

Retrieve last message for a given tag

we need to retrieve the last message for a given tag, this occurs for many tags and somethings redis starts raising error on connections [ioredis] Unhandled error event: Error: connect ECONNREFUSED . The following is the current implementation, is there a better way to get only the last message ?

        var packets = [];
        var stream = broker.persistence.createRetainedStreamCombi ([tag]);

        stream.on('end', function(){
            if (packets.length !== 0) {
                var retained_value = JSON.parse(packets[0].payload).value;
                // Compare retained_value 
            }
        });
        stream.pipe(through.obj(function sendRetained (packet, enc, cb_through) {
            packets.push(packet);
            cb_through();
        }));

Test case :streamWill issue

Hi Mcollina,

In test case 'stream all will messages' streamWill() function is used which returns 'will' messages. However, the next text case 'stream all will message for unknown brokers' also used streamWill() function which returns 'will' messages of first test case as well causing test case to fail.

Will it be good if I use delWill() function after completion of former test case. For example:
testInstance('stream all will messages', function (t, instance) { t.plan(3) var client = { id: '12345' } var toWrite = { topic: 'hello/died', payload: new Buffer('muahahha'), qos: 0, retain: true } instance.putWill(client, toWrite, function (err, c) { t.error(err, 'no error') t.equal(c, client, 'client matches') instance.streamWill().pipe(through.obj(function (chunk, enc, cb) { t.deepEqual(chunk, { clientId: client.id, brokerId: instance.broker.id, topic: 'hello/died', payload: new Buffer('muahahha'), qos: 0, retain: true }, 'packet matches') instance.delWill(client, function(err, result, client) { //This line instance.destroy(t.end.bind(t)) }) })) }) })

Missing message from Broker to subscribed Client

I tried to use this library. I create client 1 and client 2 subscribe {QOS:2} on the same topic
When the client 1 publish a message {QOS1} and disconnect,
sometime client 2 will not be able to receive the message.
Even from the broker logs, I can see client 1 has successfully published a message.

Readme needs a description of WHY we need this.

What problem does this solve? The answer might seem obvious to many, but not to all. I have my theories and I implement this either way, but it would be helpful to implementors to know why they need this. It might even help turn those implementors into innovators a little faster.

Recieve retaied messages after each connect

Hi,
here is my server code:

 const UserService = require('./service/UserService');
 global.httpContext = require('express-http-context');
 const uuidv4 = require('uuid/v4');
 require('./utils/logger');
 const aedesPersistenceMongoDB = require('aedes-persistence-mongodb');
const aPMDB = new aedesPersistenceMongoDB({
	url: process.env.MQTT_AEDES_DB_URIS, //'mongodb://127.0.0.1/aedes-test', // Optional when you pass db object
	// Optional ttl settings
	ttl: {
		packets: 300, // Number of seconds
		subscriptions: 300
	}
});
let mqmongo = require('mqemitter-mongodb');
let emitter = mqmongo({
	url: process.env.MQTT_AEDES_DB_URIS
});
var aedes = require('aedes')({
	persistence: aPMDB,
	mq: emitter
});
var server = require('net').createServer(aedes.handle);
var port = 1883;
server.listen(port, function() {
	console.log('server listening on port', port);
});
aedes.on('published', function(packet, client) {
	console.log('Published', packet);
	console.log('Client', client);
});
// fired when a client connects
aedes.on('clientConnected', function(client) {
	console.log('Client Connected:', client);
});
aedes.on('client', function(client) {
	console.log('on Client :', client.id, client.broker.id);
});
// fired when a client disconnects
aedes.on('clientDisconnected', function(client) {
	console.log('Client Disconnected:', client.id);
});
aedes.on('clientError', function(client) {
	console.log('Client Disconnected:', client.id);
});
aedes.on('clientError', function(client, err) {
	console.log('client error', client.id, err.message, err.stack);
});
aedes.on('connectionError', function(client, err) {
	console.log('client error', client, err.message, err.stack);
});
aedes.on('subscribe', function(subscriptions, client) {
	if (client) {
		console.log('subscribe from client', subscriptions, client.id);
	}
});
// Accepts the connection if the username and password are valid
var authenticate = async function(client, username, password, callback) {
	var error = new Error('Auth error');
	error.returnCode = 1;
	const response = await UserService.verifyToken(password.toString());
	if (response.succeed == true) {
		// console.log(result);
		if (response.result.roles.includes('Partner') == true) {
			client.user = response.result.partyId;
			callback(null, true);
		} else {
			callback(error, null);
		}
	} else {
		callback(error, null);
	}
};
// In this case the client authorized as alice can publish to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
var authorizePublish = function(client, packet, callback) {
	var error = new Error('Auth error');
	error.returnCode = 1;
	return callback(error, null);
};
// In this case the client authorized as alice can subscribe to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
var authorizeSubscribe = function(client, sub, callback) {
	if (sub.topic.split('/')[1] != client.user) {
		return callback(new Error('wrong topic'));
	}
	callback(null, sub);
};
aedes.authenticate = authenticate;
aedes.authorizePublish = authorizePublish;
aedes.authorizeSubscribe = authorizeSubscribe;
aedes.publish({
	cmd: 'publish',
	qos: 2,
	topic: 'private/1000127',
	payload: new Buffer.from('hello daniel 123456!'),
	retain: true
});

and this is my client code:

  url = require('url');
// Parse
var mqtt_url = url.parse(
  process.env.CLOUDAMQP_MQTT_URL ||
    'mqtt://1000127:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjUyNywicGFydHlJZCI6MTAwMDEyNywiaWF0IjoxNTcwODc0NjE5LCJleHAiOjE1NzM0NjY2MTl9.IhvRESrN_oH5fRyGusrC-VVWlUvLil1FmQ7If3-pDMU@192.168.3.110:1883'
);
var auth = (mqtt_url.auth || ':').split(':');
var url = 'mqtt://' + mqtt_url.host;

//username: auth[0] + ":" + auth[0] if you are on a shared instance
var options = {
  port: mqtt_url.port,
  clientId: 'mqttjs_2',
  username: auth[0],
  password: auth[1],
  retain: true,
  clean: false
};
console.log('Starting!!!!');

// Create a client connection
var client = mqtt.connect(url, options);

client.on('connect', function() {
  // When connected
  console.log('connected!!');
});
client.subscribe('private/1000127', { qos: 2 }, function(err) {
  if (!err) {
    // when a message arrives, do something with it
    console.log('subscribed!!');
  } else {
    console.log('error on subscribe', err);
    client.end();
  }
});
client.on('message', function(topic, message, packet) {
  console.log("Received '" + message + "' on '" + topic + "'");
});
client.on('reconnect', () => {
  console.log('Client reconnected');
});
 client.on('message', function(topic, message, packet) {
 	console.log("Received '" + message + "' on '" + topic + "'");
 });
client.on('disconnect', result => {
  console.log('disconnect', result);
});

client.on('error', function(error) {
  console.log("Can't connect" + error);
});
client.on('suback', function(error) {
  console.log("Can't suback" + error);
});

Expected behavior: client should receive message only once after connection

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.