
surgemq's People

Contributors

hades32, muddydixon, zhenjl


surgemq's Issues

Panic when using example on ARM

I built the "surgemq" example on a Raspberry Pi 2 with Debian Jessie and get this

E0221 22:25:38.464626    3586 service.go:193/func·008] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
E0221 22:25:38.468213    3586 sendrecv.go:33/func·003] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
E0221 22:25:38.468502    3586 sendrecv.go:74/func·004] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
^CE0221 22:25:45.004451    3586 surgemq.go:77/func·001] Existing due to trapped signal; interrupt

when I execute mosquitto_pub -r -m "test by mr" -t 'test/topic'.

I also had to change the example, as otherwise I got more errors. My imports now look like this:

        "github.com/surgemq/surgemq/service"
        "github.com/surge/glog"

run pingmq error: flag redefined: log_dir

When I run example/pingmq, I get this error:

./pingmq flag redefined: log_dir
panic: ./pingmq flag redefined: log_dir

goroutine 1 [running]:
flag.(*FlagSet).Var(0xc0000a4120, 0x72f8c0, 0xc000091750, 0x6e7dfd, 0x7, 0x6f3a95, 0x2f)
	/usr/local/go/src/flag/flag.go:805 +0x529
flag.(*FlagSet).StringVar(0xc0000a4120, 0xc000091750, 0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f)
	/usr/local/go/src/flag/flag.go:708 +0x8a
flag.(*FlagSet).String(0xc0000a4120, 0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f, 0xc000091740)
	/usr/local/go/src/flag/flag.go:721 +0x8b
flag.String(0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f, 0xc000091720)
	/usr/local/go/src/flag/flag.go:728 +0x69

Message loss, Please help!!! Thanks

I use surgemq as an MQTT broker. When I send messages from two publishers at the same time to a subscriber with QoS 2, the subscriber loses some messages. I have tried to find out why; it seems there is a bug in ackqueue.go, because with QoS 0 or 1 there is no message loss. Please help!

publish before serving?

Q: Is it possible to publish topics to the server before it starts serving network requests?

This might make it possible to implement persistent retains as a separate process: i.e. log all retained messages to file or some database via a listener subscribed to "#", then on startup have the server go through the saved data and "pre-publish" them as a way to prime the in-memory cache of SurgeMQ.

-jcw

pingtest disconnect failure

Hello,

A while ago my colleagues and I had some fun testing MQTT servers we wrote: http://goo.gl/RorFlv

The tools we use for stressing our implementations are pingtest and loadtest:
go get code.google.com/p/jra-go/mqtt/pingtest
go get code.google.com/p/jra-go/mqtt/loadtest

pingtest currently fails with surgemq, because something bad happens during disconnect. You can even see it with "pingtest -pairs=1"

I have not investigated yet whether this comes from my client, but because we have used it against many server implementations, it is more likely a problem with surgemq.

The QosFailure will never return

message.QosFailure is never returned to the client.

In memtopics.go, Subscribe always pairs message.QosFailure with a non-nil error:
https://github.com/influxdata/surgemq/blob/master/topics/memtopics.go#L61

func (this *memTopics) Subscribe(topic []byte, qos byte, sub interface{}) (byte, error) {
	if !message.ValidQos(qos) {
		return message.QosFailure, fmt.Errorf("Invalid QoS %d", qos)
	}

	if sub == nil {
		return message.QosFailure, fmt.Errorf("Subscriber cannot be nil")
	}

	this.smu.Lock()
	defer this.smu.Unlock()

	if qos > MaxQosAllowed {
		qos = MaxQosAllowed
	}

	if err := this.sroot.sinsert(topic, qos, sub); err != nil {
		return message.QosFailure, err
	}

	return qos, nil
}

But in processSubscribe in process.go, when Subscribe returns an error, QosFailure is never appended to retcodes; the function just returns the error:

https://github.com/influxdata/surgemq/blob/master/service/process.go#L315

for i, t := range topics {
	rqos, err := this.topicsMgr.Subscribe(t, qos[i], &this.onpub)
	if err != nil {
		return err
	}
	this.sess.AddTopic(string(t), qos[i])

	retcodes = append(retcodes, rqos)

	// yeah I am not checking errors here. If there's an error we don't want the
	// subscription to stop, just let it go.
	this.topicsMgr.Retained(t, &this.rmsgs)
	glog.Debugf("(%s) topic = %s, retained count = %d", this.cid(), string(t), len(this.rmsgs))
}

Client.Ping method raises network disconnection: "use of closed network connection"

STEPS TO REPRODUCE

$ cd $GOPATH/src/github.com/surgemq/surgemq/examples/surgemq
$ go run *.go

