quickmsg / smqtt Goto Github PK
View Code? Open in Web Editor NEW开源 MQTT 服务器(基于reactor-netty实现高性能的、可扩展、支持千万级设备接入集群)支持 mqtt 3.1.1、mqtt 5等协议
Home Page: https://www.smqtt.cc
License: Apache License 2.0
开源 MQTT 服务器(基于reactor-netty实现高性能的、可扩展、支持千万级设备接入集群)支持 mqtt 3.1.1、mqtt 5等协议
Home Page: https://www.smqtt.cc
License: Apache License 2.0
cluster集群属性配置:
cluster: # 集群配置
enable: ${CLUSTER_ENABLE:true} # 集群开关
url: ${CLUSTER_URL:192.168.26.123:7771,192.168.26.124:7771,192.168.25.100:7771} # 启动节点
port: ${CLUSTER_PORT:7771} # 端口 7772
node: ${CLUSTER_NODE:node-1} # 集群节点名称 唯一,123:node-2,124:node-3
namespace: ${CLUSTER_NAMESPACE:topband} # 集群空间,需要一致才能通信
集群以docker方式启动(--net host),192.168.25.100为本机节点IP,DEBUG日志可以看到与123、124建立了连接:
DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] Connected new channel
DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] onStateChange([connected], SimpleConnection{channel=[id: 0x43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771]})
DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0x43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771]}})
DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] Connected new channel
DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] onStateChange([connected], SimpleConnection{channel=[id: 0x46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771]})
DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0x46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771]}})
本地机器ScubeClusterRegistry类的spreadMessage()方法向集群传播消息时,打印cluster.otherMembers():
INFO i.g.q.registry.ScubeClusterRegistry - cluster: [topband:[email protected]:7771]
而124机器上打印cluster.otherMembers():[],为空
请问这是怎么回事?
1.ConnectionCounterInterceptor类根据CONNECT拦截,但ChannelRegistry.counts()执行通常发生在registry()之前,所以得到的count不是最新的。
2.尝试通过CONNACK拦截进行计数,发现CONNACK拦截不成功
12:13:48.153 [business-io-1262] ERROR i.g.q.core.protocol.PublishProtocol - error
java.lang.NullPointerException: null
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1693)
at io.github.quickmsg.core.spi.DefaultMessageRegistry.saveSessionMessage(DefaultMessageRegistry.java:32)
at io.github.quickmsg.core.protocol.PublishProtocol.filterOfflineSession(PublishProtocol.java:128)
at io.github.quickmsg.core.protocol.PublishProtocol.lambda$send$0(PublishProtocol.java:102)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178)
at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.github.quickmsg.core.protocol.PublishProtocol.send(PublishProtocol.java:109)
at io.github.quickmsg.core.protocol.PublishProtocol.parseProtocol(PublishProtocol.java:68)
at io.github.quickmsg.common.protocol.Protocol.lambda$doParseProtocol$0(Protocol.java:27)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
我注意到关联了数据库后(我用的是mysql),发送的消息如果带retain,那么是可以保存到数据库的。我想问一下如果是不带retain的消息,目前的框架内有办法能将数据保存到数据库中么?或者保存到内存通过外部接口获取到这个数据么?
多谢。
支持双向SSL么?
生产环境下,发现运行一段时间后,就有消息无法收到的现象,经排查和脚本测试,在Qos=2的情况下,broker收到3w条消息后,就没办法将PUBREC消息发送出去,从日志上看,应该是没有收到PUBREC消息,其他的MQTT-Broker 正常。
大佬您好,1883mqtt的可以连接发送消息,mqtt over websocket 的有什么客户端可以测试么?网上找的mqtt over websocket连接界面,可以连接其他人的mqtt over websocket,连接123.57.69.210:8999的会失败,请问知道是哪块配置错了吗?或者这块有什么注意事项或者推荐吗
smqtt收到的iot数据在保存到内存/DB前如果想进行一些自定义的业务处理应该怎么弄?比如把iot数据做一些过滤,比如iot数据放在rocketmq或者kafka
Hi, In /smqtt-core,there is a dependency io.netty:netty-common:4.1.66.Final that calls the risk method.
The scope of this CVE affected version is [4.0.0.Final,4.1.77.Final)
After further analysis, in this project, the main Api called is io.netty.util.internal.PlatformDependent: createTempFile(java.lang.String,java.lang.String,java.io.File)Ljava.io.File;
Risk method repair link : GitHub
CVE Bug Invocation Path--
Path Length : 8
CVE Bug Invocation Path :
io.github.quickmsg.core.ssl.AbstractSslHandler: secure(reactor.netty.tcp.SslProvider$SslContextSpec,io.github.quickmsg.common.config.Configuration)V /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.SelfSignedCertificate: init()V /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.SelfSignedCertificate: init(java.util.Date,java.util.Date,java.lang.String,int)V /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.SelfSignedCertificate: init(java.lang.String,java.util.Date,java.util.Date,java.lang.String,int)V /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.SelfSignedCertificate: init(java.lang.String,java.security.SecureRandom,int,java.util.Date,java.util.Date,java.lang.String)V /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator: generate(java.lang.String,java.security.KeyPair,java.security.SecureRandom,java.util.Date,java.util.Date,java.lang.String)[Ljava.lang.String; /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.handler.ssl.util.SelfSignedCertificate: newSelfSignedCertificate(java.lang.String,java.security.PrivateKey,java.security.cert.X509Certificate)[Ljava.lang.String; /download/apache-maven-3.6.3/repository_mount/org/casbin/jdbc-adapter/2.1.3/jdbc-adapter-2.1.3.jar
io.netty.util.internal.PlatformDependent: createTempFile(java.lang.String,java.lang.String,java.io.File)Ljava.io.File;
Dependency tree--
[INFO] io.github.quickmsg:smqtt-core:jar:1.1.7
[INFO] +- io.github.quickmsg:smqtt-common:jar:1.1.7:compile
[INFO] | +- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.12.4:compile
[INFO] | | \- org.yaml:snakeyaml:jar:1.27:compile
[INFO] | +- com.fasterxml.jackson.dataformat:jackson-dataformat-properties:jar:2.12.4:compile
[INFO] | +- io.projectreactor.netty:reactor-netty:jar:1.0.10:compile
[INFO] | | +- io.projectreactor.netty:reactor-netty-core:jar:1.0.10:compile
[INFO] | | | +- io.netty:netty-handler:jar:4.1.66.Final:compile
[INFO] | | | +- io.netty:netty-handler-proxy:jar:4.1.66.Final:compile
[INFO] | | | | \- io.netty:netty-codec-socks:jar:4.1.66.Final:compile
[INFO] | | | +- io.netty:netty-resolver-dns:jar:4.1.66.Final:compile
[INFO] | | | | \- io.netty:netty-codec-dns:jar:4.1.66.Final:compile
[INFO] | | | +- io.netty:netty-resolver-dns-native-macos:jar:osx-x86_64:4.1.66.Final:compile
[INFO] | | | | \- io.netty:netty-transport-native-unix-common:jar:4.1.66.Final:compile
[INFO] | | | +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.66.Final:compile
[INFO] | | | \- io.projectreactor:reactor-core:jar:3.4.9:compile
[INFO] | | | \- org.reactivestreams:reactive-streams:jar:1.0.3:compile
[INFO] | | +- io.projectreactor.netty:reactor-netty-http:jar:1.0.10:compile
[INFO] | | | +- io.netty:netty-codec-http:jar:4.1.66.Final:compile
[INFO] | | | \- io.netty:netty-codec-http2:jar:4.1.66.Final:compile
[INFO] | | \- io.projectreactor.netty:reactor-netty-http-brave:jar:1.0.10:runtime
[INFO] | | \- io.zipkin.brave:brave-instrumentation-http:jar:5.13.3:runtime
[INFO] | | \- io.zipkin.brave:brave:jar:5.13.3:runtime
[INFO] | | \- io.zipkin.reporter2:zipkin-reporter-brave:jar:2.16.3:runtime
[INFO] | | \- io.zipkin.reporter2:zipkin-reporter:jar:2.16.3:runtime
[INFO] | | \- io.zipkin.zipkin2:zipkin:jar:2.23.2:runtime
[INFO] | +- io.netty:netty-codec-mqtt:jar:4.1.66.Final:compile
[INFO] | | +- io.netty:netty-common:jar:4.1.66.Final:compile
[INFO] | | +- io.netty:netty-buffer:jar:4.1.66.Final:compile
[INFO] | | +- io.netty:netty-transport:jar:4.1.66.Final:compile
[INFO] | | | \- io.netty:netty-resolver:jar:4.1.66.Final:compile
[INFO] | | \- io.netty:netty-codec:jar:4.1.66.Final:compile
[INFO] | +- org.projectlombok:lombok:jar:1.18.20:compile
[INFO] | +- org.slf4j:slf4j-api:jar:1.7.30:compile
[INFO] | +- ch.qos.logback:logback-core:jar:1.1.11:compile
[INFO] | +- ch.qos.logback:logback-classic:jar:1.1.11:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.12.4:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.11.0:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.0:compile
[INFO] | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.11.0:compile
[INFO] | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.11.0:compile
[INFO] | +- io.micrometer:micrometer-core:jar:1.8.0:compile
[INFO] | | +- org.hdrhistogram:HdrHistogram:jar:2.1.12:compile
[INFO] | | \- org.latencyutils:LatencyUtils:jar:2.0.3:runtime
[INFO] | +- com.github.oshi:oshi-core:jar:5.3.6:compile
[INFO] | | +- net.java.dev.jna:jna:jar:5.6.0:compile
[INFO] | | \- net.java.dev.jna:jna-platform:jar:5.6.0:compile
[INFO] | +- io.micrometer:micrometer-registry-prometheus:jar:1.8.0:compile
[INFO] | | \- io.prometheus:simpleclient_common:jar:0.12.0:compile
[INFO] | | \- io.prometheus:simpleclient:jar:0.12.0:compile
[INFO] | | +- io.prometheus:simpleclient_tracer_otel:jar:0.12.0:compile
[INFO] | | | \- io.prometheus:simpleclient_tracer_common:jar:0.12.0:compile
[INFO] | | \- io.prometheus:simpleclient_tracer_otel_agent:jar:0.12.0:compile
[INFO] | +- io.micrometer:micrometer-registry-influx:jar:1.8.0:compile
[INFO] | +- org.casbin:jcasbin:jar:1.22.1:compile
[INFO] | | +- com.googlecode.aviator:aviator:jar:5.3.0-beta2:compile
[INFO] | | +- com.github.seancfoley:ipaddress:jar:4.2.0:compile
[INFO] | | +- commons-io:commons-io:jar:2.7:compile
[INFO] | | +- org.apache.commons:commons-csv:jar:1.8:compile
[INFO] | | \- com.google.code.gson:gson:jar:2.8.9:compile
[INFO] | \- org.casbin:jdbc-adapter:jar:2.1.3:compile
[INFO] | +- com.oracle.database.jdbc:ojdbc6:jar:11.2.0.4:compile
[INFO] | | +- com.oracle.database.jdbc:ucp:jar:11.2.0.4:compile
[INFO] | | +- com.oracle.database.security:oraclepki:jar:11.2.0.4:compile
[INFO] | | +- com.oracle.database.security:osdt_cert:jar:11.2.0.4:compile
[INFO] | | +- com.oracle.database.security:osdt_core:jar:11.2.0.4:compile
[INFO] | | +- com.oracle.database.ha:simplefan:jar:11.2.0.4:compile
[INFO] | | \- com.oracle.database.ha:ons:jar:11.2.0.4:compile
[INFO] | +- org.postgresql:postgresql:jar:42.2.12:compile
[INFO] | +- com.microsoft.sqlserver:mssql-jdbc:jar:8.2.2.jre8:compile
[INFO] | \- dev.failsafe:failsafe:jar:3.0.0:compile
[INFO] +- io.github.quickmsg:smqtt-rule-dsl:jar:1.1.7:compile
[INFO] | \- io.github.quickmsg:smqtt-rule-engine:jar:1.1.7:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-kafka:jar:1.1.7:compile
[INFO] | | \- org.apache.kafka:kafka-clients:jar:2.8.0:compile
[INFO] | | +- com.github.luben:zstd-jni:jar:1.4.9-1:compile
[INFO] | | +- org.lz4:lz4-java:jar:1.7.1:compile
[INFO] | | \- org.xerial.snappy:snappy-java:jar:1.1.8.1:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-http:jar:1.1.7:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-rocketmq:jar:1.1.7:compile
[INFO] | | \- org.apache.rocketmq:rocketmq-client:jar:4.9.1:compile
[INFO] | | +- org.apache.rocketmq:rocketmq-common:jar:4.9.1:compile
[INFO] | | | +- org.apache.rocketmq:rocketmq-remoting:jar:4.9.1:compile
[INFO] | | | | +- com.alibaba:fastjson:jar:1.2.76:compile
[INFO] | | | | \- org.apache.rocketmq:rocketmq-logging:jar:4.9.1:compile
[INFO] | | | \- commons-validator:commons-validator:jar:1.7:compile
[INFO] | | | +- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO] | | | +- commons-digester:commons-digester:jar:2.1:compile
[INFO] | | | \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] | | +- org.apache.commons:commons-lang3:jar:3.4:compile
[INFO] | | \- commons-codec:commons-codec:jar:1.9:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-rabbitmq:jar:1.1.7:compile
[INFO] | | \- com.rabbitmq:amqp-client:jar:3.6.5:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-db:jar:1.1.7:compile
[INFO] | | +- org.jooq:jooq:jar:3.14.11:compile
[INFO] | | | \- javax.xml.bind:jaxb-api:jar:2.3.1:compile
[INFO] | | | \- javax.activation:javax.activation-api:jar:1.2.0:compile
[INFO] | | +- org.jooq:jooq-meta:jar:3.14.11:compile
[INFO] | | +- org.jooq:jooq-codegen:jar:3.14.11:compile
[INFO] | | +- com.zaxxer:HikariCP:jar:4.0.3:compile
[INFO] | | \- mysql:mysql-connector-java:jar:5.1.35:compile
[INFO] | +- io.github.quickmsg:smqtt-rule-source-mqtt:jar:1.1.7:compile
[INFO] | | \- com.hivemq:hivemq-mqtt-client:jar:1.2.2:compile
[INFO] | | +- io.reactivex.rxjava2:rxjava:jar:2.2.19:compile
[INFO] | | +- org.jctools:jctools-core:jar:2.1.2:runtime
[INFO] | | +- org.jetbrains:annotations:jar:16.0.3:runtime
[INFO] | | \- com.google.dagger:dagger:jar:2.27:runtime
[INFO] | | \- javax.inject:javax.inject:jar:1:runtime
[INFO] | \- org.apache.commons:commons-jexl3:jar:3.2.1:compile
[INFO] | \- commons-logging:commons-logging:jar:1.2:compile
[INFO] +- io.github.quickmsg:smqtt-metric-influxdb:jar:1.1.7:compile
[INFO] \- io.github.quickmsg:smqtt-metric-prometheus:jar:1.1.7:compile
Suggested solutions:
Update dependency version
Thank you very much.
如题,万分感谢
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.