
aedes's Introduction

Aedes


A barebone MQTT server that can run on any stream server.

Install

To install aedes, simply use npm:

npm install aedes

Docker

Check Docker docs here

API

Features

  • Fully compatible with MQTT 3.1 and 3.1.1
  • Standard TCP Support
  • SSL / TLS
  • WebSocket Support
  • Message Persistence
  • Automatic Reconnect
  • Offline Buffering
  • Backpressure-aware API
  • High Availability
  • Clusterable
  • Authentication and Authorization
  • $SYS support
  • Pluggable middlewares
  • Dynamic Topics Support
  • MQTT Bridge Support between Aedes instances
  • MQTT 5.0 (not supported yet)
  • Bridge Protocol (incoming connections only)

Examples
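Before the cluster examples below, a minimal single-node broker over plain TCP can be wired up like this (a sketch of the standard aedes usage; 1883 is the conventional MQTT port):

```javascript
// Minimal sketch: aedes handles each incoming TCP connection as a stream.
const aedes = require('aedes')()
const server = require('net').createServer(aedes.handle)
const port = 1883 // conventional MQTT port

server.listen(port, function () {
  console.log('aedes broker listening on port', port)
})
```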

Clusters

Aedes needs on-disk databases like MongoDB and Redis in order to work with clusters. Based on our tests and user reports, the best performance and stability are achieved when using aedes-persistence-mongodb paired with mqemitter-redis.

Other info:

  • The repo aedes-tests is used to test aedes with clusters and different emitters/persistences. Check its source code as a starting point on how to work with clusters.
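A wiring sketch for one cluster node along those lines, assuming the aedes-persistence-mongodb and mqemitter-redis packages with local MongoDB and Redis instances (the connection details are illustrative):

```javascript
// Sketch: one cluster node. Run several such processes against the same
// MongoDB and Redis so that subscriptions and messages are shared.
const aedes = require('aedes')({
  persistence: require('aedes-persistence-mongodb')({
    url: 'mongodb://localhost/aedes' // illustrative connection string
  }),
  mq: require('mqemitter-redis')({
    host: 'localhost', // illustrative Redis location
    port: 6379
  })
})

require('net').createServer(aedes.handle).listen(1883)
```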

Bridge connections

Normally, when publishing a message, the retain flag is consumed by Aedes and then set to false. This is done for two reasons:

  • MQTT-3.3.1-9 states that it MUST set the RETAIN flag to 0 when a PUBLISH Packet is sent to a Client because it matches an established subscription regardless of how the flag was set in the message it received.
  • When operating as a cluster, only one Aedes node may store the packet

Brokers that support the Bridge Protocol can connect to Aedes. When connecting with this special protocol, subscriptions work as usual except that the retain flag in the packet is propagated as-is.

Extensions

Middleware Plugins

Persistence

MQEmitter

Acknowledgements

This library was born out of many discussions with Mosca users about how Mosca was deployed in production, and it addresses their concerns about performance and stability.

Mosca vs Aedes

Example benchmark test with 1000 clients each sending 5000 QoS 1 messages, using mqtt-benchmark with the command:

mqtt-benchmark --broker tcp://localhost:1883 --clients 1000 --qos 1 --count 5000

CPU INFO:

Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              8
On-line CPU(s) list: 0-7
Thread(s) per core:  2
Core(s) per socket:  4
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               94
Model name:          Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz
Stepping:            3
CPU MHz:             800.014
CPU max MHz:         3500.0000
CPU min MHz:         800.0000
BogoMIPS:            5199.98
Virtualization:      VT-x
L1d cache:           32K
L1i cache:           32K
L2 cache:            256K
L3 cache:            6144K

Benchmark: Aedes

In memory - No clusters

========= TOTAL (1000) =========
Total Ratio:                 1.000 (5000000/5000000)
Total Runtime (sec):         178.495
Average Runtime (sec):       177.845
Msg time min (ms):           0.077
Msg time max (ms):           199.805
Msg time mean mean (ms):     35.403
Msg time mean std (ms):      0.042
Average Bandwidth (msg/sec): 28.115
Total Bandwidth (msg/sec):   28114.678

Redis Persistence and Redis Emitter - With Clusters

========= TOTAL (1000) =========
Total Ratio:                 1.000 (5000000/5000000)
Total Runtime (sec):         114.404
Average Runtime (sec):       109.022
Msg time min (ms):           0.065
Msg time max (ms):           393.214
Msg time mean mean (ms):     21.520
Msg time mean std (ms):      0.595
Average Bandwidth (msg/sec): 45.896
Total Bandwidth (msg/sec):   45896.306

Mongo Persistence and Redis Emitter - With Clusters

========= TOTAL (1000) =========
Total Ratio:                 1.000 (5000000/5000000)
Total Runtime (sec):         112.769
Average Runtime (sec):       105.524
Msg time min (ms):           0.062
Msg time max (ms):           329.062
Msg time mean mean (ms):     20.750
Msg time mean std (ms):      0.878
Average Bandwidth (msg/sec): 47.464
Total Bandwidth (msg/sec):   47464.271

Redis Persistence and Mongodb Emitter - With Clusters

========= TOTAL (1000) =========
Total Ratio:                 1.000 (5000000/5000000)
Total Runtime (sec):         118.587
Average Runtime (sec):       114.190
Msg time min (ms):           0.080
Msg time max (ms):           324.028
Msg time mean mean (ms):     22.558
Msg time mean std (ms):      0.730
Average Bandwidth (msg/sec): 43.832
Total Bandwidth (msg/sec):   43831.927

Benchmark: Mosca

========= TOTAL (1000) =========
Total Ratio:                 1.000 (5000000/5000000)
Total Runtime (sec):         264.934
Average Runtime (sec):       264.190
Msg time min (ms):           0.070
Msg time max (ms):           168.116
Msg time mean mean (ms):     52.629
Msg time mean std (ms):      0.074
Average Bandwidth (msg/sec): 18.926
Total Bandwidth (msg/sec):   18925.942

Made with Aedes

Here is a list of some interesting projects that use Aedes as their MQTT broker. Submit a PR or an issue if you would like to add yours.

  • node-red-contrib-aedes: MQTT broker for Node-Red based on Aedes
  • Mqtt2Mqtt: MQTT bridge between two brokers with a UI
  • Kuzzle: High-performance, full-featured IoT backend using MQTT alongside the WebSocket and HTTP protocols

Collaborators

Contribution


Want to contribute? Check our list of features/bugs.

Security notice

Messages sent to the broker are considered valid once they pass the authorizePublish callback. In other words, if permissions for the given client are revoked after the call completes, the message is still considered valid. If you are sending time-sensitive messages, make sure to use QoS 0 or connect with a clean session.
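As an illustration, here is a hedged sketch of an authorizePublish hook; the ACL table and the isAuthorized helper are hypothetical, and only the callback shape comes from the aedes API. Note that, per the notice above, revoking an ACL entry after a message passes the hook does not retract that message.

```javascript
// Hypothetical ACL mapping clientId -> allowed topic prefixes.
const acl = { 'sensor-1': ['sensors/'] }

// Pure check used by the hook below.
function isAuthorized (clientId, topic) {
  const prefixes = acl[clientId] || []
  return prefixes.some(function (p) { return topic.indexOf(p) === 0 })
}

// Wiring into an aedes instance; calling back with no error accepts
// the publish, calling back with an error rejects it.
// aedes.authorizePublish = function (client, packet, callback) {
//   if (client && isAuthorized(client.id, packet.topic)) callback(null)
//   else callback(new Error('publish not authorized'))
// }
```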

Support

If there are bugs/leaks in production scenarios, we encourage people to send a Pull Request and/or reach out to the maintainers for paid support.

Backers

Thank you to all our backers! 🙌


Sponsors

Become a sponsor to get your logo on our README on GitHub.


License

Licensed under MIT.

aedes's People

Contributors

barlock, behrad, cordovapolymer, dependabot-preview[bot], dependabot[bot], dpatekar, eladnava, emilianobonassi, gavindmello, getlarge, gianluca-casagrande-stiga, gnought, jdiamond, jyotman, mcollina, nguyenthenguyen, oldrich-s, ovhemert, pablomaribondo, phil-mitchell, pkinney, platy, r3na, rafsawicki, renatoc, robertslando, saboya, seeluck, seriousme, snyk-bot


aedes's Issues

client parameter is null on publish event

Like the title says, when implementing the publish event like this:

aedes.on('publish', function(packet, client) {
  console.log('client = ' + client);
});

The logged client will always be null. The limited documentation in place suggests that client should be populated except for internal messages. Is this a wrong assumption? If so, in which cases should the publish event provide a client?
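One way to make the distinction explicit in a handler (a sketch; describeOrigin is a hypothetical helper, and the premise that broker-internal messages carry a null client follows the behavior described above):

```javascript
// Sketch: treat a null client as a broker-internal message
// (e.g. $SYS topics or messages published via aedes.publish()).
function describeOrigin (client) {
  return client ? 'client ' + client.id : 'broker-internal'
}

// aedes.on('publish', function (packet, client) {
//   console.log(packet.topic, 'from', describeOrigin(client))
// })
```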

Websockets

Hi,

I am trying to use aedes with websockets like this:

import ws from 'websocket-stream'
const aedes = require('aedes')()
const server = ws.createServer({port: 9001}, aedes.handle)

in the browser I implemented a basic client with Paho, that can publish and subscribe to messages from the same topic.

However, when I send MQTT messages to Aedes this is what is coming back from the broker:

After the first message, I got back a 2-byte payload: 0 followed by the first character of the original message sent.

After that there are no more incoming messages, regardless of how many times I send a message to the broker.

BTW, the client works great with no modification when I switch to mosquitto with websocket support.

Does my Aedes code look OK to you?

Issue with memory persistence, messages getting redelivered

So, I'm using aedes for testing so that we can shift to aedes from mosca. I have two node processes, one serving as the receiver and another one serving as the sender.

I'm sending around 2000 messages to the receiver. All messages are delivered successfully. But if I disconnect the receiver and connect again, messages are delivered again in chunks, after which the server crashes with the following stack trace.

/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/lib/handlers/subscribe.js:33
  topicActions.call(this, sub, done)
               ^

RangeError: Maximum call stack size exceeded
    at SubscribeState.doSubscribe (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/lib/handlers/subscribe.js:33:16)
    at makeCall (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/node_modules/fastseries/series.js:113:10)
    at SubscribeState.ResultsHolder.release (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/node_modules/fastseries/series.js:94:9)
    at work (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/node_modules/fastfall/fall.js:121:25)
    at sendRetained (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/lib/handlers/subscribe.js:95:5)
    at SubscribeState.subTopic (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/lib/handlers/subscribe.js:90:5)
    at work (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/node_modules/fastfall/fall.js:105:23)
    at Aedes.authorizeSubscribe (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/aedes.js:281:3)
    at SubscribeState.authorize (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/lib/handlers/subscribe.js:38:17)
    at Holder.work (/Users/debduttochakraborty/Sites/TestCode/proxy-server/node_modules/aedes/node_modules/fastfall/fall.js:105:23)

Cluster support

Is there a roadmap or a due date for cluster support to become available? Is there any simple alternative that I could implement to solve this in the meantime?

authorizeSubscribe, cannot restrict client from subscribing

I am trying to restrict subscriptions to specific topics on aedes using the authorizeSubscribe method.

I tried the example code shown in the documentation, but I was not able to restrict users from subscribing.

Can you please help me in implementing this?

Websocket support

Can we get something like attachHttp available in Mosca? WebSocket should have all the regular options available, like the full security options - TLS cert, client cert, etc.

Will this work with clients using websockets just like mosca?

Will this work with clients using websockets, just like mosca? I have a mosca server set up right now and it's doing okay, but QoS 2 is a nice thing to have, and our application uses websockets to connect to an MQTT broker (mosca). Thank you so much for this.

Aedes chokes on numerical topic subpaths in retained messages

I modified the https://github.com/mcollina/aedes/blob/master/benchmarks/pingpong.js benchmark as follows...

client.publish('test', JSON.stringify(process.hrtime()), { qos: 1})

...changed to...

client.publish(('/' + sent), JSON.stringify(process.hrtime()), { qos: 1, retain:true})

...and...

client.subscribe('test', { qos: 1 }, publish)

...modified to...

client.subscribe('#', { qos: 1 }, publish)

The server then reports like this...

server listening on port 1883 pid 84111
client error mqttjs_6aaa8387 no such packet
client error mqttjs_6aaa8387 no such packet
client error mqttjs_6aaa8387 no such packet
client error mqttjs_6aaa8387 no such packet
client error mqttjs_6aaa8387 no such packet

...and the client like this...

/usr/local/bin/node benchmarks/pingpong.js
sent/s 1286.8
sent/s 774.5999999999999
sent/s 795.6
offline
offline
offline
offline

By contrast if the same modified benchmark is run against Mosquitto, it reports...

/usr/local/bin/node benchmarks/pingpong.js
sent/s 2240
sent/s 1722.6
sent/s 1711.2
sent/s 1738.2
sent/s 1648.2
sent/s 1721.6
sent/s 1738.6
sent/s 1749.4
sent/s 1713.6
total 51751.08098199938
average 0.6259807550561179
mode [ 0.688611, 0.710682, 0.721568 ]

More events

In Mosca, I've configured a custom bunyan-stream module to capture keepalive timeout & setting keepalive time events since these are logged using bunyan.

Can we add two more emits for these in aedes @mcollina ?

Inflight messages / QoS 1 support for instance.publish()

Hi, Matteo!

I appreciate your work very much since Mosca, and having an embedded broker with custom auth is awesome!

I'm having some issues with achieving QoS 1 behavior when publishing from the broker (not from a client), though. I've set up aedes-persistence-redis and retained messages work fine:

 instance.publish({topic: 'cloud/update', payload: buffer, retain: true})

and I can see them in Redis. But if I try to

instance.publish({topic: 'cloud/update', payload: buffer, qos: 1})

instead - I don't see any inflight messages while no clients are connected.

I actually switched to Aedes from Mosca in hope to achieve this, but still no luck. Retain also worked in Mosca but no inflight messages for QoS 1 stored. Maybe I'm doing something wrong?

Client is not passed to server.published method when message has QoS 2

When a message is sent with QoS of 2, the client object is not passed to the published method. The code below demonstrates this behavior.

const AedesPersistence = require('aedes-persistence');

// -- SERVER ---------------------------
const server = require('aedes')({ persistence: new AedesPersistence() });
require('net').createServer(server.handle).listen(3000);

server.published = (packet, client, cb) => {
  if (packet.topic === 'hello') {
    console.log(`QoS ${packet.qos} =>`, client && client.id); // <-- client is null when QoS is 2
  }
  cb();
};

// -- CLIENT ---------------------------
const client = require('mqtt').connect({ host: '127.0.0.1', port: '3000', clientId: 'my-client' });

client.once('connect', () => {
  client.publish('hello', 'world', { qos: 0 }, () => {});
  client.publish('hello', 'world', { qos: 1 }, () => {});
  client.publish('hello', 'world', { qos: 2 }, () => {});
});

Outputs:

QoS 0 => my-client
QoS 1 => my-client
QoS 2 => null

100% CPU and server crashes after connecting 10k clients

Hi @mcollina =)

Getting a strange crash and CPU halt with the latest Aedes (0.21.0):

/var/app/current/node_modules/aedes/node_modules/reusify/reusify.js:7
  function get () {
               ^

RangeError: Maximum call stack size exceeded
    at Object.get (/var/app/current/node_modules/aedes/node_modules/reusify/reusify.js:7:16)
    at SubscribeState.compiled (/var/app/current/node_modules/aedes/node_modules/fastfall/fall.js:38:25)
    at SubscribeState.doSubscribe (/var/app/current/node_modules/aedes/lib/handlers/subscribe.js:34:16)
    at makeCall (/var/app/current/node_modules/aedes/node_modules/fastseries/series.js:113:10)
    at SubscribeState.ResultsHolder.release (/var/app/current/node_modules/aedes/node_modules/fastseries/series.js:94:9)
    at work (/var/app/current/node_modules/aedes/node_modules/fastfall/fall.js:121:25)
    at sendRetained (/var/app/current/node_modules/aedes/lib/handlers/subscribe.js:107:5)
    at SubscribeState.subTopic (/var/app/current/node_modules/aedes/lib/handlers/subscribe.js:102:5)
    at work (/var/app/current/node_modules/aedes/node_modules/fastfall/fall.js:105:23)
    at /var/app/current/node_modules/aedes/lib/handlers/subscribe.js:64:5

Before the server crashes, the process consumed 100% CPU and is completely unresponsive.

This happens after about 10k clients connect. Keepalive interval is set at 5 minutes. Using MemoryPersistence.

Most clients do not subscribe to anything, by the way.

Oh, and I set the concurrency to 10,000. Didn't help.

Issue remains with edge cases

I still seem to be hitting the same scenario where client._keepaliveTimer === null at line 19 in index.js

client._keepaliveTimer.reschedule(client._keepaliveInterval)

It's hard to recreate the exact conditions as this is in our production system, but the packet which is being handled at that moment is...

{"cmd":"puback","retain":false,"qos":0,"dup":false,"length":2,"topic":null,"payload":null,"messageId":30407}

...in case that helps to make sense of it. Otherwise I have to do some fairly heavy lifting to figure out the full sequence of events leading up to this failure.

Here's the stack trace...


TypeError: Cannot read property 'reschedule' of null
    at handle (/Users/cefn/Documents/code/imagination/git/aedes/lib/handlers/index.js:19:27)
    at Parser.enqueue (/Users/cefn/Documents/code/imagination/git/aedes/lib/client.js:204:3)
    at Parser._newPacket (/Users/cefn/Documents/code/imagination/git/aedes/node_modules/mqtt-packet/parser.js:30:10)
    at Parser.parse (/Users/cefn/Documents/code/imagination/git/aedes/node_modules/mqtt-packet/parser.js:42:48)
    at DestroyableTransform.nextBatch (/Users/cefn/Documents/code/imagination/git/aedes/lib/client.js:55:23)
    at emitReadable_ (node_modules/websocket-stream/node_modules/readable-stream/lib/_stream_readable.js:448:10)
    at emitReadable (node_modules/websocket-stream/node_modules/readable-stream/lib/_stream_readable.js:444:5)
    at readableAddChunk (node_modules/websocket-stream/node_modules/readable-stream/lib/_stream_readable.js:187:9)
    at DestroyableTransform.Readable.push (node_modules/websocket-stream/node_modules/readable-stream/lib/_stream_readable.js:149:10)
    at DestroyableTransform.Transform.push (node_modules/websocket-stream/node_modules/readable-stream/lib/_stream_transform.js:145:32)
    at WebSocket.onmessage (node_modules/websocket-stream/stream.js:74:11)
    at WebSocket.onMessage (node_modules/websocket-stream/node_modules/ws/lib/WebSocket.js:414:14)
    at Receiver.onbinary (node_modules/websocket-stream/node_modules/ws/lib/WebSocket.js:804:10)
    at node_modules/websocket-stream/node_modules/ws/lib/Receiver.js:533:18
    at node_modules/websocket-stream/node_modules/ws/lib/Receiver.js:357:7
    at node_modules/websocket-stream/node_modules/ws/lib/PerMessageDeflate.js:217:5
    at afterWrite (_stream_writable.js:346:3)
    at onwrite (_stream_writable.js:337:7)
    at WritableState.onwrite (_stream_writable.js:89:5)
    at afterTransform (_stream_transform.js:79:5)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at Zlib.callback (zlib.js:614:5)

As a reference, the same production scenario completes with Mosquitto without errors and we're only engaging with Aedes as a client through MQTT.js (with no deeper integration) so I don't think we're doing anything unusual from our side.

ip address

Hi @Matteo,
is it possible to get the IP address of a client? Or can you suggest where it would be convenient to edit aedes or its modules to add more information about the client (such as the IP address) when the server parses messages coming from mosca or other MQTT clients?

Aedes fails retained messages stressMQTT tests

Reporting as requested. Focusing on low numbers of topics for now.

Currently Aedes at 2e25a0b (installed by softlinking the git repo directly into my project's node_modules folder) is failing to deliver any retained messages at all within the test suite https://github.com/cefn/stressMQTT/

The test "Wildcard subscription of retained messages" reports to the console as follows indicating no messages delivered (javascript arithmetic is ace)!

Timestamp reset
0 Connected
Wildcard subscription of retained messages
1:1 Sent first message
1:0 Sending msg:0
1:0 Sending msg:1
1:0 Sending msg:2
2:1 Sending msg:3
2:0 Sending msg:4
2:0 Sending msg:5
2:0 Sending msg:6
2:0 Sending msg:7
2:0 Sending msg:8
2:0 Sending msg:9
2:0 Sent 10 messages in 1 ms at 0.1ms/msg
37:35 Subscribing
37:0 Subscribed
9978:9941 Sent 10 messages in 9977 ms at 997.7ms/msg
9978:0 Received 0 messages in 1451525952615 ms at Infinity ms/msg
9978:0 Subscribed to 0 topics in 9941 ms at Infinity ms/topic

If that test is skipped, the third test titled "Individual subscription of retained messages" fails as follows...

Timestamp reset
0 Connected
Individual subscription of retained messages
1:1 Sent first message
1:0 Sending msg:0
1:0 Sending msg:1
1:0 Sending msg:2
1:0 Sending msg:3
1:0 Sending msg:4
1:0 Sending msg:5
1:0 Sending msg:6
2:1 Sending msg:7
2:0 Sending msg:8
2:0 Sending msg:9
2:0 Sent 10 messages in 1 ms at 0.1ms/msg
23:21 Subscribing
23:0 Subscribed
23:0 Subscribing to topic:0
23:0 Subscribing to topic:1
24:1 Subscribing to topic:2
24:0 Subscribing to topic:3
24:0 Subscribing to topic:4
24:0 Subscribing to topic:5
24:0 Subscribing to topic:6
24:0 Subscribing to topic:7
24:0 Subscribing to topic:8
24:0 Subscribing to topic:9
24:0 Subscribed to 10 topics in 1 ms at 0.1ms/topic
/Users/cefn/Documents/code/imagination/git/aedes/lib/handlers/index.js:19
    client._keepaliveTimer.reschedule(client._keepaliveInterval)
                          ^

TypeError: Cannot read property 'reschedule' of null
    at handle (/Users/cefn/Documents/code/imagination/git/aedes/lib/handlers/index.js:19:27)
    at Parser.enqueue (/Users/cefn/Documents/code/imagination/git/aedes/lib/client.js:204:3)
    at emitOne (events.js:77:13)
    at Parser.emit (events.js:169:7)
    at Parser._newPacket (/Users/cefn/Documents/code/imagination/git/aedes/node_modules/mqtt-packet/parser.js:30:10)
    at Parser.parse (/Users/cefn/Documents/code/imagination/git/aedes/node_modules/mqtt-packet/parser.js:42:48)
    at DestroyableTransform.nextBatch (/Users/cefn/Documents/code/imagination/git/aedes/lib/client.js:55:23)
    at emitNone (events.js:67:13)
    at DestroyableTransform.emit (events.js:166:7)
    at emitReadable_ (/Users/cefn/Documents/code/imagination/git/stressMQTT/node_modules/readable-stream/lib/_stream_readable.js:464:10)
9970:9946 Sent 10 messages in 9969 ms at 996.9ms/msg
9970:0 Received 0 messages in 1451526399729 ms at Infinity ms/msg
9970:0 Subscribed to 10 topics in 9947 ms at 994.7ms/topic

You'll be glad to hear, the fourth test succeeds, though! :)

By contrast, switching the choice of server in the test config header for Mosca or Mosquitto means the same test suite completes with all four as green tests.

aedes with websocket?

Perhaps a silly question, but how does one create a websocket server that can accept aedes instance.handle()?

There seems to be no ws analogue for
var server = require('net').createServer(aedes.handle)

Which of the numerous websocket nodejs modules is the optimal for aedes?

And thank you for the great work on Mosca and now Aedes!
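A common wiring pattern, sketched here under the assumption of the websocket-stream package (the same module used in the Websockets issue earlier on this page):

```javascript
// Sketch: websocket-stream hands each WebSocket connection to
// aedes.handle as a stream, mirroring net.createServer(aedes.handle).
const aedes = require('aedes')()
const ws = require('websocket-stream')

const server = ws.createServer({ port: 9001 }, aedes.handle)
```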

Help with qos 2

I am trying to implement qos 2 and using aedes persistence lib for in memory persistence just for proof of concept that it works. Code below:

In one of the js files I call the following (aedesServer is an aedes instance defined in a separate configuration file):

aedesServer.publish(message, function (errm) { });

In the message I am setting messageId 22 just to test, and found that

aedes.on('publish', ...

always has messageId 0 .

Q1 -> Is messageId added and tracked by the broker? If yes, then why do I keep getting messageId as 0 in the on publish event?
Q2 -> Q1 is about publishing messages to clients directly from the broker. What if I publish directly from a connected client to the broker? Should I add messageId then, or does the broker take care of it as well?
Q3 -> I published one message with messageId 22 but no client was connected and subscribed to that topic at that point. After that I connected the client and subscribed to that topic. Shouldn't in-memory persistence send that offline packet?

code for persistence

var inmempersistence = require('aedes-persistence')()
var aedes = require('aedes')({
        persistence: inmempersistence
});

Replicating on multi core cpu

I'm currently looking into using aedes for a web / mobile based chat platform.
However, due to node's limitation of using only 1 CPU core, I need to scale this at least to use all the cores. I tried spawning multiple processes using redis persistence, but publishes were not shared between clients.

Any tips ?

This and mosca, a little guidance...

This is not a real issue but a question for guidance. I've played with Mosca and everything worked well. I was curious about QOS 2, and then found this project as a result (it implements it where the Mosca did not, even though I don't need it).

At the bottom of the readme for aedes it says "This addresses your concerns about performance and stability.".

I was curious if this could be elaborated further. What issues are people finding? My feeling was that mosca was a fairly small layer running on top of redis or your backend of choice, which made me assume that it was very performant and ready for production, as you can just launch more instances and, as long as the backend can handle it, you are good (assuming you have a load balancer and such).

At this juncture, especially if using redis (which allows clusters and running on multi-cores for aedes if I'm understanding things), would aedes be the better project to use in a production environment over mosca? Especially if we have just been playing with mosca at this point?

KeepAlive Timeouts

We have suddenly started getting keep alive timeouts. It might be a client issue, but can you check whether the latest commit caused anything?

2016-02-12T06:06:17.805Z - error: Error: keep alive timeout
    at keepaliveTimeout (/app/nodeApp/hopponnodejs/HopponSimulator/node_modules/aedes/lib/handlers/connect.js:43:28)
    at timerWrapper [as _onTimeout]
    at Timer.listOnTimeout [as ontimeout]
2016-02-12T06:06:17.805Z - error: client error andr_3bb73ec23c08b68c keep alive timeout
2016-02-12T06:06:17.805Z - info: Client disconnected - andr_3bb73ec23c08b68c

Can aedes be deployed distributed?

1. Can aedes be deployed as multiple instances via pm2 on one PC?
2. Can aedes be deployed distributed across many PCs?
3. Are bridges needed when deploying on many PCs?

TLS Options while creating sever

How can I use the PFX option as in regular NodeJS TLS, like the following?
var options = {
pfx: fs.readFileSync('client.pfx')
};

Besides pfx/pkcs, the following parameters are necessary to have at a minimum: crl, passphrase, requestCert, rejectUnauthorized. Ideally the whole set of TLS options should be available in the same way regular TLS options work. If this is already possible, please provide an example.
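Since aedes only consumes a stream, a sketch using Node's built-in tls module should work; the file names and passphrase are illustrative, and any option accepted by tls.createServer (pfx, passphrase, crl, requestCert, rejectUnauthorized, ...) would apply unchanged:

```javascript
// Sketch: aedes behind a TLS server created with the standard Node API.
const fs = require('fs')
const tls = require('tls')
const aedes = require('aedes')()

const options = {
  pfx: fs.readFileSync('client.pfx'), // illustrative file name
  passphrase: 'secret',               // illustrative
  requestCert: true,
  rejectUnauthorized: true
}

const server = tls.createServer(options, aedes.handle)
server.listen(8883) // conventional MQTT-over-TLS port
```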

event sequence on reconnect

In mosca, when an offline client (one that has not yet been timed out by keep-alive on mosca's side) tries to reconnect, the server emits a connected event followed by a disconnected event for the old connection, which mosca closes.
This makes it hard for apps to reliably track the correct client status (online/offline) and also produces a race.
How is this in Aedes? Can the server first emit disconnected (or even not emit it in such a scenario) and finally emit connected at the end of processing the new connection?

cluster mode

I have seen that the heartbeatInterval is for clusters, so can I assume that aedes will support cluster mode? And how long until it becomes a reality?
Thanks very much!

Possible memory leak

I surfaced a possible memory leak in aedes while sending 20,000 messages in a one-to-one setup (one sender, one receiver).

I got the following warning

(node) warning: possible EventEmitter memory leak detected. 501 drain listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
    at Socket.addListener (events.js:239:17)
    at Socket.Readable.on (_stream_readable.js:665:33)
    at Socket.once (events.js:265:8)
    at write (/Users/gavindmello/aedes/proxy/src/node_modules/aedes/lib/write.js:9:17)
    at writeQoS (/Users/gavindmello/aedes/proxy/src/node_modules/aedes/lib/client.js:101:5)
    at MemoryPersistence.outgoingUpdate (/Users/gavindmello/aedes/proxy/src/node_modules/aedes/node_modules/aedes-persistence/persistence.js:180:14)
    at Client.deliverQoS (/Users/gavindmello/aedes/proxy/src/node_modules/aedes/lib/client.js:84:26)
    at deliver (/Users/gavindmello/aedes/proxy/src/node_modules/aedes/lib/client.js:134:12)
    at nextTickCallbackWith0Args (node.js:419:9)
    at process._tickCallback (node.js:348:13)

I think the drain listener in write.js isn't removed. The issue subsided, however, when I added client.conn.removeAllListeners('drain'). Here:

'use strict'

var mqtt = require('mqtt-packet')

function write (client, packet, done) {
  var result = mqtt.writeToStream(packet, client.conn)

  if (!result && !client.errored && done) {
    client.conn.once('drain', done)
    client.conn.removeAllListeners('drain');
  } else if (done) {

    setImmediate(done)
  }
}

module.exports = write

Any sort of assistance would be helpful

MongoDB Persistence

@mcollina : We are using mongodb for persistence with mosca and want to use the same with aedes as well. Do you know when can we have that?

Publishing 2048-byte messages at a high rate, they are lost at all QoS levels

I'm trying to switch to aedes from mosquitto, using your example.js.
I'm publishing 667 2048-byte messages with QoS 2, 1, or 0 at a high rate. I publish via MQTT, and receive both via MQTT and websocket; all messages are lost.
This warning is shown in the output several times in between the 'message from client' messages:
Warning: Possible EventEmitter memory leak detected. 201 drain listeners added. Use emitter.setMaxListeners() to increase limit
I tried to send the same data in 256-byte messages, and they were lost as well.
Then I tried to send 20480-byte messages, and since there were only 67 in total, they were delivered. 204800-byte messages are delivered as well.

Bunyan support

Bunyan log/stream support in Mosca was very helpful in my production system. Are you planning to also add it to Aedes @mcollina? Or is an alternative currently available?

Latest commit 0.12.4 issue

Hi
I am getting client disconnect errors quite frequently now

1452602353596 INFO new client andr_3e619e2221571966
1452602353597 INFO message published from broker to $SYS/EkYCGyCDl/new/clients 0
1452602353841 INFO Subscription Package:
{"topic":"user/10204470020541040/trip/5694f3e7c6fbf8144fd4c654/riderequests","qos":2}
1452602353841 INFO Client: andr_3e619e2221571966, Topic: user/10204470020541040/trip/5694f3e7c6fbf8144fd4c654/riderequests, Method: authorizeSubscribe; Entering function
1452602354419 INFO Client: andr_3e619e2221571966, Topic: user/10204470020541040/trip/5694f3e7c6fbf8144fd4c654/riderequests, Method: authorizeSubscribe; Subscription successful
1452602356190 INFO message published from broker to $SYS/EkYCGyCDl/heartbeat 0
1452602378812 INFO Client disconnected - andr_3e619e2221571966

Authentication Hook

Another issue with Mosca is the race around plugging in the authentication hooks, which comes from setting the auth handlers after server boot rather than as a configuration option.

server = new mosca.Server(moscaSettings);
//server is now ready and may accept new connections
server.authenticate = authorizer.authenticate;
server.authorizeSubscribe = authorizer.authorizeSubscribe;
server.authorizePublish = authorizer.authorizePublish;

This will be important for server restarts in production (with many clients connected).
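Aedes avoids this race by accepting the handlers as constructor options, so they are installed before the server accepts any connection. A minimal sketch; the hard-coded credential check is a hypothetical placeholder for a real lookup:

```javascript
// Hypothetical authenticate handler following the Aedes hook signature
// (client, username, password, done); replace the hard-coded check with
// a real credential lookup.
function authenticate (client, username, password, done) {
  const ok = username === 'alice' &&
    Boolean(password) && password.toString() === 'secret'
  done(null, ok)
}

// Handlers are part of the options, so they are in place before listen():
const options = {
  authenticate,
  authorizeSubscribe: (client, sub, done) => done(null, sub),
  authorizePublish: (client, packet, done) => done(null)
}

// const aedes = require('aedes')(options)
// require('net').createServer(aedes.handle).listen(1883)
```

With this shape there is no window between the server starting and the hooks being active.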

Latencies report strangely in pingpong.js

I found myself inspecting the contents of the latencies array after seeing strange numbers reported.

After a few numbers that look sensible (though slow, assuming these should be interpreted as milliseconds), the following numbers go a bit crazy. See this snapshot from different positions in the latencies array. I suspect this has to do with interactions between hrtime calls and convert-hrtime.

24 = 61.984865
25 = 61.991153
26 = 61.68552
27 = 60.849604
28 = 58343.782468
29 = 58343.993224
30 = 58343.974457
31 = 58342.961596
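The jump from roughly 62 to roughly 58343 looks like an absolute timestamp leaking in where a difference was intended, since hrtime values are only meaningful relative to another reading. A sketch of the diff-based pattern with `process.hrtime.bigint()` (a guess at the fix, not a diagnosis of pingpong.js itself):

```javascript
// process.hrtime.bigint() returns nanoseconds since an arbitrary origin,
// so only the difference between two readings is meaningful.
function timeIt (fn) {
  const start = process.hrtime.bigint()
  fn()
  return Number(process.hrtime.bigint() - start) / 1e6 // elapsed ms
}

const elapsed = timeIt(() => {
  for (let i = 0; i < 1e6; i++) {} // busy loop standing in for a round trip
})
console.log(elapsed.toFixed(3), 'ms')
```

If one reading in a pair is accidentally taken as an absolute value, latencies of tens of thousands of milliseconds appear exactly as in the snapshot above.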

Maximum call stack size exceeded

I was trying to send multiple messages using QoS 2 on one topic while the receiver was offline. When I reconnect the receiver with an unclean session, it throws an error, and after a few errors aedes crashes.

C:\nodejs\node_modules\aedes\node_modules\aedes-persistence\persistence.js:74
return {
^

RangeError: Maximum call stack size exceeded
    at mapSub (C:\nodejs\node_modules\aedes\node_modules\aedes-persistence\persistence.js:74:5)
    at Array.map (native)
    at MemoryPersistence.addSubscriptions (C:\nodejs\node_modules\aedes\node_modules\aedes-persistence\persistence.js:73:8)
    at SubscribeState.storeSubscriptions (C:\nodejs\node_modules\aedes\lib\handlers\subscribe.js:63:9)
    at work (C:\nodejs\node_modules\aedes\node_modules\fastfall\fall.js:105:23)
    at Aedes.defaultAuthorizeSubscribe [as authorizeSubscribe] (C:\nodejs\node_modules\aedes\aedes.js:294:3)
    at SubscribeState.authorize (C:\nodejs\node_modules\aedes\lib\handlers\subscribe.js:39:17)
    at Holder.work (C:\nodejs\node_modules\aedes\node_modules\fastfall\fall.js:105:23)
    at SubscribeState.compiled (C:\nodejs\node_modules\aedes\node_modules\fastfall\fall.js:55:17)
    at SubscribeState.doSubscribe (C:\nodejs\node_modules\aedes\lib\handlers\subscribe.js:34:16)

I'm not sure why, but it should be possible, right, when the receiving client disconnects temporarily? If I only send one message it works well, so I think I may have a persistence setting problem.

Use mqtt-stack & mqtt-spec?

Hi Matteo,

Just saw that you started this project. Are you considering using the mqtt-stack and mqtt-spec stuff I started? There is not much missing; I just haven't had the time to finish it yet.

clientDisconnect emitted twice

When a client disconnects abnormally, without a DISCONNECT message, the clientDisconnect event is emitted twice, which also affects the connected client count.

aedes.on('clientDisconnect', function(client) {
console.log('Client Disconnected: ' + client.id + ' total: ' + aedes.connectedClients);
});
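Until the double emission is fixed, a hypothetical workaround is to track connected client ids in a Set, so a duplicated event cannot skew the count. A sketch:

```javascript
// Sketch: a Set keyed by client id makes disconnect handling idempotent,
// so a duplicated 'clientDisconnect' emission is simply ignored.
const connected = new Set()

function onClientConnect (client) {
  connected.add(client.id)
}

function onClientDisconnect (client) {
  if (!connected.delete(client.id)) return // duplicate event: already counted
  console.log('Client Disconnected: ' + client.id + ' total: ' + connected.size)
}

// aedes.on('client', onClientConnect)
// aedes.on('clientDisconnect', onClientDisconnect)
```

This sidesteps the symptom only; the duplicate emission itself still needs a fix in aedes.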

Resilient Environment

Hi
We are using aedes-redis persistence and want to check if we can use multiple nodes running broker with a load balancer that routes TCP traffic accordingly or will there be any issues with it?

Thanks,
Ashish

broker keeps dying due to keep alive timeout

I have aedes running as a broker locally, and whenever a keep alive timeout on the connect handler is missed, it fires an error, as I'd expect. However, that error kills the broker entirely, which is surprising to me. Is this intended behaviour, and if so, why?

unable to unsubscribe topics after client disconnects

Hi Mcollina,

I want to unsubscribe from all the topics a client has subscribed to when it disconnects. After I reconnect, I should not get messages on those topics unless I subscribe again. To achieve this, on the 'clientDisconnect' and 'clientError' events I try to explicitly unsubscribe from the topics. However, in these events the client's 'subscriptions' object is empty. But when I connect again, all the subscriptions are back on the 'subscriptions' object.

PS: I am using cleanSession: false on the client, with QoS 1.

"no such packet" errors in clientError event

The clients keep throwing this error. Do you know what it means?
Also, I sometimes get tens or hundreds of subscription logs for the same client and topic even though the frontend only tried once, as below:

2016-01-10T19:58:28.221Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.221Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.221Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.221Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.222Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.222Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.222Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.223Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful
2016-01-10T19:58:28.224Z - info: Client: andr_d1e5c1622809744e, Topic: , Method: authorizeSubscribe; Subscription successful

Client subscribed over multiple devices

Hello.

I was wondering how I should proceed if I wanted to publish a packet to a topic and have all the users receive the publish, except the devices the original user is connected on.

As a use case, I would want to implement a notifications system over MQTT, but when a user publishes a packet on a topic, I don't want him to receive the notification of his own action on his other connected devices.

KeepaliveInterval multiplication

Good afternoon,

as I was wondering how you implemented the keepalive functionality I noticed this piece of code:

  if (packet.keepalive > 0) {
    client._keepaliveInterval = packet.keepalive * 1501
    client._keepaliveTimer = retimer(function keepaliveTimeout () {
      client.broker.emit('keepaliveTimeout', client)
      client.emit('error', new Error('keep alive timeout'))
    }, client._keepaliveInterval)
  }

After some thinking about what the 1501 factor was, I came up with this:
1.5 (maximum keepalive until disconnect) * 1000 (milliseconds) + 1 (exceed by 1 and get disconnected)

The problem with the current implementation is that if a client sends a huge keepalive in the CONNECT packet, quote:

The maximum keep alive is 18h 12min 15 sec

it gets multiplied by 1501 instead of by 1500 with one millisecond added.
Do you want me to send a PR, or have I interpreted this code wrongly and this is intended behavior?

IMO, this would be the fix:
client._keepaliveInterval = packet.keepalive * 1500 + 1
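Checking the two formulas numerically at the protocol maximum keepalive of 65535 seconds makes the discrepancy concrete (MQTT allows the server to wait 1.5 times the keep alive interval before disconnecting):

```javascript
// Compare the current factor (1501) with the proposed 1500 * keepalive + 1 ms
// at the largest keepalive a CONNECT packet can carry (65535 s = 18h 12min 15s).
const keepalive = 65535

const current = keepalive * 1501       // ms, what the quoted code computes
const proposed = keepalive * 1500 + 1  // ms, 1.5x the interval plus 1 ms grace

console.log(proposed)           // 98302501
console.log(current - proposed) // 65534
```

At the maximum keepalive the current code waits about 65.5 seconds longer than the spec's 1.5x window; at typical keepalives of 60 seconds or less the difference stays under 60 ms, which is why it goes unnoticed.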

repeated connects/disconnects...

I am using aedes with mqtt.js on one machine. I can get aedes into a state where the client connects and then immediately disconnects. The connection appears to be successful (i.e., I get back a successful connection). It seems to occur when there is heavy load on aedes (for example, when I am creating 20 clients all at once), but it can also sometimes happen when I set up a few subscriptions in the connect handler. In the case where I have 20 identical clients, 2 will display this behavior: they connect, are immediately disconnected, and then repeatedly reconnect and get disconnected.

It is intermittent, and when it doesn't occur I am blown away by the performance of aedes (Very nice job).

I don't see this behavior when using the same code to connect to mosca or mosquitto.

I have attached the 2 source files that demonstrate this behavior. Any help would be greatly appreciated.

bb
aedes example.zip

persistence does not work as expected...

Attached is a simple test case that does not exhibit the expected persistence with level-, Redis- or in-memory-based persistence using aedes, but does so with mosca and mosquitto.

  1. Run the test case
  2. Quit
  3. Restart, and you should see the values for 'device/tester/status' and 'device/tester/variable' from the previous run as well as the newly published values. Instead you only see the new values. Despite the topics being marked as retained with QoS 1, they are not published to new subscriptions.
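For reference, the retained-message behaviour the steps above expect is simple: the broker stores the last retained packet per topic and replays it to every new matching subscriber. A minimal in-memory sketch of that contract (an illustration of the MQTT rule, not Aedes code, and ignoring wildcards):

```javascript
// Sketch of the MQTT retained-message contract: at most one stored packet
// per topic, replayed whenever a new subscription matches that topic.
const retained = new Map()

function publish (packet) {
  if (packet.retain) retained.set(packet.topic, packet) // last write wins
}

function subscribe (topic, deliver) {
  if (retained.has(topic)) deliver(retained.get(topic)) // replay on subscribe
}
```

A persistent broker additionally writes the `retained` map to disk so the replay survives a restart, which is the part the test case above exercises.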

aedes_demostration.zip

conflict id problem

Disconnect other clients with the same client.id

I guess there may be security issues: any client can connect to the server with any id and then kick other clients off.
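One hypothetical mitigation (not something Aedes enforces by itself) is to bind client ids to the authenticated username inside the authenticate hook, so a client cannot claim another user's id and force a takeover. The `username_` prefix convention below is an assumption for illustration:

```javascript
// Sketch: accept a client id only if it is namespaced under the
// authenticated username, e.g. 'alice_phone' for user 'alice'.
function clientIdBelongsToUser (clientId, username) {
  return typeof clientId === 'string' &&
    typeof username === 'string' &&
    clientId.startsWith(username + '_')
}

// Inside the authenticate hook (sketch; checkPassword is hypothetical):
// function authenticate (client, username, password, done) {
//   done(null, checkPassword(username, password) &&
//     clientIdBelongsToUser(client.id, username))
// }
```

With ids scoped this way, the takeover-on-duplicate-id behaviour only lets a user displace their own sessions, not someone else's.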

Why are messages with QoS 2 stuck in Redis when the client is connected and subscribed?

Hi
We have scenario as below

  1. ClientA connects with QoS 2 and subscribes to /user/testtopic
  2. ClientB publishes Message1 and it is delivered fine
  3. ClientA turns off its internet connection
  4. ClientB publishes Message2, Message3, etc.
  5. ClientA turns its internet connection back on
  6. ClientA ONLY GETS ONE MESSAGE; the rest are all stuck in Redis

If ClientA turns its internet off and on again, it gets a couple of messages again.
Why are they not all delivered on time and in sequence?

UPDATE - It actually sends them in chunks of 4. So each time ClientA turns its internet off and back on, it receives another set of 4 pending messages. Why is the order not maintained?

The clean session property is false, retain is also false, and QoS is 2.

Thanks,
Ashish