then, go run this code with -logtostderr -vv 3:

package main

import (
    "flag"
    "log"

    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

func main() {
    flag.Parse()
    c := &service.Client{}
    cmsg := message.NewConnectMessage()
    cmsg.SetVersion(4)
    cmsg.SetCleanSession(true)
    cmsg.SetKeepAlive(10)
    if err := c.Connect("tcp://localhost:1883", cmsg); err != nil {
        log.Fatalln("Connect:", err)
    }

    done := make(chan struct{})
    fn := func(msg, ack message.Message, err error) error {
        log.Println("Pong:", err)
        close(done)
        return err
    }
    if err := c.Ping(service.OnCompleteFunc(fn)); err != nil {
        log.Fatalln("Ping:", err)
    }
    <-done
}

RESULTS

I1204 23:16:44.348778   13936 sendrecv.go:103/sender] (1/) Starting sender
I1204 23:16:44.348781   13936 process.go:46/processor] (1/) Starting processor
I1204 23:16:44.348883   13936 sendrecv.go:58/receiver] (1/) Starting receiver
I1204 23:16:44.349183   13936 service.go:210/stop] (1/) closing this.conn
I1204 23:16:44.349223   13936 sendrecv.go:100/func1] (1/) Stopping sender
E1204 23:16:44.349252   13936 sendrecv.go:76/receiver] (1/) error reading from connection: read tcp [::1]:61472->[::1]:1883: use of closed network connection
I1204 23:16:44.349364   13936 sendrecv.go:55/func1] (1/) Stopping receiver
I1204 23:16:44.349375   13936 service.go:220/stop] (1/) Received 16 bytes in 2 messages.
I1204 23:16:44.349383   13936 service.go:221/stop] (1/) Sent 6 bytes in 2 messages.

REASONS

In the case of Client.Ping(), ackmsg.Msgbuf is empty in service.processAcked().
As a result, msg.Decode(ackmsg.Msgbuf) panics with "slice bounds out of range".

ack timeout

When the ack cycle completes, the OnComplete function is called. How should an ack timeout be handled?

ping error

Ping breaks at header.go line 197.
I added a length check: if len(src) == 0 { return total, nil }

PS: please excuse the machine translation.

ReadPeek could not be waked up

I've run into a problem: with 25,000 clients, ReadPeek sometimes is not woken up after a buffer.WriteCommit call.

Could ccond.Wait miss a notification?

Client id length validation

Hi,

I was wondering why the length check here limits the client id to at most 23 bytes.

The 3.1.1 spec says you must allow client ids with a length between 1 and 23 bytes, but it doesn't say you can't accept client ids longer than 23 bytes, right? Or am I misunderstanding?

can you tell me something about the bridge

I have investigated the bridge in emqttd, but there is not enough detail for me. I see you plan to support bridging in the future, so I'd like to know more about it: is the bridge an MQTT client that acts as a proxy for the real client?

race-conditions in service.go and session.go

Every access to *Session.Cmsg in session.go looks like this:

this.mu.Lock()
defer this.mu.Unlock()
// ... access Cmsg ...

but in service.go lines 237-241:

// Publish will message if WillFlag is set. Server side only.
if !this.client && this.sess.Cmsg.WillFlag() {
	glog.Infof("(%s) service/stop: connection unexpectedly closed. Sending Will.", this.cid())
	this.onPublish(this.sess.Will)
}

there is no lock held while this.sess.Cmsg is accessed.

My opinion is:

  1. make Session.Cmsg private
  2. adding the functions below to session.go, and using them instead of accessing this.sess.Cmsg directly
  3. if possible, change this.mu from sync.Mutex to sync.RWMutex

func (this *Session) WillFlag() bool {
	this.mu.Lock()
	defer this.mu.Unlock()
	return this.cmsg.WillFlag()
}

func (this *Session) SetWillFlag(v bool) {
	this.mu.Lock()
	defer this.mu.Unlock()
	this.cmsg.SetWillFlag(v)
}

func (this *Session) CleanSession() bool {
	this.mu.Lock()
	defer this.mu.Unlock()
	return this.cmsg.CleanSession()
}

Another instance of this problem is in memtopics.go lines 128-130:

func (this *memTopics) Close() error {
	this.sroot = nil
	this.rroot = nil
	return nil
}

should be changed to:

func (this *memTopics) Close() error {
	this.smu.Lock()
	this.sroot = nil
	this.smu.Unlock()

	this.rmu.Lock()
	this.rroot = nil
	this.rmu.Unlock()

	return nil
}

I'll open a pull request once I finish the code.

