moscajs / aedes-persistence Goto Github PK
View Code? Open in Web Editor NEWIn-memory implementation of an Aedes persistence, with abstract tests
License: MIT License
In-memory implementation of an Aedes persistence, with abstract tests
License: MIT License
Substance to reproduce the issue
'use strict'
var redis = require('mqemitter-redis')
var mq = redis({
port: 6379,
host: '127.0.0.1'
})
var persistence = require('aedes-persistence-redis')({
port: 6379, // Redis port
host: '127.0.0.1' // Redis host
})
var aedes = require('aedes')({
//mq: mq,
//persistence: persistence,
heartbeatInterval: 1000
})
var server = require('net').createServer(aedes.handle)
var port = 1886
server.listen(port, function() {
console.log('server listening on port', port)
})
aedes.on('clientError', function(client, err) {
console.log('client error', client.id, err.message, err.stack)
})
aedes.on('client', function(client) {
console.log('new client', client.id)
})
var mqtt = require('mqtt')
var client = mqtt.connect({
host: 'localhost',
port: 1886,
clientId: 'client1',
clean: false
})
client.on('connect', function() {
client.subscribe('test', { qos: 1 })
})
client.on('message', function(topic, message) {
// message is Buffer
console.log(message.toString())
//client.end()
})
var mqtt = require('mqtt')
var client = mqtt.connect({
host: 'localhost',
port: 1886,
clientId: 'client2',
clean: false
})
client.on('connect', function() {
for (var i = 0; i < 10; i++) {
client.publish('test', 'Hello mqtt ' + i, { qos: 1 })
console.log('sent')
}
})
client.on('message', function(topic, message) {
console.log(message.toString())
})
Whenever the subscriber reconnects, a duplicate subscription is added to Qlobber. So I'm getting a lot of enqueues since the https://github.com/mcollina/aedes/blob/master/aedes.js#L159 persistence function gets duplicate subs. Not sure if I'm doing something wrong here.
In the README, for addSubscriptions, there is a note saying Any subscriptions with qos: 0 will be ignored.
. Why? It is legal to subscribe to a topic with a QoS of 0.
we need to retrieve the last message for a given tag, this occurs for many tags and somethings redis starts raising error on connections [ioredis] Unhandled error event: Error: connect ECONNREFUSED
. The following is the current implementation, is there a better way to get only the last message ?
var packets = [];
var stream = broker.persistence.createRetainedStreamCombi ([tag]);
stream.on('end', function(){
if (packets.length !== 0) {
var retained_value = JSON.parse(packets[0].payload).value;
// Compare retained_value
}
});
stream.pipe(through.obj(function sendRetained (packet, enc, cb_through) {
packets.push(packet);
cb_through();
}));
Hi Mcollina,
In test case 'stream all will messages' streamWill() function is used which returns 'will' messages. However, the next text case 'stream all will message for unknown brokers' also used streamWill() function which returns 'will' messages of first test case as well causing test case to fail.
Will it be good if I use delWill() function after completion of former test case. For example:
testInstance('stream all will messages', function (t, instance) { t.plan(3) var client = { id: '12345' } var toWrite = { topic: 'hello/died', payload: new Buffer('muahahha'), qos: 0, retain: true } instance.putWill(client, toWrite, function (err, c) { t.error(err, 'no error') t.equal(c, client, 'client matches') instance.streamWill().pipe(through.obj(function (chunk, enc, cb) { t.deepEqual(chunk, { clientId: client.id, brokerId: instance.broker.id, topic: 'hello/died', payload: new Buffer('muahahha'), qos: 0, retain: true }, 'packet matches') instance.delWill(client, function(err, result, client) { //This line instance.destroy(t.end.bind(t)) }) })) }) })
It seems we don't have a mechanism to limit the maximum inflight messages in memory. Is it?
I tried to use this library. I create client 1 and client 2 subscribe {QOS:2} on the same topic
When the client 1 publish a message {QOS1} and disconnect,
sometime client 2 will not be able to receive the message.
Even from the broker logs, I can see client 1 has successfully published a message.
What problem does this solve? The answer might seem obvious to many, but not to all. I have my theories and I implement this either way, but it would be helpful to implementors to know why they need this. It might even help turn those implementors into innovators a little faster.
Is this really OK to match overlapping topics for a client https://github.com/mcollina/aedes-persistence/blob/master/abstract.js#L233 @mcollina ?
I would at first expect subscriptionsByTopic
to match only by a single record!
This is what currently QlobberDedup is doing... @davedoesdev
Hi,
here is my server code:
const UserService = require('./service/UserService');
global.httpContext = require('express-http-context');
const uuidv4 = require('uuid/v4');
require('./utils/logger');
const aedesPersistenceMongoDB = require('aedes-persistence-mongodb');
const aPMDB = new aedesPersistenceMongoDB({
url: process.env.MQTT_AEDES_DB_URIS, //'mongodb://127.0.0.1/aedes-test', // Optional when you pass db object
// Optional ttl settings
ttl: {
packets: 300, // Number of seconds
subscriptions: 300
}
});
let mqmongo = require('mqemitter-mongodb');
let emitter = mqmongo({
url: process.env.MQTT_AEDES_DB_URIS
});
var aedes = require('aedes')({
persistence: aPMDB,
mq: emitter
});
var server = require('net').createServer(aedes.handle);
var port = 1883;
server.listen(port, function() {
console.log('server listening on port', port);
});
aedes.on('published', function(packet, client) {
console.log('Published', packet);
console.log('Client', client);
});
// fired when a client connects
aedes.on('clientConnected', function(client) {
console.log('Client Connected:', client);
});
aedes.on('client', function(client) {
console.log('on Client :', client.id, client.broker.id);
});
// fired when a client disconnects
aedes.on('clientDisconnected', function(client) {
console.log('Client Disconnected:', client.id);
});
aedes.on('clientError', function(client) {
console.log('Client Disconnected:', client.id);
});
aedes.on('clientError', function(client, err) {
console.log('client error', client.id, err.message, err.stack);
});
aedes.on('connectionError', function(client, err) {
console.log('client error', client, err.message, err.stack);
});
aedes.on('subscribe', function(subscriptions, client) {
if (client) {
console.log('subscribe from client', subscriptions, client.id);
}
});
// Accepts the connection if the username and password are valid
var authenticate = async function(client, username, password, callback) {
var error = new Error('Auth error');
error.returnCode = 1;
const response = await UserService.verifyToken(password.toString());
if (response.succeed == true) {
// console.log(result);
if (response.result.roles.includes('Partner') == true) {
client.user = response.result.partyId;
callback(null, true);
} else {
callback(error, null);
}
} else {
callback(error, null);
}
};
// In this case the client authorized as alice can publish to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
var authorizePublish = function(client, packet, callback) {
var error = new Error('Auth error');
error.returnCode = 1;
return callback(error, null);
};
// In this case the client authorized as alice can subscribe to /users/alice taking
// the username from the topic and verifing it is the same of the authorized user
var authorizeSubscribe = function(client, sub, callback) {
if (sub.topic.split('/')[1] != client.user) {
return callback(new Error('wrong topic'));
}
callback(null, sub);
};
aedes.authenticate = authenticate;
aedes.authorizePublish = authorizePublish;
aedes.authorizeSubscribe = authorizeSubscribe;
aedes.publish({
cmd: 'publish',
qos: 2,
topic: 'private/1000127',
payload: new Buffer.from('hello daniel 123456!'),
retain: true
});
and this is my client code:
url = require('url');
// Parse
var mqtt_url = url.parse(
process.env.CLOUDAMQP_MQTT_URL ||
'mqtt://1000127:eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjUyNywicGFydHlJZCI6MTAwMDEyNywiaWF0IjoxNTcwODc0NjE5LCJleHAiOjE1NzM0NjY2MTl9.IhvRESrN_oH5fRyGusrC-VVWlUvLil1FmQ7If3-pDMU@192.168.3.110:1883'
);
var auth = (mqtt_url.auth || ':').split(':');
var url = 'mqtt://' + mqtt_url.host;
//username: auth[0] + ":" + auth[0] if you are on a shared instance
var options = {
port: mqtt_url.port,
clientId: 'mqttjs_2',
username: auth[0],
password: auth[1],
retain: true,
clean: false
};
console.log('Starting!!!!');
// Create a client connection
var client = mqtt.connect(url, options);
client.on('connect', function() {
// When connected
console.log('connected!!');
});
client.subscribe('private/1000127', { qos: 2 }, function(err) {
if (!err) {
// when a message arrives, do something with it
console.log('subscribed!!');
} else {
console.log('error on subscribe', err);
client.end();
}
});
client.on('message', function(topic, message, packet) {
console.log("Received '" + message + "' on '" + topic + "'");
});
client.on('reconnect', () => {
console.log('Client reconnected');
});
client.on('message', function(topic, message, packet) {
console.log("Received '" + message + "' on '" + topic + "'");
});
client.on('disconnect', result => {
console.log('disconnect', result);
});
client.on('error', function(error) {
console.log("Can't connect" + error);
});
client.on('suback', function(error) {
console.log("Can't suback" + error);
});
Expected behavior: client should receive message only once after connection
Hi
I'm looking for a way to know if a certain client is currently connected. (online)
should I use the persistence layer or the client connected / disconnected callbacks
of the broker to keep a list of online clients?
or just access this variable ?:
https://github.com/mcollina/aedes/blob/master/aedes.js#L261
ps. this is a starting point for not allowing
the same client ids from connecting to the cluster from different
data centers.
The following method exists but is not documented in the README.
aedes-persistence/persistence.js
Line 350 in bf8691f
Here, it currently counts the number of offline subscriptions.
However, aedes-persistence-redis counts the number of unique topic names in offline subscriptions.
Which is more useful?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.