drmagice / gmqtt Goto Github PK
View Code? Open in Web Editor NEWGmqtt is a flexible, high-performance MQTT broker library that fully implements the MQTT protocol V3.x and V5 in golang
License: MIT License
Gmqtt is a flexible, high-performance MQTT broker library that fully implements the MQTT protocol V3.x and V5 in golang
License: MIT License
Gmqtt should provide a build-in auth plugin.
该应用有打算支持支持client吗?
当要发送一个非零返回码的connack包时,连接会关闭,导致发送不出去。
When exits the broker using ctrl-c
, the broker will exit immediately without calling any Unload
method.
Add plugin code generator to simplify writing plugins.
May have the following style:
gmqtt gen plugin -name awesome-plugin -hook OnConnect,OnConnected -o 'DST/DIR'
Here is how I tried to send a subscription error:
req.Reject(topic, &codes.Error{
Code: codes.NotAuthorized,
ErrorDetails: codes.ErrorDetails{
ReasonString: []byte("subscription failure"),
},
})
This is my mqtt.js file, you can use node [filename].js
to lunch it as a client.
var mqtt = require('mqtt')
var bcrypt = require('bcrypt');
var user = {
username: "steve",
password: "ilovesws"
}
bcrypt.genSalt(10, function (err, salt) {
if (err) {
console.log(err)
};
bcrypt.hash(user.password, salt, function (err, hash) {
if (err) {
console.log(err)
};
user.password = hash;
console.log(hash.length)
});
});
var client = mqtt.connect('mqtt://localhost:18880', {
clientId: "coco",
username: user.username,
password: user.password,
protocolVersion: 5,
resubscribe: false
})
var topic = 'sys/af65764f-54de-464b-aa8e-f9f095f7fa32/327c4a2b-97db-4c78-a2c8-5bf85317f0d1/aps'
client.on('connect', function (connack) {
console.log('connack is :', connack)
client.subscribe(topic, function (err, granted) {
console.log(err, granted)
if (err) {
console.log(err)
}
})
// client.publish("sys/af65764f-54de-464b-aa8e-f9f095f7fa32/327c4a2b-97db-4c78-a2c8-5bf85317f0d1/gps", "hello", { qos: 0, properties: { userProperties: { Username: "steve", Password: "123456" } } })
})
client.on('disconnect', function (packet) {
console.log('断开连接:', packet)
})
client.on('message', function (topic, message) {
// message is Buffer
console.log('收到消息:', message.toString())
client.unsubscribe(topic, function () {
console.log('取消订阅成功')
client.end()
})
})
And I get this subscribe callback message:
null [
{
topic: 'sys/af65764f-54de-464b-aa8e-f9f095f7fa32/327c4a2b-97db-4c78-a2c8-5bf85317f0d1/aps',
qos: 135,
nl: false,
rap: false,
rh: 0,
properties: undefined
}
]
It is from this code snippet's console.log function:
client.subscribe(topic, function (err, granted) {
console.log(err, granted)
if (err) {
console.log(err)
}
})
I expect an error message but mqtt.js client doesn't appear to receive any,what could be wrong? Anyway thank you for this amazing project, it has saved much of my time.
🙇🏻♂️
In Subscribe
function, the clientStats
can be reset when subscriptions belong to different subtree.
Normal topic, shared topic (begin with $share) and system topic (begin with $) have their own subtree.
gmqtt/persistence/subscription/mem/trie_db.go
Lines 285 to 288 in afcf8c8
In Unsubscribe
function, index
is missing for db.sharedTrie
gmqtt/persistence/subscription/mem/trie_db.go
Lines 316 to 330 in afcf8c8
Once available I will provide you with an additional integration test coverage 😁 from a new MQTT client.
If the client forgets to send ack packets (puback,pubrec,pubcomp) for some reason or there are bugs on the client side. The inflight message will not be deleted and the inflight queue may be filled up with those unacked inflight messages, which will cause the server to drop all further messages for the client.
To solve this, we can add expiry time for the inflight message and delete it when expired.
我之前也是测试,用mosquito做了测试,发现没法做集群,只能用桥的方式,不知道你有没有开发集群的计划
If yes, in what kind of environment, with what kind of workload?
I'm interested in embedded this into our project which will run in production with tens of thousands of devices connected in the coming future, so I'd like to have a rough estimation on what I can expect from gmqtt.
Thanks!
When used as a library, the main program cannot get the error, it panic directly.
cpu pprof result:
Showing nodes accounting for 29.98s, 91.93% of 32.61s total
Dropped 119 nodes (cum <= 0.16s)
Showing top 10 nodes out of 42
flat flat% sum% cum cum%
15.41s 47.26% 47.26% 27.24s 83.53% runtime.mapaccess2
4.09s 12.54% 59.80% 4.09s 12.54% runtime.aeshashbody
2.19s 6.72% 66.51% 2.19s 6.72% runtime.memequal16
2.19s 6.72% 73.23% 6.54s 20.06% runtime.memhash
1.88s 5.77% 78.99% 8.42s 25.82% runtime.memhash16
1.87s 5.73% 84.73% 1.88s 5.77% runtime.cgocall
...
Possible causes:
func (s *session) getPacketId() packets.PacketId {
s.pidMu.Lock()
defer s.pidMu.Unlock()
for i := packets.MIN_PACKET_ID; i < packets.MAX_PACKET_ID; i++ {
if _, ok := s.pid[i]; !ok { // <--- runtime.mapaccess2 called too many times which is top 1 in cpu pprof
s.pid[i] = true
return i
}
}
return 0
}
"a/#" can match retained message "b/a"
Gmqtt uses grpc-gateway to provide both gRPC and HTTP endpoint. These two endpoints are now provided by the admin plugin.
However, the latter coming plugin may also want to attach handlers to gRPC or HTTP server, e.g: the auth plugin.
In order to achieve that, the admin plugin provides XXXRegister
interfaces:
Lines 39 to 42 in 5ee07a7
And For those plugins which want to attach handlers, they should
Lines 122 to 127 in 5ee07a7
admin
plugin: gmqtt/cmd/gmqttd/default_config.yml
Lines 73 to 77 in 5ee07a7
Here is the problem, exposing API handler has a dependency on the plugin order.
To solve the problem, we can put gRPC and HTTP server set up logic into gmqtt core, and expose XXXRegister
interfaces via server.Server
, and the plugin which wants to attach handlers will only has dependency on server.Server
.
I want it to be a broker, but I don’t know how to receive messages from the client
Gmqtt are now using paho.mqtt.testing for integration tests.
It is good to have an integration test framework written in go, which is more maintainable and easier for us to add more test cases.
For maximum productivity, it would be very helpful to add build tools such as Makefile
or another complete build system like Bazel
Although build systems such as Bazel massively reduce build time, they are a little bit complicated and their learning curves are steep.
I'm looking forward to making a PR for adding at least a proper Makefile
for ease of use.
Hello, thank you for providing the way to implement mqtt server in go. Now I am testing it at the arm board. When the client connect, it cause the crash. I find that storing int64 using atomic mode at arm32 sometimes will cause "the crash on unaligned uint64". It's the code in the source below, in the file "src/runtime/internal/atomic/atomic_arm.go".
//go:nosplit
func goStore64(addr *uint64, v uint64) {
if uintptr(unsafe.Pointer(addr))&7 != 0 {
*(*int)(nil) = 0 // crash on unaligned uint64
}
_ = *addr // if nil, fault before taking the lock
addrLock(addr).lock()
*addr = v
addrLock(addr).unlock()
}
Do you have some solution to solve it? Thank for very much !
Would it be possible to include an unsubscribe hook to get notified when a client unsubscribes from a topic but does not disconnect?
Hi, i found you project and try to understand what is the best for simple pub/sub model with guarantied message delivery. Now i'm USE nats streaming, how about gmqtt?
And how can i do HA with it?
I am using the latest version which is v0.3.0
Here is the client file I am using to produce this error:
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
topic1 := "test/topic"
go testPublishEvery10Seconds("sws1", topic1)
// go testPublishEvery10Seconds("sws1", topic2)
<-sigs
}
func testPublishEvery10Seconds(clientID, topic string) {
clientOptions := mqtt.NewClientOptions()
clientOptions.AddBroker("tcp://localhost:18888")
clientOptions.SetClientID(clientID)
clientOptions.SetUsername("admin")
clientOptions.SetPassword("ilovesws")
clientOptions.SetProtocolVersion(4)
clientOptions.SetKeepAlive(30)
clientOptions.SetCleanSession(false)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
clientOptions.SetDefaultPublishHandler(f)
c := mqtt.NewClient(clientOptions)
if token := c.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Error on Client.Connect(): %v", token.Error())
}
}
I am using the above code to connect to a gmqtt server, at the moment I get a connection I disconnect the Internet(I am using mac and disconnect
means I close the WIFI connection. And it would take so long (sometime it might take half and two minutes) for the server to detect a disconnection!
You can see that I have set KeepAlive
to 30 seconds so this should not happen right?
And by the way, after searching google I found that actually KeepAlive
is for a client to send packets to the server and makes sure the connection doesn't break up. So right now I am kind of confused, actually there is a max_keepalive setting in config.yml
.
And here is my config.yml:
listeners:
- address: ":18888"
websocket:
path: "/"
# - address: "localhost:18889"
# websocket:
# path: "/"
mqtt:
session_expiry: 1m
message_expiry: 1m
max_packet_size: 200
server_receive_maximum: 65535
max_keepalive: 60 # unlimited
topic_alias_maximum: 0 # 0 means not Supported
subscription_identifier_available: true
wildcard_subscription_available: false
shared_subscription_available: true
maximum_qos: 2
retain_available: false
max_queued_messages: 1000
max_inflight: 32
max_awaiting_rel: 100
queue_qos0_messages: true
delivery_mode: overlap # overlap or onlyonce
allow_zero_length_clientid: true
log:
level: debug # debug | info | warning | error
And here is my onBasicAuth hook:
var OnBasicAuth server.OnBasicAuth = func(ctx context.Context, client server.Client, req *server.ConnectRequest) error {
username := string(req.Connect.Username)
password := string(req.Connect.Password)
isvalid := validation(username, password)
if !isvalid {
// check the client version, return a compatible reason code.
switch client.Version() {
case packets.Version5:
return codes.NewError(codes.BadUserNameOrPassword)
case packets.Version311:
return codes.NewError(codes.V3BadUsernameorPassword)
}
}
// return nil if pass authentication.
return nil
}
I expect the server to detect a disconnection in 45 seconds or 60 seconds, not minutes.
I saw an error but this error occurred so late!
ERROR server/client.go:240 connection lost {"client_id": "sws1", "error": "read tcp x.x.x.x:18888->x.x.x.x:16752: read: connection timed out"}
If you go through my issue, I would like to give you a big thank you!🙇🏻♂️
WARNING: DATA RACE
Write at 0x0000012e8538 by goroutine 56:
github.com/DrmagicE/gmqtt/server.(*Server).Run.func1()
/usr/local/gopath/src/github.com/DrmagicE/gmqtt/server/server.go:601 +0x7f
net/http.HandlerFunc.ServeHTTP()
/usr/local/go/src/net/http/server.go:1918 +0x51
net/http.(*ServeMux).ServeHTTP()
/usr/local/go/src/net/http/server.go:2254 +0xa2
net/http.serverHandler.ServeHTTP()
/usr/local/go/src/net/http/server.go:2619 +0xbc
net/http.(*conn).serve()
/usr/local/go/src/net/http/server.go:1801 +0x83b
Previous write at 0x0000012e8538 by goroutine 71:
github.com/DrmagicE/gmqtt/server.(*Server).Run.func1()
/usr/local/gopath/src/github.com/DrmagicE/gmqtt/server/server.go:601 +0x7f
net/http.HandlerFunc.ServeHTTP()
/usr/local/go/src/net/http/server.go:1918 +0x51
net/http.(*ServeMux).ServeHTTP()
/usr/local/go/src/net/http/server.go:2254 +0xa2
net/http.serverHandler.ServeHTTP()
/usr/local/go/src/net/http/server.go:2619 +0xbc
net/http.(*conn).serve()
/usr/local/go/src/net/http/server.go:1801 +0x83b
Goroutine 56 (running) created at:
net/http.(*Server).Serve()
/usr/local/go/src/net/http/server.go:2720 +0x37c
net/http.(*Server).ListenAndServe()
/usr/local/go/src/net/http/server.go:2636 +0xc7
github.com/DrmagicE/gmqtt/server.(*Server).serveWebSocket()
/usr/local/gopath/src/github.com/DrmagicE/gmqtt/server/server.go:541 +0x80
Goroutine 71 (running) created at:
net/http.(*Server).Serve()
/usr/local/go/src/net/http/server.go:2720 +0x37c
net/http.(*Server).ListenAndServe()
/usr/local/go/src/net/http/server.go:2636 +0xc7
github.com/DrmagicE/gmqtt/server.(*Server).serveWebSocket()
/usr/local/gopath/src/github.com/DrmagicE/gmqtt/server/server.go:541 +0x80
WARNING: DATA RACE
Write at 0x00c0421a80f8 by goroutine 15:
github.com/DrmagicE/gmqtt/server.(*Client).redeliver()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/server/client.go:620 +0x664
Previous read at 0x00c0421a80f8 by goroutine 12:
github.com/DrmagicE/gmqtt/pkg/packets.(*Publish).Pack()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/pkg/packets/publish.go:71 +0xe8
github.com/DrmagicE/gmqtt/pkg/packets.(*Writer).WritePacket()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/pkg/packets/packets.go:130 +0x73
github.com/DrmagicE/gmqtt/server.(*Client).writePacket()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/server/client.go:189 +0x6d
github.com/DrmagicE/gmqtt/server.(*Client).writeLoop()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/server/client.go:178 +0x375
Goroutine 15 (running) created at:
github.com/DrmagicE/gmqtt/server.(*Client).serve()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/server/client.go:646 +0x17c
Goroutine 12 (running) created at:
github.com/DrmagicE/gmqtt/server.(*Client).serve()
D:/yushen/gopath/src/github.com/DrmagicE/gmqtt/server/client.go:642 +0xe7
Due to the MQTT protocol architecture, it is difficult or may be impossible to meet all requirements in MQTT specification.
The key point of clustering MQTT brokers is the tradeoffs between consistency and availability.
The basic idea of gmqtt cluster is just like other popular brokers, emq,vernmq,HiveMQ, etc.
It is a masterless cluster, each nodes in cluster are equals. User can choose either server-side or client-side load balance to connect the cluster.
When network partition happens, we have 2 options:
All nodes will continue serving, any message, subscribe, unsubscribe, connect, disconnect event will be delivered once the network partition heals. It is an eventually consistent model. Of course, the message order can not be perserved in this situation.
If the partition do not heal and reach a timeout or the event buffer is full, the cluster should remove the node automatic. It may cause message loss.
All nodes will return negative ack for subscribe, unsubscribe, connect packet. For publish packet:
• If the client protocol is V3, then closes the client because there is no negative ack for publish packet in V3 protocol.
• If the client protocol is V5, then returns a negative ack.
When the partition heals, all nodes can re-join to the cluster automaticly.
If we trade availability for consistency, the cluster can not provide fault tolerance——any node goes down will cause the whole cluster become unavailable.
Gmqtt will make this tradeoff strategy configurable like how vernemq does.
In gmqtt cluster, the session state will not replicate to other nodes. Which means the client can not resume a session from different nodes.
The reason is that it is very difficult and requires a lot of extra works to do to copy session state and maintain the consistency across nodes. This comment emqx/emqx#1623 (comment) elaborates the difficulties.
Due to this complexity, both EMQ and vernemq do not support session migration yet.
TODO
Some reference:
Example:
gmqctl admin get clients xxxx
gmqctl admin subscribe xxxx
gmqctl admin publish xxx
gmqctl auth add account xxxx
....
Kubectl - https://kubernetes.io/docs/tasks/extend-kubectl/kubectl-plugins/
Helm - https://helm.sh/docs/topics/plugins/
Is it possible to modify the message payload before delivering it to the clients?
I have a client that sends zlib compressed data to the MQTT broker, and I need to decompress it before delivering it to the clients which don't have the capability to do it themselves.
I have tried to modify it in OnMsgArrived
hook by:
msg
to the new message I've constructed via gmqtt.NewMessage
arrived(ctx, client, newMessage)
as return valueBoth seem to have no effect at all.
Is there a way to achieve this?
I am currently using v0.3.0
Yes, it does
in my config.yml file I set tls configuration to wrong paths:
listeners:
- address: ":18888"
tls:
cacert: "./wrongpath/rootssCA.pem"
cert: "./wrongpath/server-crt.pem"
key: "./wrongpath/server.pem"
verify: true
I expect to see an error complaining about it
Since there is no error output, everything seems to be fine and kicking about actually the tcp listener doesn't work in tls mode, so it's not so good right?
I will make a new pull request to fix this.😅
It is expected to cancel the iteration by returning false from the IterateFn
while doing iteration.
Line 9 in d986194
Returning false cannot cancel the iteration.
Missing websocket server shutdown function in
func (srv *Server) Stop(ctx context.Context) error {
//...
}
当前的日志格式如下(时间_级别_代码_信息):
2021-04-16T16:31:49.266+0800 INFO server/server.go:1097 init plugin hook wrappers
2021-04-16T16:31:49.266+0800 INFO server/server.go:880 open persistence succeeded {"type": "memory"}
我想更改它的格式,确保它与我的代码所使用的日志格式相同,例如 级别_时间_信息,但是没有在文档中找到定义字段的方法。
万望不吝赐教。谢谢。
srv.willMessage
map here will never been cleaned. The element should be removed after the will message is sent or the session resumes in time.
Lines 512 to 523 in b9f60f0
OS: Linux Arch
OS ARCH: x64
Gmqtt: upstream
I try to use the http api of admin plugin for publish a message by curl tool like this.
$ curl -X POST 127.0.0.1:8083/v1/publish -d '{"topic_name":"a","payload":"test","qos":1}'
{}%
But the topic can not receive the message, and I not use other plugins.
Thank you for your good project!
The OnWillMsgPublish
is invoked before sending the will message, and the OnWillMsgPublished
is invoked after the will message has been sent.
The reason of adding these hooks is the federation
plugin need to know when to deliver will message across the cluster, so it needs to hook OnWillMsgPublish
to get that information.
你好,请问有无计划支持集群。谢谢
Gmqtt是用Go语言实现的一个具备灵活灵活扩展能力,高性能的MQTT broker,其完整实现了MQTT V3.1.1和V5协议。
请问做过压测吗?单台服务器能带多少个MQTT客户端呢?然后集群部署(比如10个节点能带多少台)呢?
Currently, If a message matches multiple shared subscriptions, it will only be sent once, which violates the spec:
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250
Each Shared Subscription is independent from any other. It is possible to have two Shared Subscriptions with overlapping filters. In such cases a message that matches both Shared Subscriptions will be processed separately by both of them
I was testing the library using the Websocket example and I found out that I am unable to connect using the client offered by HiveMQ with the following error:
Connect failed: AMQJS0007E Socket error:undefined.
WebSocket handshake: Sent non-empty 'Sec-WebSocket-Protocol' header but no response was received
How intercept messages from published?
My init code:
package main
import (
"context"
"net"
"os"
"os/signal"
"syscall"
"log"
"github.com/DrmagicE/gmqtt"
)
func main() {
// listener
ln, err := net.Listen("tcp", ":1883")
if err != nil {
log.Fatalln(err.Error())
return
}
s := gmqtt.NewServer(
gmqtt.WithTCPListener(ln),
)
s.Run()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
s.Stop(context.Background())
}
Hi
I personally really appreciate this project and its potential to be a powerful MQTT message broker.
As the README mentioned currently there is not support for clustering and it's fine for a young project but I suggest maintainers of the project create a roadmap and guideline for further improvement and the ability to works in the cluster mode. e.g the way that clustering should be implemented and its architecture.
Also, a contribution guild would be very helpful for anyone how likes to contribute and improve this project As I personally love to do to get rid of the current chunky erlang broker cluster that we have.
没有给缺少clientid的客户端生成唯一id
hi,你的邮箱是?
if srv.onConnected != nil {
srv.onConnected(&chainStore{}, client)
}
client.setConnectedAt(time.Now())
if client.server.onClose != nil {
client.server.onClose(&chainStore{}, client, client.err)
}
client.setDisconnectedAt(time.Now())
I am using this lib for a quite long time. However recently i noticed If i sent message through the gmqtt Publish method and if a client is disconnected at that time the sessions gets lost. When i reconnect i didn't receive the message
On the other hand, If i send message to topic via a third party client like paho.mqtt.golang
while the client is disconnected , gmqtt preserves the sessions. Which means when i reconnect i get the published message.
Please correct me if i am wrong. Is this behaviour suppose to happen ? I am guessing since the build in Publish method of gmqtt does not have a client id so it didn't persist any sessions ?
Also i am using a plugin approach to re schedule the message from db with the publish method if the gmqtt server gets restarts. I am using the build in publish method for that.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.