Client panics "topics: Register called twice for provider "

I'm writing an MQTT client using the surgemq package.

service.Client panics with the message "topics: Register called twice for provider ".

STEPS TO REPRODUCE

$ cd $GOPATH/src/github.com/surgemq/surgemq/examples/surgemq
$ go run *.go

then, go run:

package main

import (
    "log"

    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

func Connect() (*service.Client, error) {
    c := &service.Client{}
    msg := message.NewConnectMessage()
    msg.SetVersion(4)
    msg.SetCleanSession(true)
    msg.SetKeepAlive(10)
    if err := c.Connect("tcp://localhost:1883", msg); err != nil {
        return nil, err
    }
    return c, nil
}

func main() {
    client, err := Connect()
    if err != nil {
        log.Fatal(err)
    }
    client.Disconnect()
    client, err = Connect()
    client.Disconnect()
}

why is there a dependency on websocket?

As the subject says: I saw in the godoc dependency graph that SurgeMQ pulls in gorilla/websocket (see service/sendrecv.go), and was curious why this is needed.

Great project btw - impressively well done, IMO.
-jcw

error while running surgemq

Hi, I'm testing surgemq with the Python paho client.

I got this error log:
E0424 18:53:12.538812 16826 process.go:55/processor] (2/testing) Error peeking next message size: EOF
E0424 18:53:44.130772 16826 sendrecv.go:55/receiver] (1/testingsub) error reading from connection: read tcp 127.0.0.1:52919: i/o timeout
E0424 18:53:44.130878 16826 process.go:55/processor] (1/testingsub) Error peeking next message size: EOF
E0424 18:54:45.138582 16826 sendrecv.go:55/receiver] (3/testingsub) error reading from connection: read tcp 127.0.0.1:36188: i/o timeout
E0424 18:54:45.139055 16826 process.go:55/processor] (3/testingsub) Error peeking next message size: EOF
E0424 18:55:32.609175 16826 process.go:55/processor] (5/testing) Error peeking next message size: EOF
E0424 18:55:35.389322 16826 sendrecv.go:55/receiver] (6/testing) error reading from connection: read tcp 127.0.0.1:37169: connection reset by peer
E0424 18:55:35.389759 16826 process.go:55/processor] (6/testing) Error peeking next message size: EOF
E0424 18:55:46.143154 16826 sendrecv.go:55/receiver] (4/testingsub) error reading from connection: read tcp 127.0.0.1:44035: i/o timeout
E0424 18:55:46.143265 16826 process.go:55/processor] (4/testingsub) Error peeking next message size: EOF
E0424 18:56:47.149011 16826 sendrecv.go:55/receiver] (7/testingsub) error reading from connection: read tcp 127.0.0.1:47237: i/o timeout
E0424 18:56:47.149102 16826 process.go:55/processor] (7/testingsub) Error peeking next message size: EOF
E0424 18:57:48.206160 16826 sendrecv.go:55/receiver] (8/testingsub) error reading from connection: read tcp 127.0.0.1:41006: i/o timeout
E0424 18:57:48.206253 16826 process.go:55/processor] (8/testingsub) Error peeking next message size: EOF
E0424 18:58:49.227415 16826 sendrecv.go:55/receiver] (9/testingsub) error reading from connection: read tcp 127.0.0.1:37368: i/o timeout
E0424 18:58:49.227507 16826 process.go:55/processor] (9/testingsub) Error peeking next message size: EOF

As you can see, I get timeouts that close the subscriber script's connection, even though MQTT has a reconnect feature, plus "Error peeking next message size" while receiving publishes or sending subscriptions.

Dangerous race-conditions in the buffer

