Git Product home page Git Product logo

Comments (11)

getlarge avatar getlarge commented on June 12, 2024 1

This could be an opportunity to revise how modules are exported and solve #878 by the same occasion.

from aedes.

getlarge avatar getlarge commented on June 12, 2024 1

I will work on a PR soon. For reference i am copying the patch that i applied for aedes-otel-instrumentation.

diff --git a/node_modules/aedes/aedes.js b/node_modules/aedes/aedes.js
index c02d289..668b162 100644
--- a/node_modules/aedes/aedes.js
+++ b/node_modules/aedes/aedes.js
@@ -7,7 +7,7 @@ const series = require('fastseries')
 const { v4: uuidv4 } = require('uuid')
 const reusify = require('reusify')
 const { pipeline } = require('stream')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const memory = require('aedes-persistence')
 const mqemitter = require('mqemitter')
 const Client = require('./lib/client')
@@ -45,6 +45,7 @@ function Aedes (opts) {
   // +1 when construct a new aedes-packet
   // internal track for last brokerCounter
   this.counter = 0
+  this.concurrency = opts.concurrency
   this.queueLimit = opts.queueLimit
   this.connectTimeout = opts.connectTimeout
   this.maxClientsIdLength = opts.maxClientsIdLength
@@ -52,24 +53,19 @@ function Aedes (opts) {
     concurrency: opts.concurrency,
     matchEmptyLevels: true // [MQTT-4.7.1-3]
   })
-  this.handle = function handle (conn, req) {
-    conn.setMaxListeners(opts.concurrency * 2)
-    // create a new Client instance for a new connection
-    // return, just to please standard
-    return new Client(that, conn, req)
-  }
+
   this.persistence = opts.persistence || memory()
   this.persistence.broker = this
   this._parallel = parallel()
   this._series = series()
   this._enqueuers = reusify(DoEnqueues)
 
-  this.preConnect = opts.preConnect
-  this.authenticate = opts.authenticate
-  this.authorizePublish = opts.authorizePublish
-  this.authorizeSubscribe = opts.authorizeSubscribe
-  this.authorizeForward = opts.authorizeForward
-  this.published = opts.published
+  this._preConnect = opts.preConnect
+  this._authenticate = opts.authenticate
+  this._authorizePublish = opts.authorizePublish
+  this._authorizeSubscribe = opts.authorizeSubscribe
+  this._authorizeForward = opts.authorizeForward
+  this._published = opts.published
 
   this.decodeProtocol = opts.decodeProtocol
   this.trustProxy = opts.trustProxy
@@ -250,6 +246,15 @@ function removeSharp (sub) {
   return code !== 43 && code !== 35
 }
 
