
Mochi-MQTT Server


English | 简体中文 | 日本語 | Translators Wanted!

🎆 mochi-co/mqtt is now part of the new mochi-mqtt organisation. Read about this announcement here.

Mochi-MQTT is a fully compliant, embeddable high-performance Go MQTT v5 (and v3.1.1) broker/server

Mochi MQTT is an embeddable fully compliant MQTT v5 broker server written in Go, designed for the development of telemetry and internet-of-things projects. The server can be used either as a standalone binary or embedded as a library in your own applications, and has been designed to be as lightweight and fast as possible, with great care taken to ensure the quality and maintainability of the project.

What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks (Learn more). Mochi MQTT fully implements version 5.0.0 of the MQTT protocol.

Mochi-MQTT Features

  • Full MQTTv5 Feature Compliance, with compatibility for MQTT v3.1.1 and v3.0.0:
    • User and MQTTv5 Packet Properties
    • Topic Aliases
    • Shared Subscriptions
    • Subscription Options and Subscription Identifiers
    • Message Expiry
    • Client Session Expiry
    • Send and Receive QoS Flow Control Quotas
    • Server-side Disconnect and Auth Packets
    • Will Delay Intervals
    • Plus all the original MQTT features of Mochi MQTT v1, such as Full QoS(0,1,2), $SYS topics, retained messages, etc.
  • Developer-centric:
    • Most core broker code is now exported and accessible, for total developer control.
    • Full-featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
    • Direct Packet Injection using special inline client, or masquerade as existing clients.
  • Performant and Stable:
    • Our classic trie-based Topic-Subscription model.
    • Client-specific write buffers to avoid issues with slow-reading or irregular client behaviour.
    • Passes all Paho Interoperability Tests for MQTT v5 and MQTT v3.
    • Over a thousand carefully considered unit test scenarios.
  • TCP, Websocket (including SSL/TLS), and $SYS Dashboard listeners.
  • Built-in Redis, Badger, Pebble and Bolt Persistence using Hooks (but you can also make your own).
  • Built-in Rule-based Authentication and ACL Ledger using Hooks (also make your own).

Compatibility Notes

Because of the overlap between the v5 specification and previous versions of MQTT, the server can accept both v5 and v3 clients. Note, however, that where both v5 and v3 clients are connected, properties and features provided for v5 clients (such as user properties) will be downgraded for v3 clients.

Support for MQTT v3.0.0 and v3.1.1 is considered hybrid-compatible. Where not specifically restricted in the v3 specification, the more modern and safety-first v5 behaviours are used instead - such as expiry for inflight and retained messages and for client sessions, and quality-of-service flow control limits.

When is this repo updated?

Unless it's a critical issue, new releases typically go out over the weekend.

Roadmap

  • Please open an issue to request new features or event hooks!
  • Cluster support.
  • Enhanced Metrics support.

Quick Start

Running the Broker with Go

Mochi MQTT can be used as a standalone broker. Simply check out this repository and run the cmd/main.go entrypoint in the cmd folder, which will expose TCP (:1883), Websocket (:1882), and dashboard (:8080) listeners.

cd cmd
go build -o mqtt && ./mqtt

Using Docker

You can now pull and run the official Mochi MQTT image from our Docker repo:

docker pull mochimqtt/server
or
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server 

For most use cases, you can use File Based Configuration to configure the server, by specifying a valid yaml or json config file.

A simple Dockerfile is provided for running the cmd/main.go Websocket, TCP, and Stats server, using the allow-all auth hook.

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest

File Based Configuration

You can use File Based Configuration with either the Docker image (described above), or by running the built binary with the --config=config.yaml or --config=config.json parameter.
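For example, a quick sketch of launching the standalone binary (built as in the Quick Start above) with a configuration file:

go build -o mqtt ./cmd
./mqtt --config=config.yaml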

Configuration files provide a convenient mechanism for easily preparing a server with the most common configurations. You can enable and configure built-in hooks and listeners, and specify server options and compatibilities:

listeners:
  - type: "tcp"
    id: "tcp12"
    address: ":1883"
  - type: "ws"
    id: "ws1"
    address: ":1882"
  - type: "sysinfo"
    id: "stats"
    address: ":1880"
hooks:
  auth:
    allow_all: true
options:
  inline_client: true

Please review the examples found in examples/config for all available configuration options.

There are a few conditions to note:

  1. If you use file-based configuration, the supported hook types for configuration are currently limited to auth, storage, and debug. Each type of hook can only have one instance.
  2. You can only use built-in hooks with file-based configuration, as the type and configuration structure needs to be known by the server in order for it to be applied.
  3. You can only use built-in listeners, for the same reasons.

If you need to implement custom hooks or listeners, please do so in the traditional manner shown in cmd/main.go.

Developing with Mochi MQTT

Importing as a package

Importing Mochi MQTT as a package requires just a few lines of code to get started.

import (
  "log"
  "os"
  "os/signal"
  "syscall"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/hooks/auth"
  "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
  // Create signals channel to run server until interrupted
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  // Create the new MQTT Server.
  server := mqtt.New(nil)
  
  // Allow all connections.
  _ = server.AddHook(new(auth.AllowHook), nil)
  
  // Create a TCP listener on a standard port.
  tcp := listeners.NewTCP("t1", ":1883", nil)
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }
  

  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // Run server until interrupted
  <-done

  // Cleanup
}

Examples of running the broker with various configurations can be found in the examples folder.

Network Listeners

The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:

Listener Usage
listeners.NewTCP A TCP listener
listeners.NewUnixSock A Unix Socket listener
listeners.NewNet A net.Listener listener
listeners.NewWebsocket A Websocket listener
listeners.NewHTTPStats An HTTP $SYS info dashboard
listeners.NewHTTPHealthCheck An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure

Use the listeners.Listener interface to develop new listeners. If you do, please let us know!

A *listeners.Config may be passed to configure TLS.
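As an illustrative sketch only, a TLS-enabled TCP listener might look like the following, assuming listeners.Config exposes a TLSConfig field and that server.crt and server.key are your own certificate files (check the listeners package for the exact field names in your version):

// requires "crypto/tls" in your imports; the certificate paths are placeholders
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
  log.Fatal(err)
}

tcpTLS := listeners.NewTCP("t1s", ":8883", &listeners.Config{
  TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
})
if err := server.AddListener(tcpTLS); err != nil {
  log.Fatal(err)
}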

Examples of usage can be found in the examples folder or cmd/main.go.

Server Options and Capabilities

A number of configurable options are available which can be used to alter the behaviour or restrict access to certain features in the server.

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 4096,
  ClientNetReadBufferSize: 4096,
  SysTopicResendInterval: 10,
  InlineClient: false,
})

Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. ClientNetWriteBufferSize and ClientNetReadBufferSize can be configured to adjust memory usage per client, based on your needs.

Default Configuration Notes

Some choices were made when deciding the default configuration that need to be mentioned here:

  • By default, the value of server.Options.Capabilities.MaximumMessageExpiryInterval is set to 86400 (24 hours), in order to prevent exposing the broker to DOS attacks on hostile networks when using the out-of-the-box configuration (as an infinite expiry would allow an infinite number of retained/inflight messages to accumulate). If you are operating in a trusted environment, or you have capacity for a larger retention period, you may wish to override this (set to 0 for no expiry).
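As a minimal sketch (using the field path named above), a broker on a trusted network could disable message expiry entirely:

server := mqtt.New(nil)
server.Options.Capabilities.MaximumMessageExpiryInterval = 0 // 0 = messages never expire; only use on trusted networks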

Event Hooks

A universal event hooks system allows developers to hook into various parts of the server and client life cycle to add and modify functionality of the broker. These universal hooks are used to provide everything from authentication, persistent storage, to debugging tools.

Hooks are stackable - you can add multiple hooks to a server, and they will be run in the order they were added. Some hooks modify values, and these modified values will be passed to the subsequent hooks before being returned to the runtime code.
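For illustration, a sketch of stacking two of the built-in hooks (import paths assumed from the table below); the auth hook added first runs before the debug hook added second:

_ = server.AddHook(new(auth.AllowHook), nil) // from hooks/auth
_ = server.AddHook(new(debug.Hook), nil)     // from hooks/debug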

Type Import Info
Access Control mochi-mqtt/server/hooks/auth . AllowHook Allow access to all connecting clients and read/write to all topics.
Access Control mochi-mqtt/server/hooks/auth . Auth Rule-based access control ledger.
Persistence mochi-mqtt/server/hooks/storage/bolt Persistent storage using BoltDB (deprecated).
Persistence mochi-mqtt/server/hooks/storage/badger Persistent storage using BadgerDB.
Persistence mochi-mqtt/server/hooks/storage/pebble Persistent storage using PebbleDB.
Persistence mochi-mqtt/server/hooks/storage/redis Persistent storage using Redis.
Debugging mochi-mqtt/server/hooks/debug Additional debugging output to visualise packet flow.

Many of the internal server functions are now exposed to developers, so you can make your own Hooks by using the above as examples. If you do, please Open an issue and let everyone know!

Access Control

Allow Hook

By default, Mochi MQTT uses a DENY-ALL access control rule. To allow connections, this must be overwritten using an Access Control hook. The simplest of these is the auth.AllowHook hook, which applies ALLOW-ALL rules to all connections, subscriptions, and publishing, and is also the simplest to use:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

Don't do this if you are exposing your server to the internet or untrusted networks - it should really be used for development, testing, and debugging only.

Auth Ledger

The Auth Ledger hook provides a sophisticated mechanism for defining access rules in a struct format. Auth ledger rules come in two forms: Auth rules (connection), and ACL rules (publish/subscribe).

Auth rules have 4 optional criteria and an assertion flag:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Password password of the connecting client
Remote the remote address or ip of the client
Allow true (allow this user) or false (deny this user)

ACL rules have 3 optional criteria and a filter match:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Remote the remote address or ip of the client
Filters an array of filters to match

Rules are processed in index order (0,1,2,3), returning on the first matching rule. See hooks/auth/ledger.go to review the structs.

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
  Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth disallows all by default
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL allows all by default
      {Remote: "127.0.0.1:*"}, // local superuser allow all
      {
        // user melon can read and write to their own topic
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
        },
      },
      {
        // Otherwise, no clients have publishing permissions
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  },
})

The ledger can also be stored as JSON or YAML and loaded using the Data field:

err := server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // build ledger from byte slice: yaml or json
})

See examples/auth/encoded/main.go for more information.
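As a minimal sketch, the encoded ledger can be read from disk and passed to the hook as raw bytes (ledger.yaml is a hypothetical path; see the linked example for the actual file format):

data, err := os.ReadFile("ledger.yaml") // or a .json equivalent
if err != nil {
  log.Fatal(err)
}

err = server.AddHook(new(auth.Hook), &auth.Options{
  Data: data, // the hook decodes the ledger from these bytes
})
if err != nil {
  log.Fatal(err)
}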

Persistent Storage

Redis

A basic Redis storage hook is available which provides persistence for the broker. It can be added to the server in the same fashion as any other hook, with several options. It uses github.com/go-redis/redis/v8 under the hood, and is completely configurable through the Options value.

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // default redis address
    Password: "",               // your password
    DB:       0,                // your redis db
  },
})
if err != nil {
  log.Fatal(err)
}

For more information on how the redis hook works, or how to use it, see the examples/persistence/redis/main.go or hooks/storage/redis code.

Pebble DB

There's also a PebbleDB storage hook if you prefer file-based storage. It can be added and configured in much the same way as the other hooks (with somewhat fewer options).

err := server.AddHook(new(pebble.Hook), &pebble.Options{
  Path: pebblePath,
  Mode: pebble.NoSync,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the pebble hook works, or how to use it, see the examples/persistence/pebble/main.go or hooks/storage/pebble code.

Badger DB

Similarly, for file-based storage, there is also a BadgerDB storage hook available. It can be added and configured in much the same way as the other hooks.

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the badger hook works, or how to use it, see the examples/persistence/badger/main.go or hooks/storage/badger code.

There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check examples/persistence/bolt/main.go.

Developing with Event Hooks

Many hooks are available for interacting with the broker and client lifecycle. The function signatures for all the hooks and mqtt.Hook interface can be found in hooks.go.

The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent - these hooks can be used to control and modify all incoming and outgoing packets.

Function Usage
OnStarted Called when the server has successfully started.
OnStopped Called when the server has successfully stopped.
OnConnectAuthenticate Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed.
OnACLCheck Called when a user attempts to publish or subscribe to a topic filter. As above.
OnSysInfoTick Called when the $SYS topic values are published out.
OnConnect Called when a new client connects, may return an error or packet code to halt the client connection process.
OnSessionEstablish Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent.
OnSessionEstablished Called when a new client successfully establishes a session (after OnConnect)
OnDisconnect Called when a client is disconnected for any reason.
OnAuthPacket Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification.
OnPacketRead Called when a packet is received from a client. Allows packet modification.
OnPacketEncode Called immediately before a packet is encoded to be sent to a client. Allows packet modification.
OnPacketSent Called when a packet has been sent to a client.
OnPacketProcessed Called when a packet has been received and successfully handled by the broker.
OnSubscribe Called when a client subscribes to one or more filters. Allows packet modification.
OnSubscribed Called when a client successfully subscribes to one or more filters.
OnSelectSubscribers Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows recipient modification.
OnUnsubscribe Called when a client unsubscribes from one or more filters. Allows packet modification.
OnUnsubscribed Called when a client successfully unsubscribes from one or more filters.
OnPublish Called when a client publishes a message. Allows packet modification.
OnPublished Called when a client has published a message to subscribers.
OnPublishDropped Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond.
OnRetainMessage Called when a published message is retained.
OnRetainPublished Called when a retained message is published to a client.
OnQosPublish Called when a publish packet with Qos >= 1 is issued to a subscriber.
OnQosComplete Called when the Qos flow for a message has been completed.
OnQosDropped Called when an inflight message expires before completion.
OnPacketIDExhausted Called when a client runs out of unused packet ids to assign.
OnWill Called when a client disconnects and intends to issue a will message. Allows packet modification.
OnWillSent Called when an LWT message has been issued from a disconnecting client.
OnClientExpired Called when a client session has expired and should be deleted.
OnRetainedExpired Called when a retained message has expired and should be deleted.
StoredClients Returns clients, e.g. from a persistent store.
StoredSubscriptions Returns client subscriptions, e.g. from a persistent store.
StoredInflightMessages Returns inflight messages, e.g. from a persistent store.
StoredRetainedMessages Returns retained messages, e.g. from a persistent store.
StoredSysInfo Returns stored system info values, e.g. from a persistent store.

If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need OnACLCheck and OnConnectAuthenticate.
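As a rough sketch of a custom auth hook (method signatures assumed from hooks.go; verify them against your version before relying on them), you can embed mqtt.HookBase and implement only the callbacks you need:

import (
  "bytes"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/packets"
)

// MyAuthHook is a hypothetical hook which only handles authentication and ACL checks.
type MyAuthHook struct {
  mqtt.HookBase
}

func (h *MyAuthHook) ID() string {
  return "my-auth-hook"
}

// Provides indicates which hook methods this hook implements.
func (h *MyAuthHook) Provides(b byte) bool {
  return bytes.Contains([]byte{
    mqtt.OnConnectAuthenticate,
    mqtt.OnACLCheck,
  }, []byte{b})
}

// OnConnectAuthenticate returns true if the connecting client may access the server.
func (h *MyAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
  return string(pk.Connect.Username) == "example-user" // check against your own user store
}

// OnACLCheck returns true if the client may read (write=false) or write (write=true) the topic.
func (h *MyAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
  return true
}

The hook is then registered like any other: _ = server.AddHook(new(MyAuthHook), nil).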

Inline Client (v2.4.0+)

It's now possible to subscribe and publish to topics directly from the embedding code, by using the inline client feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:

server := mqtt.New(&mqtt.Options{
  InlineClient: true,
})

Once enabled, you will be able to use the server.Publish, server.Subscribe, and server.Unsubscribe methods to issue and receive messages from broker-adjacent code.

See the direct examples for real-life usage.

Inline Publish

To publish a basic message to a topic from within the embedding application, you can use the server.Publish(topic string, payload []byte, retain bool, qos byte) error method.

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

The QoS byte in this case is only used to set the upper QoS limit available for subscribers, as per the MQTT v5 spec.
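For instance, the same method can be used to publish a retained message while allowing subscribers up to QoS 2:

err := server.Publish("direct/retained", []byte("retained message"), true, 2)
if err != nil {
  log.Fatal(err)
}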

Inline Subscribe

To subscribe to a topic filter from within the embedding application, you can use the server.Subscribe(filter string, subscriptionId int, handler InlineSubFn) error method with a callback function. Note that only QoS 0 is supported for inline subscriptions. If you wish to have multiple callbacks for the same filter, you can use the MQTTv5 subscriptionId property to differentiate.

callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
    server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)

Inline Unsubscribe

You may wish to unsubscribe if you have subscribed to a filter using the inline client. You can do this easily with the server.Unsubscribe(filter string, subscriptionId int) error method:

server.Unsubscribe("direct/#", 1)

Packet Injection

If you want more control, or want to set specific MQTT v5 properties and other values, you can create your own publish packets from a client of your choice. This method allows you to inject MQTT packets (not just publish packets) directly into the runtime as though they had been received by a specific client.

Packet injection can be used for any MQTT packet, including ping requests, subscriptions, etc. And because the Client structs and methods are now exported, you can even inject packets on behalf of a connected client (if you have very custom requirements).

Most of the time you'll want to use the Inline Client described above, as it has unique privileges: it bypasses all ACL and topic validation checks, meaning it can even publish to $SYS topics. You can also create an inline client from scratch, which will behave the same as the built-in inline client.

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT packets still need to be correctly formed, so refer to the test packets catalogue and the MQTTv5 Specification for inspiration.

See the hooks example to see this feature in action.

Testing

Unit Tests

Mochi MQTT tests over a thousand scenarios with thoughtfully hand-written unit tests to ensure each function does exactly what we expect. You can run the tests using go:

go test --cover ./...

Paho Interoperability Test

You can check the broker against the Paho Interoperability Test by starting the broker using examples/paho/main.go, and then running the mqtt v5 and v3 tests with python3 client_test5.py from the interoperability folder.

Note that there are currently a number of outstanding issues regarding false negatives in the paho suite, and as such, certain compatibility modes are enabled in the paho/main.go example.
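A sketch of that workflow (the interoperability folder belongs to the separate paho.mqtt.testing repository, so the paths here are assumptions):

# terminal 1: start the broker with the compatibility modes enabled
go run examples/paho/main.go

# terminal 2: run from the interoperability folder of the paho test suite
python3 client_test5.py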

Performance Benchmarks

Mochi MQTT performance is comparable with popular brokers such as Mosquitto, EMQX, and others.

Performance benchmarks were tested using MQTT-Stresser on an Apple MacBook Air M2, using cmd/main.go default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.

The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers. Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

Broker             publish fastest   median    slowest   receive fastest   median    slowest
Mochi v2.2.10      124,772           125,456   124,614   314,461           313,186   311,910
Mosquitto v2.0.15  155,920           155,919   155,918   185,485           185,097   184,709
EMQX v5.0.11       156,945           156,257   155,568   17,918            17,783    17,649
Rumqtt v0.21.0     112,208           108,480   104,753   135,784           126,446   117,108

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

Broker             publish fastest   median    slowest   receive fastest   median    slowest
Mochi v2.2.10      41,825            31,663    23,008    144,058           65,903    37,618
Mosquitto v2.0.15  42,729            38,633    29,879    23,241            19,714    18,806
EMQX v5.0.11       21,553            17,418    14,356    4,257             3,980     3,756
Rumqtt v0.21.0     42,213            23,153    20,814    49,465            36,626    19,283

Million Message Challenge (hit the server with 1 million messages immediately):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

Broker             publish fastest   median    slowest   receive fastest   median    slowest
Mochi v2.2.10      13,532            4,425     2,344     52,120            7,274     2,701
Mosquitto v2.0.15  3,826             3,395     3,032     1,200             1,150     1,118
EMQX v5.0.11       4,086             2,432     2,274     434               333       311
Rumqtt v0.21.0     78,972            5,047     3,804     4,286             3,249     2,027

Not sure what's going on with EMQX here, perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt as we know for a fact it's a solid piece of software.

Contribution Guidelines

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request. If you open a pull request, please try to follow these guidelines:

  • Try to maintain test coverage where reasonably possible.
  • Clearly state what the PR does and why.
  • Please remember to add your SPDX FileContributor tag to files where you have made a meaningful contribution.

SPDX Annotations are used to clearly indicate the license, copyright, and contributions of each file in a machine-readable format. If you are adding a new file to the repository, please ensure it has the following SPDX header:

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <[email protected]>

package name

Please be sure to add a new SPDX-FileContributor line for each contributor to the file. Refer to other files for examples. Please remember to do this - your contributions to this project are valuable and appreciated, and it's important to receive credit!

Stargazers over time 🥰

Are you using Mochi MQTT in a project? Let us know!



Issues

Messages silently dropped

Hi,

First of all, I want to thank you for this great module!

We have embedded this module into our application, and we noticed that fewer messages were received than expected. After eliminating other possibilities, we investigated deeper into the library and noticed that the BufferSize setting is not only the default size but also the absolute limit (this wasn't clear from the documentation) for the connection (one buffer for read and one for write). When a single MQTT message is larger than the BufferSize, it blocks until the connection times out, and this also does not trigger the On(Process)Message events.

We expect to handle a very large number of connections; the vast majority of messages are only a couple of KB (but a very small fraction can be many hundred KB or larger), and the message frequency is once a minute. Given the module allocates 2x the BufferSize (read and write) for each connection, we set a relatively small BufferSize. However, to handle a few corner-case messages, we would need to dramatically increase the BufferSize, which would substantially increase the total memory requirement.

There are a few possible improvements:

  1. Reject the message (immediately disconnect the client) if its size is > BufferSize (maybe rename?) after cl.ReadFixedHeader, and trigger an event. Given the current behaviour silently deadlocks in reading and results in a disconnect due to timeout, this has the same outcome but fails faster and gives the upper layer an event callback for visibility.
  2. For long-lived connections where the message frequency is as low as once a minute, allocating/GCing buffers as needed can be amortised, and the benefit of using sync.Pool is diminished. The much smaller memory footprint for a large number of concurrent connections can outweigh the reduction in memory allocation/GC.
  3. Pre-allocate BufferSize, but dynamically allocate (or expand) the buffer as needed.

Topic prefix based on host name?

Is there some way I could configure something so that I can set a root topic based on the host name that devices are connecting to the broker with?

What I mean is my server is available with a wildcard domain - *.myserver.com. Clients can then connect with eg foo.myserver.com or bar.myserver.com and publish / subscribe to topics, so is there some way if a client connects to foo.myserver.com and publishes to topic 'someTopic' that the server would make the actual topic "foo/someTopic"?

Stale inflight messages

I started observing an increased number of inflight messages which almost never go down. At the same time, I didn't notice any disconnects from clients - so it looks like the clients just didn't receive the message and will never send an ACK (a network issue on the client side?).

I see that there is ResendClientInflight (https://github.com/mochi-co/mqtt/blob/master/server/server.go#L878), but it looks like it's executed only on new client connections (to send messages from the persistence store to reconnecting clients?).

Is there any internal mechanism which will try to deliver inflight messages, or should the mochi-co/mqtt user implement this on their own?

Performance drop versus v1

Hi Mochi,

I love the new features of v2, but I see a significant performance drop.

Could you explain the reasons?

mqtt-stresser -broker tcp://192.168.11.52:1883 -num-clients=10 -num-messages=10000

Receiving Throughput (v1)

Median: 45949 msg/sec
Median: 43708 msg/sec
Median: 37352 msg/sec
Median: 45628 msg/sec

Receiving Throughput (v2)

Median: 23287 msg/sec
Median: 23744 msg/sec
Median: 27340 msg/sec
Median: 23394 msg/sec

[Missing package issue] C-based client gets disconnected

Folks, we have a simple unit test on a cross-stacked client written in C (the most prominent one in the ICT industry); please check the logs and attachment below.
Ref:
https://github.com/emqx/nanomq
nanomq client built from source

D:\Demos\mqtt\examples\events>go run main.go
Mochi MQTT Server initializing... TCP
  Started!  
<< OnConnect client connected nanomq-d28f8d41: {FixedHeader:{Remaining:27 Type:1 Qos:0 Dup:false Retain:false} AllowClients:[] Topics:[] ReturnCodes:[] ProtocolName:[77 81 84 84] Qoss:[] Payload:[] Username:[] Password:[] WillMessage:[] ClientIdentifier:nanomq-d28f8d41 TopicName: WillTopic: PacketID:0 Keepalive:60 ReturnCode:0 ProtocolVersion:4 WillQos:0 ReservedBit:0 CleanSession:true WillFlag:false WillRetain:false UsernameFlag:false PasswordFlag:false SessionPresent:false}
<< OnDisconnect client dicconnected nanomq-d28f8d41: missing packet id
D:\Demos\gomqtt\nanomq\build\nanomq>nanomq.exe sub start -t "direct/publish"
connect_cb: connected!
disconnected
D:\Demos\mqtt\examples\events>go env
set GO111MODULE=
set GOARCH=amd64
set GOBIN=D:\Progra~1\Go\bin
set GOCACHE=C:\Users\phineas\AppData\Local\go-build
set GOENV=D:\Roaming\go\env
set GOEXE=.exe
set GOEXPERIMENT=
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOINSECURE=
set GOMODCACHE=D:\Progra~1\Go\goget\pkg\mod
set GONOPROXY=
set GONOSUMDB=
set GOOS=windows
set GOPATH=D:\Progra~1\Go\goget
set GOPRIVATE=
set GOPROXY=https://proxy.golang.org,direct
set GOROOT=D:\Progra~1\Go
set GOSUMDB=sum.golang.org
set GOTMPDIR=
set GOTOOLDIR=D:\Progra~1\Go\pkg\tool\windows_amd64
set GOVCS=
set GOVERSION=go1.18
set GCCGO=gccgo
set GOAMD64=v1
set AR=ar
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=D:\Demos\gomqtt\MQTT.js\mqtt\go.mod
set GOWORK=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fmessage-length=0 -fdebug-prefix-map=C:\Users\phineas\AppData\Local\Temp\go-build1490508260=/tmp/go-build -gno-record-gcc-switches

Lock values are being copied in buffer and clients

go vet reports that lock values are being copied in clients.Inflight and circ.Buffer. Additionally, the signature for the buffer WriteTo is incorrect.

# github.com/mochi-co/mqtt/server
server/server.go:224:18: assignment copies lock value to cl.Inflight: github.com/mochi-co/mqtt/server/internal/clients.Inflight
# github.com/mochi-co/mqtt/server/internal/circ
server/internal/circ/buffer.go:77:9: return copies lock value: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:18:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:28:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:19:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:29:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/pool_test.go:11:20: call of require.NotNil copies lock value: sync.Pool contains sync.noCopy
server/internal/circ/writer.go:34:18: method WriteTo(w io.Writer) (total int, err error) should have signature WriteTo(io.Writer) (int64, error)

The above issues should be corrected.

Cluster MQTT broker based on mochi-co/mqtt

I had been looking for a cluster-ready MQTT broker for my home environment (k8s) for some time, but unfortunately all the ones I checked (emqx, vernemq) have some issues (e.g. not compatible with Paho, or broken in a home environment).

So, based on mochi-co/mqtt, I built my own broker - I was heavily inspired by #45.

Maybe other people looking for clustering capabilities will be interested, or even better some pieces can be used inside mochi-co/mqtt!

If you think that this is not a place for this kind of communication, please close :)

https://github.com/bkupidura/broker-ha

Publish with retain doesn't retain

Publish works fine except for honoring the retain flag. Broker works fine with external publishing with retain.
Using master branch: 460f0ef

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})

	// server.Serve doesn't block, so no need for goroutine, but for some
	// reason, we have to delay a LONG time for Serve's goroutines to start-up
	// > 1 second on a 5900X !!
	server.Serve()
	fmt.Println("\nServer is running")
	fmt.Println("Sleeping for 2 seconds")
	time.Sleep(2 * time.Second)

	// Publish with retain - doesn't work
	server.Publish("direct/publish", []byte("Published from broker"), true)
	fmt.Println("\nMessage published to \"direct/publish\" with retain")

	fmt.Println("\nTry running: mosquitto_sub -v -t \"direct/publish\"")

	<-done
}
go run main.go

# In another terminal window

# The following shows nothing
mosquitto_sub -v -t "direct/publish"

# But this works:
mosquitto_pub -t "mosquitto/publish" -m "from mosquitto_pub" -r
mosquitto_sub -v -t "mosquitto/publish"

Am I missing something?

Provide a means of changing client username

I have come across a situation where it is necessary to change the username used to authenticate the client. I am implementing a system where a user logs in using either (1) a username and password or (2) a web token obtained from authentication done by the client itself. Since the username user and the web-token user are one and the same, I need to change the client username to a userId which is the same regardless of how the client authenticates.

I have so far managed to do it this way:

  1. Changing the auth interface
    Authenticate(user, password []byte) (interface{}, error)

  2. Obtaining new username and assigning to client during authentication


	// if !ac.Authenticate(pk.Username, pk.Password) {
	// 	if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
	// 		return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
	// 	}
	// 	return s.onError(cl.Info(), ErrConnectionFailed)
	// }
	username, err := ac.Authenticate(pk.Username, pk.Password)
	if err != nil {
		if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
			return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
		}
		return s.onError(cl.Info(), ErrConnectionFailed)
	}
	if username != nil {
		pk.Username = []byte(username.(string))
		cl.Identify(lid, pk, ac)
	}

Is this a feature you could consider?

The OnConnect and the OnDisconnect event hook trigger order is wrong when using the same clientid

My implementation code:

        server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
		fmt.Printf("<< OnConnect client connected clientid=%s\n", cl.ID)
	}

	server.Events.OnDisconnect = func(cl events.Client, err error) {
		fmt.Printf("<< OnDisconnect client disconnected clientid=%s, err=%v\n", cl.ID, username, err)
	}

Client 1 connects to the server, and it prints:

<< OnConnect client connected clientid=mqttx_f988b2da

Client 2 uses the same clientid as client 1 and connects to the server; it prints:

<< OnConnect client connected clientid=mqttx_f988b2da
<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established

I think that when client 2 uses the same clientid as client 1 and connects to the server, it should print:

<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established
<< OnConnect client connected clientid=mqttx_f988b2da

It should trigger OnDisconnect before OnConnect.

Unable to publish messages with larger payload sizes

I was testing out the 1.2.0 branch of this broker, and discovered that it appears to fail silently when I try to publish messages with a larger (>100k) binary payload.

Any thoughts? Thanks in advance.

Some tests fail due to test timing issues when using -count=100 / 1000

As per @rkennedy's comments in #24, various tests fail intermittently and typically show up when running go test ./... -count=100. Backtesting indicates these failures occur at least as early as v1.0.5. The failures can occur as infrequently as 1 in every 500 runs. This is likely to be a problem with the tests rather than a problem with the broker code.

Determine the precise cause for these intermittent failures and correct it.

After running go test ./... -count=100 several times, the known failures are as follows:

writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.01s)
    server_test.go:2096: 
                Error Trace:    server_test.go:2096
                Error:          Not equal: 
                                expected: []byte{0x3a, 0xe, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x0, 0xb, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=16) {
                                - 00000000  3a 0e 00 05 61 2f 62 2f  63 00 0b 68 65 6c 6c 6f  |:...a/b/c..hello|
                                +([]uint8) {
                                 }
                Test:           TestServerResendClientInflightBackoff
writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.00s)
    server_test.go:2105: 
                Error Trace:    server_test.go:2105
                Error:          Not equal: 
                                expected: 1
                                actual  : 2
                Test:           TestServerResendClientInflightBackoff
2022/01/30 10:16:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerWriteClient (0.01s)
    server_test.go:671: 
                Error Trace:    server_test.go:671
                Error:          Not equal: 
                                expected: []byte{0x70, 0x2, 0x0, 0xe}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=4) {
                                - 00000000  70 02 00 0e                                       |p...|
                                +([]uint8) {
                                 }
                Test:           TestServerWriteClient
2022/01/30 10:19:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerCloseClientLWT (0.01s)
    server_test.go:1756: 
                Error Trace:    server_test.go:1756
                Error:          Not equal: 
                                expected: []byte{0x30, 0xc, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=14) {
                                - 00000000  30 0c 00 05 61 2f 62 2f  63 68 65 6c 6c 6f        |0...a/b/chello|
                                +([]uint8) {
                                 }
                Test:           TestServerCloseClientLWT

Please add any others to this issue if you see them.

How to create an anonymous listener for testing

For testing I want to use an anonymous server port like in httptest:

tcp := listeners.NewTCP("t1", "")

Doing so works, but I have no option to retrieve the port as the TCP listener is private.

Alternatively, it would be nice if the listener could be created and passed into the server on the consumer side.

Malformed packet errors on websocket listener

Hi,

When publishing packets to the websocket listener using the Paho MQTT client library (both v3.1.1 and v5), I am getting many malformed packet errors. If I switch from the ws to the tcp listener, I am not getting these errors with the exact same client code.


Any idea what the issue could be here?

Broker can stall when stress testing 40 * 10000 messages

When running ./mqtt-stresser -broker tcp://localhost:1883 -num-clients=40 -num-messages=10000 the broker appears to occasionally stall, although it does not freeze, and it continues to process other messages as expected.

Testing against the latest https://github.com/fhmq/hmq we find only marginal performance difference.

This raises questions about whether there is a benefit to continue using circular buffers (which are difficult to maintain and control) now that the performance of channels has been improved significantly, or if the circular buffers mechanism should be replaced with a worker pool. This would also alleviate issues discussed in #95 and could potentially reduce the overall number of goroutines as mentioned in #80.

Discussion invited :)

SIGSEGV crash when running examples/tcp/main.go

I noticed this issue when I switched from my PC to my Raspberry Pi; trying to narrow down the cause, it seems like I get the same SIGSEGV error when running the tcp example.

Running on a Raspberry Pi 3 Model B Plus Rev 1.3, ARMv7 Processor rev 4 (v7l). I've confirmed that there are no issues binding to the port.

root@housekeeper:/home/ogelami/mochi-mqtt-issue# go run main.go

Mochi MQTT Server initializing... TCP
  Started!
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1238c]

goroutine 22 [running]:
runtime/internal/atomic.goLoad64(0x1cae03c, 0x0, 0x0)
        /usr/lib/go-1.15/src/runtime/internal/atomic/atomic_arm.go:131 +0x1c
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve(0x1cae000, 0x1c9a028)
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:89 +0x28
github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve.func1(0x1cac000, 0x29db60, 0x1cae000, 0x1c9a028)
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:94 +0x70
created by github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:91 +0x9c
exit status 2

Add Client Username to Event Client Info

The events Client struct should expose the client's username to the event hook receiver functions.

Discussed in #77

Originally posted by youmisun June 11, 2022
The client information in OnDisconnect is too limited; I would like it to include username information. On the other hand, I think the listener information is not useful to me.

Concurrent map access for clients and inflights causes data race

Earlier today, @muXxer identified an issue in which the broker would crash with fatal errors, caused when the primary server routine and clients attempted to read and write from the inflight messages map simultaneously.

This is a new issue as of 1.3.0, and resulted from the changes made to inflight handling - specifically, allowing the server event loop to scan and resend inflights (where previously, inflights would only be engaged during connection establishment or within the client itself).

The collision occurred between three locations:

  1. Map reads at github.com/mochi-co/mqtt/server.(*Server).ResendClientInflight() (mqtt/server/server.go:912)
  2. Map writes at github.com/mochi-co/mqtt/server/internal/clients.(*Inflight).Delete() (server/internal/clients/clients.go:570)
  3. Map writes at github.com/mochi-co/mqtt/server/internal/clients.(*Clients).Add() (server/internal/clients/clients.go:48)

The problem was reproducible by placing the server under heavy load with inovex/mqtt-stresser, using qos=2 for both the publisher and subscriber: mqtt-stresser -broker tcp://localhost:1883 -num-clients=200 -num-messages=100 -subscriber-qos=2 -publisher-qos=2

A patch is immediately forthcoming and will be released as v1.3.2

Slow clients slow down the whole broker

We are using the MQTT broker and publishing messages directly to all clients using the broker's Publish() func.
This func adds a new publish packet to the inlineMessages.pub buffered channel (size 1024) and the inlineClient() loop will publish those packets to all subscribed clients.
For each subscribed client this will call client.WritePacket(), which in the end will call Write() on the client's writer.

If a single subscribed client is too slow, the client's write buffer will fill up and the whole inlineClient() loop will hang until this client's buffer has space again (see awaitEmpty inside Write()). Shortly after, the inlineMessages.pub buffered channel will fill up and further calls to Publish() will hang.

This means a single slow client (even one using QoS 0 with no guarantees of receiving packets) can make the whole broker wait indefinitely and not deliver any more packets to any client.

A possible workaround for this could be, instead of waiting for the buffer to be freed, to just return a "client buffer full" error and skip sending the packet to this client. If the client is using QoS 1/2, the inflight message retry mechanism should try to re-deliver the message.

What do you think? I can write a PR with these changes. Or do you have a better solution to this problem?

Messages with retain only persist once

While testing #42 as a fix for #37, I found an unrelated issue with message retention and persistence. Using only mosquitto_pub/sub for testing, the first time you publish with retain, it will save to DB. (server.go: retainMessage q=1)

The second time you publish to the same topic, server.go's retainMessage is q=0 and it doesn't save. Therefore, it must not be saving what it originally reads in. Not sure if this applies to other persistent data. Example:

package main

import (
	"bufio"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
	"github.com/mochi-co/mqtt/server/persistence/bolt"
	"go.etcd.io/bbolt"
)

func main() {

	runNumber := flag.Int("run", 1, "number of times we've run")
	flag.Parse()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})

	err := server.AddStore(bolt.New("mochi-test.db", &bbolt.Options{
		Timeout: 500 * time.Millisecond,
	}))
	if err != nil {
		log.Fatal(err)

	}

	server.Serve()
	fmt.Printf("Server is running\n\n")

	if *runNumber == 1 {
		fmt.Printf("Send a message to broker with retain flag:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I Persist\" \n")
		fmt.Printf("\nPress ENTER to exit, then re-run with -run=2 flag")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)
	} else if *runNumber == 2 {
		fmt.Printf("This is the second run and message has been loaded from DB\n")
		fmt.Printf("Test that you can retrieve the message by running\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n")

		fmt.Printf("\nThen press ENTER after you've confirmed you received the message\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')

		// Second run, (s *Server) retainMessage will not save in DB
		fmt.Printf("\nPublish the same message again with:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I persist\" \n\n")

		fmt.Printf("When complete, hit ENTER to exit and re-run with: -run=3 flag\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)

	} else {
		fmt.Printf("This is our third run. You should be able to get the message\n")
		fmt.Printf("by subscribing, but it doesn't work\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n\n")

		fmt.Printf("Press ENTER to exit. If you re-run this test, remove mochi-test.db first\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(1)

	}
}

To test:

go run main.go 
mosquitto_pub -r -t "test/persist" -m "I persist"

go run main.go -run=2
mosquitto_sub -C 1 -t "test/persist"
# then
mosquitto_pub -r -t "test/persist" -m "I persist"

go run main.go -run=3
mosquitto_sub -C 1 -t "test/persist"
# Message was not saved in DB

Docker image

As mentioned in the readme, it'd be great to have a Docker image for this

Do not disconnect existing session for new connection auth failures

In Server.EstablishConnection() there is an authorization test, and even if the test fails it first checks whether there's a session present and disconnects the other, valid session. This could lead to an invalid connection disrupting a valid connection.

It seems unintentional. Presumably MQTT does not require a correct response for sessionPresent when authorization has failed.

Migrate to Github Actions

Currently we're using Travis for build checks. It would be simpler if this was done using GitHub Actions.

Optimise struct fields for better memory alignment

Following #17 we found that various struct fields were not 64-bit aligned, which resulted in panics on 32-bit builds. Fixing this has highlighted that there are other structs within the code which can be optimised to have better memory alignment.

These should be refactored so that each struct uses the optimal amount of memory for better caching, and so that all atomic fields are 64-bit aligned for improved 32-bit compatibility.

panic: runtime error: invalid memory address or nil pointer dereference

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1287e94]

goroutine 52708 [running]:
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:446 +0x94
github.com/mochi-co/mqtt/server.(*Server).writeClient(0xc000652460, 0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:413 +0x78
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(0xc000652460, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:662 +0x45e
github.com/mochi-co/mqtt/server.(*Server).processPublish(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:586 +0x258
github.com/mochi-co/mqtt/server.(*Server).processPacket(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:436 +0x118
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc000b07560, 0xc002657848, 0xc001c96970, 0xf)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:375 +0x1f5
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc000652460, 0x177de10, 0x2, 0x19857e0, 0xc000645038, 0x1965aa8, 0xc000370550, 0x0, 0x0)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:342 +0x1071
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1(0xc000370150, 0xc000492000, 0x19857e0, 0xc000645038)
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:106 +0x66
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:105 +0x95

Internal publish

When embedding this server, can the application doing the embedding publish a message to a topic directly? If not, can that be added?

Show the reason for client disconnects

While debugging an application, I was at some point confounded by what looked to both the client and the server like the other side had closed the connection. On further investigation, there were two identical clients, one a zombie process, that were both trying to connect, and each time one connected the other was disconnected.

The errors that were printed in OnDisconnect do not include the remote address field, which would have revealed the presence of duplicate client IDs; these also happened to look like ordinary client-initiated disconnects, so the server logs would not reveal there to be a session takeover.

Auditing the code, I learned that an explicit Disconnect packet could trigger disconnection, but so could EOF or a variety of errors in several code paths. The existing Client was well synchronized for this, but the original cause of stopping the connection was being lost. This change records the first cause of stopping the connection. In EstablishConnection(), the root cause will be the one passed to OnDisconnect, so the server logs will reveal the root of the problem. (Since this is MQTT 3.x, there is no way for the Disconnect packet to tell the client, so it could place an explanation in its own logs.)

In addition, there would be no way for a user to configure visibility into other errors, which might less obviously be the cause of (or associated with) disconnection, or might simply provide warning flags. I propose adding an OnError handler to support logging all errors.

Related to #21, since if there are duplicate clients it will be nice to say where they're coming from.

Event OnMessage stop publish message

Hi,
I just discovered this software and I like it. Thanks.

One question:

I want to generate new messages to different topics based on the original message, but I don't want to publish the original.

Is it possible to drop (not publish) a message?

panic when stress testing if we got the "error writing to buffer io.Writer" error

mqtt-stresser.exe -broker tcps://localhost:18883 -skip-tls-verification -num-clients 10 -num-messages 80000 -rampup-delay 1s -rampup-size 10 -global-timeout 180s -timeout 20s

2022/11/07 19:12:35 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61022: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61125: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61126: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61128: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61127: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61130: wsasend: An existing connection was forcibly closed by the remote host.
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x10ff227]

goroutine 150 [running]:
github.com/mochi-co/mqtt/server/internal/circ.(*Writer).Write(0x0, {0xc0048eaa00, 0x4e, 0xf?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/circ/writer.go:80 +0x27
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:500 +0x3fd
github.com/mochi-co/mqtt/server.(*Server).writeClient(
, , {{0x0, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:440 +0x5d
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(
, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:690 +0x634
github.com/mochi-co/mqtt/server.(*Server).processPublish(_, , {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:613 +0x698
github.com/mochi-co/mqtt/server.(*Server).processPacket(
, _, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:463 +0x13b
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc0009087e0, 0xc0001d1a58)
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:386 +0x19d
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc0001162c0, {0x11fd4b8, 0x2}, {0x1285e78, 0xc0004c0380}, {0x1283ca8?, 0x147d0d8?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:364 +0x1152
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1()
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:106 +0x3d
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:105 +0xea

Add to Authenticate more options

Hi.

Please add more options to the auth Authenticate interface.

I think the Authenticate interface could be changed to the following:

type AuthRequest struct {
	Remote   string
	ClientID string
	User     []byte
	Password []byte
}

type Controller interface {
	Authenticate(req AuthRequest) bool
...
}

Support TLS Client Certificates?

Hi, I'm looking for an embeddable MQTT broker and came across this project and it looks pretty good, but from a short look around I couldn't see that it supports using certificates to authorise MQTT clients. Is that the case, and if so, would it be possible to add that?

It needs a way to set RequireAndVerifyClientCert as the tls.ClientAuthType on the TLS config being used by the server, and then some pluggable way to manage the pool of client CA certificates.

Wrong go versioning in v2.0.0

Looks like versioning in v2.0.0 is broken.

% go get -v github.com/mochi-co/mqtt@latest
go: added github.com/mochi-co/mqtt v1.3.2
% go get -v github.com/mochi-co/mqtt@v2.0.0
go: github.com/mochi-co/mqtt@v2.0.0: invalid version: module contains a go.mod file, so module path must match major version ("github.com/mochi-co/mqtt/v2")