I hadn't noticed it at first, since I used an idiomatic channel approach for a while, but when I got stuck at 100k msg/sec I took another look at your ring buffer.
In general I really like your buffer (and it's amazingly fast 👍), but I've found some serious race conditions around your cursor locks.

They were hard to find because they occur really rarely on my machine (roughly 1 out of 10k messages in a strictly serial 1-writer-1-reader test), but then your broker explodes.

Just take a look at ReadWait and WriteCommit.

func (this *buffer) WriteCommit(n int) (int, error) {
    start, cnt, err := this.waitForWriteSpace(n)
    if err != nil {
        return 0, err
    }

    // If we are here then there's enough bytes to commit
    this.pseq.set(start + int64(cnt))

    this.ccond.L.Lock()
    this.ccond.Broadcast()
    this.ccond.L.Unlock()

    return cnt, nil
}

func (this *buffer) ReadWait(n int) ([]byte, error) {
    if int64(n) > this.size {
        return nil, bufio.ErrBufferFull
    }

    if n < 0 {
        return nil, bufio.ErrNegativeCount
    }

    cpos := this.cseq.get()
    ppos := this.pseq.get()

    // This is the magic read-to position. The producer position must be equal or
    // greater than the next position we read to.
    next := cpos + int64(n)

    // If there's no data, then let's wait until there is some data
    this.ccond.L.Lock()
    for ; next > ppos; ppos = this.pseq.get() {
        if this.isDone() {
            return nil, io.EOF
        }

        this.ccond.Wait()
    }
    this.ccond.L.Unlock()

    // If we are here that means we have at least n bytes of data available.
    cindex := cpos & this.mask

    // If cindex (index relative to buffer) + n is more than buffer size, that means
    // the data wrapped
    if cindex+int64(n) > this.size {
        // reset the tmp buffer
        this.tmp = this.tmp[0:0]

        l := len(this.buf[cindex:])
        this.tmp = append(this.tmp, this.buf[cindex:]...)
        this.tmp = append(this.tmp, this.buf[0:n-l]...)
        return this.tmp[:n], nil
    }

    return this.buf[cindex : cindex+int64(n)], nil
}

The race scenario:

  1. ReadWait reads the producer position atomically with ppos := this.pseq.get()
  2. Context switch
  3. WriteCommit advances ppos, locks ccond, notifies sleeping consumers and finishes
  4. Context switch
  5. ReadWait calculates next, takes the consumer lock, goes to sleep and never wakes up

Simple fix:
Since you acquire the consumer lock anyway, read and write the producer position while holding it.

I haven't checked the other methods yet, but chances are good there are more races like this one, since atomic operations are used throughout without holding a lock shared by both parties.

consistent failure: network i/o timeout

The following code fails, always after some 42 or 43 seconds (go 1.5.3, Mac OSX 10.11.3):

package main

import (
    "flag"
    "time"

    "github.com/surge/glog"
    "github.com/surgemq/message"
    "github.com/surgemq/surgemq/service"
)

func main() {
    flag.Parse()
    defer glog.Flush()

    go func() {
        srv := service.Server{}
        glog.Info("starting MQTT server at :12345")
        glog.Fatal(srv.ListenAndServe("tcp://:12345"))
    }()

    // wait a bit, the internal MQTT server is still starting up
    time.Sleep(time.Second)

    client := &service.Client{}

    msg := message.NewConnectMessage()
    msg.SetVersion(4)
    msg.SetCleanSession(true)
    msg.SetClientId([]byte("me"))

    if err := client.Connect("tcp://:12345", msg); err != nil {
        glog.Fatal(err)
    }
    glog.Infoln("connected to port 12345")

    for t := range time.Tick(time.Second) {
        glog.Info(t)

        msg := message.NewPublishMessage()
        msg.SetTopic([]byte("abc"))
        msg.SetPayload([]byte("def"))

        e := client.Publish(msg, nil)
        if e != nil {
            glog.Fatal("publish: ", e)
        }
    }
}

Sample error output from go run blah.go -logtostderr :

...
I0122 21:40:55.862248   11954 blah.go:37/main] 2016-01-22 21:40:55.862186826 +0100 CET
I0122 21:40:56.865790   11954 blah.go:37/main] 2016-01-22 21:40:56.8656395 +0100 CET
I0122 21:40:57.864022   11954 blah.go:37/main] 2016-01-22 21:40:57.863648731 +0100 CET
E0122 21:40:58.861597   11954 sendrecv.go:77/receiver] (2/me) error reading from connection: read tcp 127.0.0.1:52627->127.0.0.1:12345: i/o timeout
I0122 21:40:58.862059   11954 blah.go:37/main] 2016-01-22 21:40:58.861395068 +0100 CET
F0122 21:40:58.862185   11954 blah.go:44/main] publish: (2/me) Error sending PUBLISH message: service: buffer is not ready
exit status 255

I can't figure out what is causing this problem. Can anyone else reproduce this on a different system?

Problem connecting to surgemq

I'm running an MQTT client that is able to connect to the public MQTT brokers at broker.mqtt-dashboard.com and iot.eclipse.org, but when I try to connect the client to a stand-alone surgemq broker running on my own machine it fails.

The broker runs without error messages, so I guess it is listening for connections. My client however, cannot connect.

Does anyone have an idea what might be wrong with my setup?

Error peeking next message size: EOF

Running the example code as per the docs throws an error:
Server:

  • 27397 process.go:55/processor] (1/surgemq) Error peeking next message size: EOF

Client:

  • 27425 process.go:55/processor] (1/surgemq) Error peeking next message size: EOF

Env: go version go1.7.3 darwin/amd64