+// assiging to prototype is a breaking change as it is required to bind the Aedes instance to the function
+// @example net.createServer(broker.handle.bind(broker)) or net.createServer((socket) => broker.handle(socket))
+Aedes.prototype.handle = function handle (conn, req) {
+    conn.setMaxListeners(this.concurrency * 2)
+    // create a new Client instance for a new connection
+    // return, just to please standard
+    return new Client(this, conn, req)
+}
+
 function callPublished (_, done) {
   this.broker.published(this.packet, this.client, done)
   this.broker.emit('publish', this.packet, this.client)
@@ -338,6 +343,30 @@ Aedes.prototype.close = function (cb = noop) {
 
 Aedes.prototype.version = require('./package.json').version
 
+Aedes.prototype.preConnect = function (client, packet, callback) {
+  this._preConnect(client, packet, callback)
+}
+
+Aedes.prototype.authenticate = function (client, username, password, callback) {
+  this._authenticate(client, username, password, callback)
+}
+
+Aedes.prototype.authorizePublish = function (client, packet, callback) {
+  this._authorizePublish(client, packet, callback)
+}
+
+Aedes.prototype.authorizeSubscribe = function (client, sub, callback) {
+  this._authorizeSubscribe(client, sub, callback)
+}
+
+Aedes.prototype.authorizeForward = function (client, packet) {
+  return this._authorizeForward(client, packet)
+}
+
+Aedes.prototype.published = function (packet, client, callback) {
+  this._published(packet, client, callback)
+}
+
 function defaultPreConnect (client, packet, callback) {
   callback(null, true)
 }
diff --git a/node_modules/aedes/lib/client.js b/node_modules/aedes/lib/client.js
index 414d8e5..e525712 100644
--- a/node_modules/aedes/lib/client.js
+++ b/node_modules/aedes/lib/client.js
@@ -4,12 +4,12 @@ const mqtt = require('mqtt-packet')
 const EventEmitter = require('events')
 const util = require('util')
 const eos = require('end-of-stream')
-const Packet = require('aedes-packet')
-const write = require('./write')
+const { Packet } = require('aedes-packet')
+const { write } = require('./write')
 const QoSPacket = require('./qos-packet')
-const handleSubscribe = require('./handlers/subscribe')
-const handleUnsubscribe = require('./handlers/unsubscribe')
-const handle = require('./handlers')
+const { handleSubscribe } = require('./handlers/subscribe')
+const { handleUnsubscribe } = require('./handlers/unsubscribe')
+const { handle } = require('./handlers')
 const { pipeline } = require('stream')
 const { through } = require('./utils')
 
diff --git a/node_modules/aedes/lib/handlers/connect.js b/node_modules/aedes/lib/handlers/connect.js
index a4c32d0..bd2d8cb 100644
--- a/node_modules/aedes/lib/handlers/connect.js
+++ b/node_modules/aedes/lib/handlers/connect.js
@@ -2,10 +2,10 @@
 
 const retimer = require('retimer')
 const { pipeline } = require('stream')
-const write = require('../write')
+const { write } = require('../write')
 const QoSPacket = require('../qos-packet')
 const { through } = require('../utils')
-const handleSubscribe = require('./subscribe')
+const { handleSubscribe } = require('./subscribe')
 const uniqueId = require('hyperid')()
 
 function Connack (arg) {
@@ -264,4 +264,4 @@ function emptyQueueFilter (err, client, packet) {
   }
 }
 
-module.exports = handleConnect
+module.exports = { handleConnect }
diff --git a/node_modules/aedes/lib/handlers/index.js b/node_modules/aedes/lib/handlers/index.js
index a5dfaa8..b611293 100644
--- a/node_modules/aedes/lib/handlers/index.js
+++ b/node_modules/aedes/lib/handlers/index.js
@@ -1,13 +1,13 @@
 'use strict'
 
-const handleConnect = require('./connect')
-const handleSubscribe = require('./subscribe')
-const handleUnsubscribe = require('./unsubscribe')
-const handlePublish = require('./publish')
-const handlePuback = require('./puback')
-const handlePubrel = require('./pubrel')
-const handlePubrec = require('./pubrec')
-const handlePing = require('./ping')
+const { handleConnect } = require('./connect')
+const { handleSubscribe } = require('./subscribe')
+const { handleUnsubscribe } = require('./unsubscribe')
+const { handlePublish } = require('./publish')
+const { handlePuback } = require('./puback')
+const { handlePubrel } = require('./pubrel')
+const { handlePubrec } = require('./pubrec')
+const { handlePing } = require('./ping')
 
 function handle (client, packet, done) {
   if (packet.cmd === 'connect') {
@@ -74,4 +74,4 @@ function finish (conn, packet, done) {
   done(error)
 }
 
-module.exports = handle
+module.exports = { handle }
diff --git a/node_modules/aedes/lib/handlers/ping.js b/node_modules/aedes/lib/handlers/ping.js
index a4c042c..69b3ded 100644
--- a/node_modules/aedes/lib/handlers/ping.js
+++ b/node_modules/aedes/lib/handlers/ping.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 const pingResp = {
   cmd: 'pingresp'
 }
@@ -10,4 +10,4 @@ function handlePing (client, packet, done) {
   write(client, pingResp, done)
 }
 
-module.exports = handlePing
+module.exports = { handlePing }
diff --git a/node_modules/aedes/lib/handlers/puback.js b/node_modules/aedes/lib/handlers/puback.js
index e4b419c..8376861 100644
--- a/node_modules/aedes/lib/handlers/puback.js
+++ b/node_modules/aedes/lib/handlers/puback.js
@@ -8,4 +8,4 @@ function handlePuback (client, packet, done) {
   })
 }
 
-module.exports = handlePuback
+module.exports = { handlePuback }
diff --git a/node_modules/aedes/lib/handlers/publish.js b/node_modules/aedes/lib/handlers/publish.js
index e30c9db..5c3e167 100644
--- a/node_modules/aedes/lib/handlers/publish.js
+++ b/node_modules/aedes/lib/handlers/publish.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function PubAck (packet) {
   this.cmd = 'puback'
@@ -62,4 +62,4 @@ function authorizePublish (packet, done) {
   this.broker.authorizePublish(this, packet, done)
 }
 
-module.exports = handlePublish
+module.exports = { handlePublish }
diff --git a/node_modules/aedes/lib/handlers/pubrec.js b/node_modules/aedes/lib/handlers/pubrec.js
index 5c914dd..dc7a7f0 100644
--- a/node_modules/aedes/lib/handlers/pubrec.js
+++ b/node_modules/aedes/lib/handlers/pubrec.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function PubRel (packet) {
   this.cmd = 'pubrel'
@@ -27,4 +27,4 @@ function handlePubrec (client, packet, done) {
   }
 }
 
-module.exports = handlePubrec
+module.exports = { handlePubrec }
diff --git a/node_modules/aedes/lib/handlers/pubrel.js b/node_modules/aedes/lib/handlers/pubrel.js
index 09dcc86..672b697 100644
--- a/node_modules/aedes/lib/handlers/pubrel.js
+++ b/node_modules/aedes/lib/handlers/pubrel.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 
 function ClientPacketStatus (client, packet) {
   this.client = client
@@ -47,4 +47,4 @@ function pubrelDel (arg, done) {
   persistence.incomingDelPacket(this.client, arg.packet, done)
 }
 
-module.exports = handlePubrel
+module.exports = { handlePubrel }
diff --git a/node_modules/aedes/lib/handlers/subscribe.js b/node_modules/aedes/lib/handlers/subscribe.js
index 2470427..e2007aa 100644
--- a/node_modules/aedes/lib/handlers/subscribe.js
+++ b/node_modules/aedes/lib/handlers/subscribe.js
@@ -1,10 +1,10 @@
 'use strict'
 
 const fastfall = require('fastfall')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const { through } = require('../utils')
 const { validateTopic, $SYS_PREFIX } = require('../utils')
-const write = require('../write')
+const { write } = require('../write')
 
 const subscribeTopicActions = fastfall([
   authorize,
@@ -245,4 +245,4 @@ function completeSubscribe (err) {
 
 function noop () { }
 
-module.exports = handleSubscribe
+module.exports = { handleSubscribe }
diff --git a/node_modules/aedes/lib/handlers/unsubscribe.js b/node_modules/aedes/lib/handlers/unsubscribe.js
index e08c317..b9cd7ef 100644
--- a/node_modules/aedes/lib/handlers/unsubscribe.js
+++ b/node_modules/aedes/lib/handlers/unsubscribe.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const write = require('../write')
+const { write } = require('../write')
 const { validateTopic, $SYS_PREFIX } = require('../utils')
 
 function UnSubAck (packet) {
@@ -101,4 +101,4 @@ function completeUnsubscribe (err) {
 
 function noop () { }
 
-module.exports = handleUnsubscribe
+module.exports = { handleUnsubscribe }
diff --git a/node_modules/aedes/lib/qos-packet.js b/node_modules/aedes/lib/qos-packet.js
index 5527fe1..07c1581 100644
--- a/node_modules/aedes/lib/qos-packet.js
+++ b/node_modules/aedes/lib/qos-packet.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const util = require('util')
 
 function QoSPacket (original, client) {
diff --git a/node_modules/aedes/lib/write.js b/node_modules/aedes/lib/write.js
index 716d81a..a5d186c 100644
--- a/node_modules/aedes/lib/write.js
+++ b/node_modules/aedes/lib/write.js
@@ -21,4 +21,4 @@ function write (client, packet, done) {
   setImmediate(done, error, client)
 }
 
-module.exports = write
+module.exports = { write }
diff --git a/node_modules/aedes/types/client.d.ts b/node_modules/aedes/types/client.d.ts
index 2906213..c415fce 100644
--- a/node_modules/aedes/types/client.d.ts
+++ b/node_modules/aedes/types/client.d.ts
@@ -6,10 +6,10 @@ import {
   Subscriptions,
   UnsubscribePacket
 } from './packet'
-import { Connection } from './instance'
+import Aedes, { Connection } from './instance'
 import { EventEmitter } from 'node:events'
 
-export interface Client extends EventEmitter {
+export class Client extends EventEmitter {
   id: Readonly<string>;
   clean: Readonly<boolean>;
   version: Readonly<number>;
@@ -19,6 +19,8 @@ export interface Client extends EventEmitter {
   connected: Readonly<boolean>;
   closed: Readonly<boolean>;
 
+  constructor(broker: Aedes, conn: Connection, req?: IncomingMessage)
+
   on(event: 'connected', listener: () => void): this;
   on(event: 'error', listener: (error: Error) => void): this;

from aedes.

mcollina avatar mcollina commented on June 12, 2024

This seems a massive change to do (not opposed).

I think modifying aedes-packet is incorrect, as the transaction should be decouple in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-processes systems with Redis or MongoDB

from aedes.

getlarge avatar getlarge commented on June 12, 2024

This seems a massive change to do (not opposed).

I think modifying aedes-packet is incorrect, as the transaction should be decouple in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-processes systems with Redis or MongoDB

Indeed, this implies some breaking changes.

Aedes-packet does not need to be modified BUT mqtt-packet has to be patched at runtime to propagate the context and enable distributed traces, those traces are composed of multiple spans which SHOULD be related to trace the communication between multiple services.
That’s how it is specified in OpenTelemetry.

To reformulate your statement :
publishing terminates **a span** when mqemitter terminates, and delivery starts as another **span**. …the latter is linked using the span identifier of the former, as a parent.
This test illustrates it.

Regarding your concern about multi process systems, the packets are stored by those systems (in aedes-persistence-X and aedes-emitter-X) right ? So as long as the context is correctly serialized into the packet it should be fine ?
Am i missing something ?
BTW for the serialization of the context, i simply followed what is suggested in this document, as you can here

from aedes.

mcollina avatar mcollina commented on June 12, 2024

Changing mqtt-packet is a non starter unfortunately:(.

from aedes.

getlarge avatar getlarge commented on June 12, 2024

Changing mqtt-packet is a non starter unfortunately:(.

Maybe i wasn’t clear, no change will be requested in the mqtt-packet source code.

from aedes.

robertsLando avatar robertsLando commented on June 12, 2024

@mcollina what we would like to know is mostly if the approach could have some performance implications and/or if there are better alternatives

from aedes.

mcollina avatar mcollina commented on June 12, 2024

I think the changes here are not really needed, as the "in-the-middle" approach would be sufficient. However I think it would make things easier.

As for perf, it should be neutral when no monkeypatching is used.

A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.


How do you plan to propagate the trace over the MQTT protocol?

from aedes.

getlarge avatar getlarge commented on June 12, 2024

A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.

How do you imagine this design ?
Allowing the Aedes consumer to provide some extra functions that can wrap the functions that needs to monitored ?


How do you plan to propagate the trace over the MQTT protocol?

As said in one of the message above:
There is a document that recommends solutions to store and retrieve the context (trace) to and from the MQTT packet.
To summarize it:

  1. for MQTT v5 (which does not yet apply for Aedes) packet.properties.userProperties should be used to store the traceparent and tracestate as properties.
  2. for MQTT v3, in the case of a JSON payload, the traceparent and tracestate should be stored as properties.
  3. for MQTT v3, in other cases the approach is a bit more vague and relies on the binary protocol proposal. It is a bit more vague as it does not recommend a location for the context to be stored, but it suggests a serialization/deserialization algorithms that we could apply. In that case i thought we could prepend the packet payload with it.

from aedes.

mcollina avatar mcollina commented on June 12, 2024

IMHO this makes sense only with MQTT5

from aedes.

getlarge avatar getlarge commented on June 12, 2024

I agree that it would make less of a performance penalty for MQTT 5. For MQTT 3 if some users truly wish to propagate trace between their systems, i don't see another alternative (except prepending the whole MQTT packet with the trace context, in the same way this is done for the PROXY protocol).
It could be an opt-in feature ?

from aedes.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.