
go-nsq's People

Contributors

ablegao, andyxning, atlas-comstock, crackcomm, danielhfrank, dcarney, devdazed, elubow, haraldnordgren, iaburton, jamesgroat, jehiah, judwhite, karalabe, manveru, martin-sucha, mengskysama, mreiferson, myliu, nikitabuyevich, philhofer, ploxiln, schmichael, svmehta, swanspouse, testwill, tj, twmb, vitaliytv, vlastv


go-nsq's Issues

Error host name

I start a node like this:
./nsqlookupd
./nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=127.0.0.1:4150
and then I see this in the log:
2014/07/29 20:08:57 TCP: new client(127.0.0.1:62823)
2014/07/29 20:08:57 CLIENT(127.0.0.1:62823): desired protocol magic ' V1'
2014/07/29 20:08:57 CLIENT(127.0.0.1:62823): IDENTIFY Address:Nicol.local TCP:4150 HTTP:4151 Version:0.2.29

The advertised address is not [127.0.0.1] but [Nicol.local].
So when I use the go-nsq package as a consumer, I get an error:
2014/07/29 20:02:16 INF 1 [crmmsg/msg#ephemeral]
querying nsqlookupd http://127.0.0.1:4161/lookup?topic=crmmsg
2014/07/29 20:02:16 INF 1 [crmmsg/msg#ephemeral] starting Handler
2014/07/29 20:02:16 INF 1 crmmsg/msg#ephemeral connecting to nsqd
2014/07/29 20:02:16 ERR 1 crmmsg/msg#ephemeral error connecting to nsqd - dial tcp: lookup Nicol.local: no such host

I read the code: conn.go, line 130, in the Connect() method:
conn, err := net.DialTimeout("tcp", c.addr, time.Second)
I think this line fails because of the wrong address.
Is this a bug? How can I resolve it?
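
For what it's worth, a minimal consumer-side workaround (assuming the nsqd really is reachable at 127.0.0.1:4150) is to skip nsqlookupd discovery and connect to the nsqd directly, so the advertised host name is never dialed; configuring nsqd to advertise an address the consumer can resolve is the cleaner fix. A sketch:

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("crmmsg", "msg#ephemeral", cfg)
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        log.Printf("got message: %s", m.Body)
        return nil
    }))
    // Dial the nsqd directly instead of discovering it via nsqlookupd,
    // so the host name it advertises (Nicol.local) is never looked up.
    if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}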

consumer: retry nsqlookupd query if it fails

A few days ago, due to a provisioning error, I accidentally added a bad lookupd to the list. My app eventually queried a working lookupd and began processing messages, but it took about a minute (which is what the poll interval is set to). I know there is a balance between finding new nodes and not overloading the lookupds, but would it make sense to try all of the lookupds on startup until one returns success? I'm happy to submit a pull request if so.
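
As a stopgap on the application side, the poll interval is configurable, so a bad lookupd can at least be cycled past faster; a minimal sketch, with the interval, topic, channel, and addresses purely illustrative:

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    cfg := nsq.NewConfig()
    // Poll more often than the default so a bad lookupd is cycled past
    // sooner (the value here is illustrative).
    if err := cfg.Set("lookupd_poll_interval", "15s"); err != nil {
        log.Fatal(err)
    }

    consumer, err := nsq.NewConsumer("events", "worker", cfg) // placeholder names
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error { return nil }))

    // go-nsq round-robins its periodic queries across this list of lookupds.
    err = consumer.ConnectToNSQLookupds([]string{
        "10.0.0.1:4161", // hypothetical addresses
        "10.0.0.2:4161",
    })
    if err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}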

change "starting Handler" to debug level

maybe?

2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF    1 [red-events/red-buffer] starting Handler
....

gets a little crazy haha :D

Reporting discards

Any suggested way to report on discards? Or just manually check in your handler? Maybe an optional HandleDiscard function would be cool
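
For what it's worth, go-nsq does have a hook close to this: if the handler also implements the FailedMessageLogger interface, its LogFailedMessage method is called when a message exceeds max attempts and is about to be discarded. A minimal sketch (topic, channel, and address are placeholders):

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

type myHandler struct{}

func (h *myHandler) HandleMessage(m *nsq.Message) error {
    // normal processing
    return nil
}

// LogFailedMessage is called by go-nsq when a message has exceeded
// max_attempts and is about to be discarded.
func (h *myHandler) LogFailedMessage(m *nsq.Message) {
    log.Printf("discarding message %s after %d attempts", m.ID[:], m.Attempts)
}

func main() {
    cfg := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("events", "metrics", cfg) // placeholder names
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(&myHandler{})
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}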

Please clarify relationship of go-nsq v1.0.4 to nsq release v0.3.5 and legacy/refactored API

The go-nsq README.md indicates:

The latest stable release is 1.0.4

NOTE: The public API has been refactored as of v1.0.0 and is not backwards compatible with previous releases. 0.3.7 is the last stable release compatible with the legacy API. Please read the UPGRADING guide.

Q1 - With respect to this (0.3.7 and 1.0.4) versioning scheme for go-nsq, are these versions only with respect to go-nsq, and independent of the nsq version?

Q2 - Does the go-nsq version with the refactored 1.0.4 API require a particular matched version of nsq to run against? Do I need to be running against a particular minimum version of nsq?

Q3 - Are the HEADs of the git repos for nsq and go-nsq in sync with each other? E.g. if I submit a PR against one, should I be checking it against the other as of the latest code?

Thank you.

Jason

default RDY with respect to the concurrent handlers of the consumer

It seems that the default RDY for the consumer connection is always 1, regardless of the concurrency set by the "AddConcurrentHandlers" method. Consequently, messages can only be sent to one handler goroutine at a time; the other goroutines have to wait until the "FIN" is sent for the in-flight message.

To resolve this, you have to manually call the "ChangeMaxInFlight" method; then all of the handler goroutines can receive messages (if they're in the queue) at the same time.
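
A minimal sketch of that workaround: set MaxInFlight on the config (or call ChangeMaxInFlight later) so it matches the handler concurrency. Topic, channel, and address are placeholders.

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

func main() {
    concurrency := 10

    cfg := nsq.NewConfig()
    // Allow up to `concurrency` messages in flight so that all handler
    // goroutines can receive messages at the same time.
    cfg.MaxInFlight = concurrency

    consumer, err := nsq.NewConsumer("events", "workers", cfg) // placeholder names
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(m *nsq.Message) error {
        return nil
    }), concurrency)

    // Alternatively, adjust it at runtime:
    // consumer.ChangeMaxInFlight(concurrency)

    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}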

document thread-safety

Looks like the producer is thread-safe, but adding a quick note about the thread-safety properties would be sweet! It would save some digging.

cheers

consumer/producer: consistent Stop() behavior

While I can make my messaging layer built on top of go-nsq behave consistently w.r.t. its consumer/producer stop methods, it would be nice if go-nsq's own consumer/producer stop methods behaved similarly. Is there a particular reason that Consumer.Stop() is asynchronous while Producer.Stop() is synchronous?

Would a PR making them behave similarly be welcome? I'm honestly not sure which I prefer.
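
In the meantime, the asymmetry can be papered over in application code: Consumer exposes a StopChan that is closed once shutdown completes, so waiting on it after Stop() gives the synchronous behavior. A minimal sketch:

package nsqutil

import "github.com/nsqio/go-nsq"

// stopConsumer makes Consumer.Stop() behave like the synchronous
// Producer.Stop() by waiting on the exported StopChan, which is closed
// once the consumer has finished shutting down.
func stopConsumer(c *nsq.Consumer) {
    c.Stop()
    <-c.StopChan
}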

Question: Reason for returning `error` from nsq.HandlerFunc?

Hi, I was wondering what the case would be for returning an error from nsq.HandlerFunc?
https://github.com/nsqio/go-nsq/blob/master/consumer.go#L38

It deviates slightly from the signature of http.HandlerFunc:
https://golang.org/src/net/http/server.go?s=41267:41314#L1408

I'm adding some middleware to decorate the final message handler with additional functionality. It works fine, but it feels a bit weird having an unused error in the handler. Should I be using this error for a particular use case?

func metricsHandler(next nsq.Handler) nsq.Handler {
    return nsq.HandlerFunc(func(msg *nsq.Message) error {
        fmt.Println("Push to DataDog, Librato, etc")
        next.HandleMessage(msg)
        return nil // deviates from http.Handler signature
    })
}

func finalHandler(msg *nsq.Message) error {
    fmt.Println(string(msg.Body))
    return nil  // deviates from http.Handler signature
}

func main() {
  // ...
  var h nsq.Handler
  h = nsq.HandlerFunc(finalHandler)
  h = metricsHandler(h)
  q.AddHandler(h)
  // ...
}
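
For what it's worth, that returned error is what go-nsq uses (with the default auto-response behavior) to decide whether to FIN or REQ the message, so middleware usually wants to propagate it rather than swallow it. A sketch of the metricsHandler above with the error passed through:

func metricsHandler(next nsq.Handler) nsq.Handler {
    return nsq.HandlerFunc(func(msg *nsq.Message) error {
        fmt.Println("Push to DataDog, Librato, etc")
        // Propagate the error: with auto-response enabled, a non-nil
        // return makes go-nsq REQ (requeue) the message, nil makes it FIN.
        return next.HandleMessage(msg)
    })
}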

producer nsqlookupd

I want to publish and subscribe to nsq using nsqlookupd(s). That works fine for the consumer part but not for the producer part, as it requires the address of a running nsqd at construction time.
Would it be possible for a producer to connect to nsqlookupd(s) instead of a single nsqd? In fact, why is the producer's API so different from the consumer's regarding how it connects to nsq? Or maybe I am completely blind to some obvious solution?
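
There is no built-in support for this, so the usual approach is a thin application-side pool: build one Producer per nsqd address (however you obtain the list, e.g. from nsqlookupd's /nodes HTTP endpoint or from configuration) and spread publishes across them. A sketch with hard-coded, hypothetical addresses:

package main

import (
    "log"
    "sync/atomic"

    "github.com/nsqio/go-nsq"
)

// producerPool round-robins publishes across several nsqd nodes.
type producerPool struct {
    producers []*nsq.Producer
    next      uint64
}

func newProducerPool(addrs []string, cfg *nsq.Config) (*producerPool, error) {
    pool := &producerPool{}
    for _, addr := range addrs {
        p, err := nsq.NewProducer(addr, cfg)
        if err != nil {
            return nil, err
        }
        pool.producers = append(pool.producers, p)
    }
    return pool, nil
}

func (p *producerPool) Publish(topic string, body []byte) error {
    i := atomic.AddUint64(&p.next, 1)
    return p.producers[i%uint64(len(p.producers))].Publish(topic, body)
}

func main() {
    cfg := nsq.NewConfig()
    // Hypothetical nsqd addresses; in practice these could come from
    // nsqlookupd's /nodes HTTP endpoint or from configuration.
    pool, err := newProducerPool([]string{"10.0.0.1:4150", "10.0.0.2:4150"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    if err := pool.Publish("events", []byte("hello")); err != nil {
        log.Fatal(err)
    }
}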

panic() variant of Set() for config

I'm getting IO timeouts with nsqd v0.2.28 (built w/go1.2.1) for anything blocking for a second, which I'll need to fix anyway, but more importantly I didn't realize that Set() returns an error instead of panicking. I probably just don't have master with the msg_timeout support, but having a MustSet would be great for simple bootstrapping (it was a dev error in my case). Yay/nay?
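
Until something like that exists in the library, a panic-on-error wrapper is trivial to keep in application code; a sketch of a hypothetical mustSet helper:

package main

import "github.com/nsqio/go-nsq"

// mustSet is a hypothetical helper that panics on an invalid config
// option, which is convenient for one-off bootstrapping code.
func mustSet(cfg *nsq.Config, option string, value interface{}) {
    if err := cfg.Set(option, value); err != nil {
        panic(err)
    }
}

func main() {
    cfg := nsq.NewConfig()
    mustSet(cfg, "msg_timeout", "2s")
}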

advice / question: exposing over HTTPS to browser / apps

I am using NSQ for communication between different micro services. The micro services do not have an HTTP transport at all.

So I am looking for options to help with putting a layer on top of the micro services that supports HTTP and WS (WebSockets).

There are many possible ways to do it, and I am not quite sure which is best.

improve RDY logic

In nsqio/pynsq#37 and nsqio/pynsq#38 we greatly improved pynsq's RDY handling. I think the following are the remaining differences between go-nsq and pynsq with regard to RDY handling:

  • At connection add, a new connection can be starved because a separate connection could have the full max-in-flight value (bug)
  • New connections that can't send the full RDY for a connection should send a truncated RDY value and delay/retry sending RDY when unable to send as a result of global max-in-flight
  • When going into backoff all connections should be updated to RDY 0 immediately
  • When coming out of backoff, a single connection should be updated with RDY 1 (and redistribute should move it to another connection as appropriate)
  • Improve redistribution (redistribute during backoff but not when in a backoff block, redistribute on edge cases with max in flight, redistribute when a connection closes and it had RDY and we were in backoff)
  • Comprehensive reader tests (matching pynsq)

I'm going to open separate issues to tackle these items. This can track the overall issue of parity (there are probably things I'm forgetting) and we can close it when we are satisfied.

cc: @mreiferson

Method for "Reducing Handlers"

Is it possible to provide a method for reducing the number of consumer handlers (goroutines)? That would make it possible to build APIs that control the message consumption speed dynamically.
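
Handler goroutines can't currently be removed once added, but consumption speed can already be throttled dynamically with ChangeMaxInFlight (0 effectively pauses delivery). A minimal sketch, assuming an already-connected consumer:

package nsqutil

import "github.com/nsqio/go-nsq"

// pause stops new deliveries by dropping max-in-flight (and thus RDY) to 0;
// resume restores it. This throttles consumption dynamically without
// changing the number of handler goroutines.
func pause(c *nsq.Consumer)         { c.ChangeMaxInFlight(0) }
func resume(c *nsq.Consumer, n int) { c.ChangeMaxInFlight(n) }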

consumer/producer: add interfaces

To facilitate testing, I've found myself abstracting out the go-nsq API so I can swap in mock implementations.

nsq.Producer/nsq.Consumer should be interfaces so that users don't have to create their own abstraction layer.
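
Until such interfaces exist in the library, the usual user-side pattern is to declare the narrow interface your code needs and let *nsq.Producer (or *nsq.Consumer) satisfy it implicitly; tests then swap in a mock. A sketch:

package queue

import "github.com/nsqio/go-nsq"

// Publisher is the narrow interface this application needs; *nsq.Producer
// satisfies it implicitly, so tests can swap in a mock.
type Publisher interface {
    Publish(topic string, body []byte) error
}

var _ Publisher = (*nsq.Producer)(nil) // compile-time check

// mockPublisher records publishes for test assertions.
type mockPublisher struct {
    published [][]byte
}

func (m *mockPublisher) Publish(topic string, body []byte) error {
    m.published = append(m.published, body)
    return nil
}

// enqueue depends on the interface, not on the concrete go-nsq type.
func enqueue(p Publisher, topic string, body []byte) error {
    return p.Publish(topic, body)
}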

Reader can race when stopping

2013/10/03 20:08:13 stopping reader
2013/10/03 20:08:13 exiting lookupdLoop
2013/10/03 20:08:43 closing incomingMessages

panic: runtime error: send on closed channel

goroutine 12 [running]:
github.com/bitly/nsq/nsq.(*Reader).readLoop(0xf8400a9160, 0xf8400f7000, 0x0, 0x0)
    /usr/src/redhat/BUILD/bitly-nsq-0.2.22-15/src/github.com/bitly/nsq/nsq/reader.go:636 +0xd4d
created by github.com/bitly/nsq/nsq.(*Reader).ConnectToNSQ
    /usr/src/redhat/BUILD/bitly-nsq-0.2.22-15/src/github.com/bitly/nsq/nsq/reader.go:565 +0x16c6

nsq_to_nsq: producer never reconnects in some cases

18:15:13 [aggregate]: finished 250 - 99th: 186.30ms - 95th: 184.76ms - avg: 184.38ms
18:19:23 [aggregate]: finished 250 - 99th: 187.25ms - 95th: 185.06ms - avg: 184.42ms
18:23:33 [aggregate]: finished 250 - 99th: 185.87ms - 95th: 184.81ms - avg: 184.38ms
18:28:38 ERR    2 (this.is.a.cname:4050) IO error - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR    2 (this.is.a.cname:4050) sending command - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR    2 (this.is.a.cname:4050) IO error - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR    2 (this.is.a.cname:4050) sending command - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR    2 (this.is.a.cname:4050) IO error - EOF
18:28:38 INF    2 (this.is.a.cname:4050) beginning close
18:28:38 INF    2 (this.is.a.cname:4050) readLoop exiting
18:28:38 INF    2 (this.is.a.cname:4050) breaking out of writeLoop
18:28:38 INF    2 (this.is.a.cname:4050) writeLoop exiting
18:28:38 INF    2 (this.is.a.cname:4050) finished draining, cleanup exiting
18:28:38 INF    2 (this.is.a.cname:4050) clean close complete
18:28:38 WRN    1 [my-logs/nsq_to_nsq] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
18:28:38 [aggregate]: finished 250 - 99th: 60153.95ms - 95th: 6481.75ms - avg: 2178.38ms
18:28:40 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
18:29:38 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - read tcp 127.0.0.1:4050: i/o timeout
18:29:38 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) delaying close, 24 outstanding messages
18:29:38 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) readLoop exiting
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending RDY 1 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 WRN    1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending command REQ 0811ec9736c5d000 90000 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) beginning close
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending command REQ 0811ec983085d000 90000 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) breaking out of writeLoop
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) writeLoop exiting
18:30:03 INF    2 exiting router
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) finished draining, cleanup exiting
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) clean close complete
18:30:03 WRN    1 [my-logs/nsq_to_nsq] there are 0 connections left alive
18:30:03 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) re-connecting in 15 seconds...
18:30:18 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) connecting to nsqd
18:30:18 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) upgrading to TLS
19:01:43 WRN    1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
19:01:47 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:01:47 INF    2 (this.is.a.cname:4050) connecting to nsqd
19:01:48 ERR    2 (this.is.a.cname:4050) error connecting to nsqd - dial tcp 123.45.67.890:4050: i/o timeout
19:01:48 ERR    1 [my-logs/nsq_to_nsq] Handler returned error (dial tcp 123.45.67.890:4050: i/o timeout) for msg 0811f33e490d7000
19:01:48 WRN    1 [my-logs/nsq_to_nsq] backing off for 8.0000 seconds (backoff level 3), setting all to RDY 0
19:01:56 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:01:56 INF    2 (this.is.a.cname:4050) connecting to nsqd
19:02:00 INF    2 (this.is.a.cname:4050) upgrading to TLS
19:02:05 ERR    2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:02:05 ERR    1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f33f428d7000
19:02:05 WRN    1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:02:20 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:02:20 INF    2 (this.is.a.cname:4050) connecting to nsqd
19:03:06 INF    2 (this.is.a.cname:4050) upgrading to TLS
19:03:06 ERR    2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:03:06 ERR    1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f3403d0d7000
19:03:06 WRN    1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:03:21 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:03:21 INF    2 (this.is.a.cname:4050) connecting to nsqd
19:04:10 INF    2 (this.is.a.cname:4050) upgrading to TLS
19:04:10 ERR    2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:04:10 ERR    1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f341364d7000
19:04:10 WRN    1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:04:25 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:04:25 INF    2 (this.is.a.cname:4050) connecting to nsqd
19:04:25 INF    2 (this.is.a.cname:4050) upgrading to TLS
19:04:26 WRN    1 [my-logs/nsq_to_nsq] backing off for 8.0000 seconds (backoff level 3), setting all to RDY 0
19:04:34 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:04:34 WRN    1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
19:06:30 ERR    2 (this.is.a.cname:4050) IO error - EOF
19:06:30 INF    2 (this.is.a.cname:4050) beginning close
19:06:30 INF    2 (this.is.a.cname:4050) readLoop exiting
19:06:30 INF    2 (this.is.a.cname:4050) breaking out of writeLoop
19:06:30 INF    2 (this.is.a.cname:4050) writeLoop exiting
19:06:30 ERR    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - EOF
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) beginning close
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) readLoop exiting
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) breaking out of writeLoop
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) writeLoop exiting
19:06:30 WRN    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:06:30 INF    2 (this.is.a.cname:4050) finished draining, cleanup exiting
19:06:30 INF    2 (this.is.a.cname:4050) clean close complete
19:06:30 INF    2 exiting router
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) finished draining, cleanup exiting
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) clean close complete
19:06:30 WRN    1 [my-logs/nsq_to_nsq] there are 0 connections left alive
19:06:30 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) re-connecting in 15 seconds...
19:06:45 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) connecting to nsqd
19:06:45 INF    1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) upgrading to TLS

After both occurrences, at the end where it says "upgrading to TLS", the producer side was also still alive. I fixed it by restarting nsq_to_nsq, which then spammed "finished 250" logs, i.e. it had been down for hours.

How to consume only new messages, regardless of old messages?

My server started a consumer to consume messages. I then stopped it for 3 days and started my server again; the old messages make no sense to me (because I use nsq for realtime notification). I just want to consume the new messages. How can I solve this without consuming on a new channel?
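
One application-level workaround that doesn't require a new channel is to record the process start time and immediately finish anything older: Message.Timestamp is nanoseconds since the Unix epoch. A sketch with placeholder topic/channel names; whether dropping old messages is acceptable is of course an application decision:

package main

import (
    "log"
    "time"

    "github.com/nsqio/go-nsq"
)

func main() {
    started := time.Now()

    cfg := nsq.NewConfig()
    consumer, err := nsq.NewConsumer("notifications", "realtime", cfg) // placeholder names
    if err != nil {
        log.Fatal(err)
    }
    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        // Message.Timestamp is nanoseconds since the Unix epoch; drop
        // anything produced before this consumer started.
        if time.Unix(0, m.Timestamp).Before(started) {
            return nil // returning nil FINs the stale message
        }
        log.Printf("new message: %s", m.Body)
        return nil
    }))
    if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
        log.Fatal(err)
    }
    <-consumer.StopChan
}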

nsqadmin shows no clients connected and 0 messages

Hi,

I have started 2 nsqd, 1nsqlookupd and 1 nsqadmin.
When I stop the 2 nsqd and the nsqlookupd and start them up again, nsqadmin does not seem to recognise (or for some other reason cannot see) that the other instances are back up.

It is the same when stopping the 2 nsqd, the nsqlookupd and the nsqadmin and starting them up again: nsqadmin does not seem to recognise that the other instances are back up.

I'm not sure what to do; does anyone know what this could be related to?

Note: the nsqd instances seem to work just fine and the whole system seems to be functional, but nsqadmin does not seem to notice and adapt to the new clients.

Thank you

Is there any reason to use go-simplejson rather than the built-in json package?

Hi, guys.
While reading the source in api_request.go, I found that it uses the "simplejson" package to unmarshal JSON in the method called apiRequestNegotiateV1.

Why not just define a struct and pass a pointer to it into json.Unmarshal?
I think that would improve the readability of the code and should not affect performance too much.
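
For illustration, this is the kind of change being suggested: a struct with json tags and encoding/json. The field names below are just an example of a lookupd-style response, not a claim about go-nsq's internals:

package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// lookupResp illustrates unmarshalling with the standard library instead of
// go-simplejson. The fields are an example of an nsqlookupd lookup response
// shape.
type lookupResp struct {
    Channels  []string `json:"channels"`
    Producers []struct {
        BroadcastAddress string `json:"broadcast_address"`
        TCPPort          int    `json:"tcp_port"`
    } `json:"producers"`
}

func main() {
    raw := []byte(`{"channels":["ch"],"producers":[{"broadcast_address":"10.0.0.1","tcp_port":4150}]}`)

    var resp lookupResp
    if err := json.Unmarshal(raw, &resp); err != nil {
        log.Fatal(err)
    }
    for _, p := range resp.Producers {
        fmt.Printf("%s:%d\n", p.BroadcastAddress, p.TCPPort)
    }
}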

Unexpected fault address fatal error

Hi,

I hadn't updated go-nsq for a long time, I must say, maybe 3 weeks, but I just installed my program on an Ubuntu server (so every dependency was checked out fresh), and I got this weird error that I had never seen before. As you can see, the error isn't very explicit.

I first thought it was coming from my code, and it still may be, but since I can't find why it would do that and go-nsq appears a lot in the trace, I prefer to post it here in the hope that someone has an idea where it comes from.

It seems to appear only when there is concurrent nsq access (producing several messages and receiving several messages at the same time).

unexpected fault address 0x7f9000000011
fatal error: fault
[signal 0xb code=0x1 addr=0x7f9000000011]

goroutine 2087 [running]:
    (stack frames unresolved; the runtime printed only ":0" for each frame)

goroutine 1 [select]:
main.main
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:178

goroutine 8 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 9 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 140 [select]:
craft.getDataFromDatas
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121

goroutine 13 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176

goroutine 14 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177

goroutine 15 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 16 [chan receive]:
qlib.handleInternal
    /home/ubuntu/go/src/github.com/testar-test/go-qlib/client.go:230
created by github.com_testar_test_go_qlib.run.pN38_github.com_testar_test_go_qlib.Client
    /home/ubuntu/go/src/github.com/testar-test/go-qlib/client.go:224

goroutine 17 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176

goroutine 18 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177

goroutine 19 [select]:
github.com_bitly_go_nsq.router.pN32_github.com_bitly_go_nsq.Producer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/producer.go:261
created by github.com_bitly_go_nsq.connect.pN32_github.com_bitly_go_nsq.Producer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/producer.go:241

goroutine 20 [IO wait]:
github.com_gocql_gocql.recv.pN27_github.com_gocql_gocql.Conn
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:262
github.com_gocql_gocql.serve.pN27_github.com_gocql_gocql.Conn
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:240
created by github.com_gocql_gocql.Connect
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:174

goroutine 22 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 23 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 28 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 29 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 30 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 34 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176

goroutine 35 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177

goroutine 36 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 37 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 38 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 42 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 43 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 44 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 48 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 49 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 50 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 54 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 132 [select]:
craft.getDataFromDatas
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121

goroutine 55 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 56 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 60 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 61 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 62 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 66 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
    /home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176

goroutine 67 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
    /home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177

goroutine 68 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 69 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178

goroutine 70 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048

goroutine 74 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
    /home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324

goroutine 75 [select]:
github.com_testar_test_mount_of_doom_craft.HandleNewMarkers
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:145
created by main.main
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:175

goroutine 76 [IO wait]:
github.com_gocql_gocql.recv.pN27_github.com_gocql_gocql.Conn
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:262
github.com_gocql_gocql.serve.pN27_github.com_gocql_gocql.Conn
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:240
created by github.com_gocql_gocql.Connect
    /home/ubuntu/go/src/github.com/gocql/gocql/conn.go:174

goroutine 151 [select]:
craft.getDataFromDatas
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121

goroutine 184 [select]:
craft.getDataFromDatas
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121

goroutine 2033 [select]:
github.com_AdRoll_goamz_s3.doHttpRequest.pN29_github.com_AdRoll_goamz_s3.S3
    /home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:1144
github.com_AdRoll_goamz_s3.run.pN29_github.com_AdRoll_goamz_s3.S3
    /home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:1180
github.com_AdRoll_goamz_s3.query.pN29_github.com_AdRoll_goamz_s3.S3
    /home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:931
github.com_AdRoll_goamz_s3.PutReader.pN33_github.com_AdRoll_goamz_s3.Bucket
    /home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:374
github.com_AdRoll_goamz_s3.Put.pN33_github.com_AdRoll_goamz_s3.Bucket
    /home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:334
craft.putMarkerOnS3
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:156
craft.downloadMarker
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:89
created by github.com_testar_test_mount_of_doom_craft.HandleNewMarkers
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:147

goroutine 2040 [select]:

goroutine 2039 [IO wait]:

goroutine 419 [select]:
craft.getDataFromDatas
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
    /home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121

The server closes the connection, but the client can't detect it and hangs

The go-nsq client is sometimes unstable. When the server side hits an I/O error and closes the connection, the go-nsq client will still wait there indefinitely.

A related log excerpt from nsqd (I modified the log a little, for debugging):

[nsqd] 2015/08/13 18:30:14.640803 [127.0.0.1:27884] state rdy: 2000 inflt: 1998
[nsqd] 2015/08/13 18:30:14.640902 PROTOCOL(V2): writing msg(08c483c906801002) to client(127.0.0.1:27884) - {"type":"OutgoingMessageWrapper","track_id":"50217923020965941","enterprise_code":"yto_dev","employee_code":"00079738","from":"router.2","send_at":1439461814,"data_type":"WsOutMessage","data_inner_type":"notify.wait","data":"eyJpZCI6IjUwMjE3OTIzMDIwOTY1OTQxIiwidHlwZSI6Im5vdGlmeS53YWl0IiwiZGF0YSI6eyJjb3VudCI6MCwibm9kZV9jb2RlIjoiIn19","subscriber_id":0}
[nsqd] 2015/08/13 18:30:14.640944 [127.0.0.1:27884] state rdy: 2000 inflt: 1999
[nsqd] 2015/08/13 18:30:14.643325 PROTOCOL(V2): writing msg(08c483c907401000) to client(127.0.0.1:27884) - {"type":"OutgoingMessageWrapper","track_id":"50217923037415477","enterprise_code":"yto_dev","employee_code":"00079738","from":"router.1","send_at":1439461814,"data_type":"WsOutMessage","data_inner_type":"notify.servicecount","data":"eyJpZCI6IjUwMjE3OTIzMDM3NDE1NDc3IiwidHlwZSI6Im5vdGlmeS5zZXJ2aWNlY291bnQiLCJkYXRhIjp7ImNvdW50IjowfX0=","subscriber_id":0}
[nsqd] 2015/08/13 18:30:14.643358 [127.0.0.1:27884] state rdy: 2000 inflt: 2000
[nsqd] 2015/08/13 18:30:14.820049 [127.0.0.1:27884] state rdy: 2000 inflt: 2000
[nsqd] 2015/08/13 18:30:14.867511 PROTOCOL(V2): [127.0.0.1:27884] exiting ioloop
[nsqd] 2015/08/13 18:30:14.867732 ERROR: client(127.0.0.1:27884) - failed to read command - read tcp 127.0.0.1:27884: i/o timeout
[nsqd] 2015/08/13 18:30:14.867778 PROTOCOL(V2): [127.0.0.1:27884] exiting messagePump

A related log excerpt from go-nsq (I modified the log a little, for debugging):

{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (kefu-dev.hotpu.cn:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) on msgResponseChan chan"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) FIN 08c483c630001004"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e INF    4 [connector.1/default] (mysite:4150) WriteCommand2 FIN 08c483c630001004..."}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) on msgResponseChan chan"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) FIN 08c483c630801000"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e INF    4 [connector.1/default] (mysite:4150) WriteCommand2 FIN 08c483c630801000..."}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) skip sending RDY 2000 (675 remain out of last RDY 2000)"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG    4 [connector.1/default] (mysite:4150) skip sending RDY 2000 (674 remain out of last RDY 2000)"}

I find that in many cases go-nsq can detect that there is an I/O error and reconnect, but in a few cases it hangs indefinitely like the logs above show.

How can I make the go-nsq client detect the I/O error early and trigger a reconnect?

And by the way, here is the config I am using:

if err := nsqConfig.Set("dial_timeout", "5s"); err != nil {
    return err
}
if err := nsqConfig.Set("read_timeout", "2s"); err != nil {
    return err
}
if err := nsqConfig.Set("heartbeat_interval", "1s"); err != nil {
    return err
}
if err := nsqConfig.Set("write_timeout", "1s"); err != nil {
    return err
}
if err := nsqConfig.Set("lookupd_poll_interval", "2s"); err != nil {
    return err
}
if err := nsqConfig.Set("deflate", "true"); err != nil {
    return err
}
if err := nsqConfig.Set("output_buffer_timeout", "250ms"); err != nil {
    return err
}
if err := nsqConfig.Set("max_in_flight", 2000); err != nil {
    return err
}
if err := nsqConfig.Set("msg_timeout", "2s"); err != nil {
    return err
}

producer: use "clean close"

Right now it's just closing the connection, causing spurious errors in nsqd logs (see nsqio/nsq#521)

It should send a CLS command and wait for the appropriate response before closing the connection, like Consumer already does.

cc @tj

Add writer-opt to write TLS

Currently, there are reader-opt tls options, but not writer opts.

Adding writer-opt tls would enable nsqd->nsq_to_nsq->nsqd TLS communication, avoiding the http overhead of nsqd->nsq_to_http->nsqd.

producer: timeout issue

I am running into a timeout issue with producers.

2015/02/15 18:38:48 INF    1 (x.x.x.x:14150) connecting to nsqd
2015/02/15 18:38:49 ERR    1 (x.x.x.x:14150) error connecting to nsqd - dial tcp x.x.x.x:14150: i/o timeout

Let me try and explain my setup:

  1. I am running an NSQ cluster on Google Compute Engine using Container Engine. (The configuration is about the same as this setup: https://gist.github.com/smreed/e93fd74765caef99266a. The only thing I have changed is that I set up load balancers for the nsq daemons.)

  2. I wrote a helper to handle my producer logic ( See: https://gist.github.com/mickelsonm/8ab51cf6171f6aa3100d)

  3. I set the environment variable to the load balancer:

NSQ_DAEMON_HOSTS= x.x.x.x:14150

reduce logging when exiting writeLoop

I'm investigating the broken pipe issue, but the logs quickly fill up with thousands of these lines while it slowly waits for the remaining work. Is it possible to lower the log interval? Maybe just tick every second and/or when the in-flight count is reduced.

red-files 2014/10/30 15:52:10 ERR    1 [red-files/red-extract] (:5001) IO error - write tcp 127.0.0.1:5001: broken pipe
red-files 2014/10/30 15:52:10 ERR    1 [red-files/red-extract] (:5001) error sending command FIN 07526046660c5002 - write tcp 127.0.0.1:5001: broken pipe
red-files 2014/10/30 15:52:10 INF    1 [red-files/red-extract] (:5001) beginning close
red-files 2014/10/30 15:52:10 INF    1 [red-files/red-extract] (:5001) breaking out of writeLoop
red-files 2014/10/30 15:52:10 INF    1 [red-files/red-extract] (:5001) writeLoop exiting
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
2014/10/30 15:52:11 stats:
2014/10/30 15:52:11   events.published 2419.11/s tick=24,192 total=985,360
2014/10/30 15:52:11   files.processed 0.20/s tick=2 total=2,142
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) delaying close, 30 outstanding messages
red-files 2014/10/30 15:52:11 INF    1 [red-files/red-extract] (:5001) readLoop exiting
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:11 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:13 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:13 WRN    1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight

producer: connection backoff

In #95 we documented a case where nsq_to_nsq (which uses both Consumer and Producer) would panic due to a bug in the exit timeout code path.

The bug is exacerbated by the fact that in rare cases the producer could block serially for 1s per PUB if the remote address is black holing the connection attempts.

An alternative strategy might be to back off connection attempts and return errors instantly during backoff windows.

Thoughts?

too many open files

I ran into an issue where publishing messages to NSQ would eventually result in too many open files. I had code in a function that would publish a message like so:

func enqueue(queue string, message []byte) error {
  producer, err := nsq.NewProducer("...", config)
  if err != nil {
     return err
  }
  defer producer.Stop()
  err = producer.Publish(queue, message)
  return err
}

Every time I called that function, a new file descriptor (type = sock when using lsof to monitor things) was created that would not go away.

I moved the creation of the producer outside the function, so the same instance is used each time a message is published, which has "fixed" the issue. My question: I thought defer producer.Stop() would clean up and thus not cause too many open files. Thoughts?
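
For reference, this is a sketch of the pattern that fixed it: one long-lived producer shared by every publish, stopped once at process shutdown (address and topic are placeholders):

package main

import (
    "log"

    "github.com/nsqio/go-nsq"
)

var producer *nsq.Producer

func enqueue(topic string, message []byte) error {
    // Reuse the single long-lived producer (and its TCP connection)
    // instead of dialing a new one per publish.
    return producer.Publish(topic, message)
}

func main() {
    cfg := nsq.NewConfig()
    var err error
    producer, err = nsq.NewProducer("127.0.0.1:4150", cfg) // placeholder address
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Stop() // one clean shutdown for the whole process

    if err := enqueue("events", []byte("hello")); err != nil {
        log.Fatal(err)
    }
}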

consumer: access to message channel

I didn't realize the message handler FINs / REQs for you, but it feels a little awkward for async stuff. I have to spawn a goroutine in there and then have a channel to feed errors back. For example, this is what I originally had:

func (w *Worker) HandleMessage(msg *nsq.Message) error {
    go func() {
        _, err := w.Write(msg.Body)

        if err != nil {
            log.Printf("Error: %s (REQ %s)", err, msg.ID)
            msg.Requeue(1 * time.Minute)
            return
        }

        msg.Finish()
    }()

    return nil
}

Then I realized I was getting way too many messages haha. It would be awesome to have access to the message channel (if it makes sense internally, not sure). Delegating FIN / REQ is clearer, to me at least, than tweaking REQ values at the config level and doing this handle-message stuff.

What do you guys think?

Oddly I wasn't getting any errors for double-FINs :s not sure what's up there
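
For what it's worth, the existing API does support the async style: calling msg.DisableAutoResponse() before handing the message to a goroutine stops the nil return from finishing it, and the goroutine stays responsible for Finish/Requeue. That also explains the "way too many messages" symptom: the immediate nil return was FINing each message right away, so nothing throttled delivery. A sketch of the same HandleMessage with that one change (same Worker type as above):

func (w *Worker) HandleMessage(msg *nsq.Message) error {
    // Take responsibility for FIN/REQ away from go-nsq so the immediate
    // nil return below doesn't finish the message out from under the
    // goroutine that is still working on it.
    msg.DisableAutoResponse()

    go func() {
        if _, err := w.Write(msg.Body); err != nil {
            log.Printf("Error: %s (REQ %s)", err, msg.ID)
            msg.Requeue(1 * time.Minute)
            return
        }
        msg.Finish()
    }()

    return nil
}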

opaque Config defaults

It's not obvious from the godoc documentation what the default config values are, or what type some of them are, and there is no way to introspect them at runtime. Both seem like they should exist.
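
One small mitigation for the runtime-introspection part: the defaults are filled in by NewConfig(), so dumping a freshly created config shows the effective values. A trivial sketch:

package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

func main() {
    // NewConfig() fills in the library defaults; printing the struct
    // shows the effective values.
    cfg := nsq.NewConfig()
    fmt.Printf("%+v\n", cfg)
}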

consumer: deadlock after disconnection

I use go-nsq to consume messages. HandleMessage returns nil or an error as needed. I am not sure where the problem is; please help.

These are my steps:
1: set MaxAttempts = 2; ConnectToNSQLookupds is given (127.0.0.2:4161 and 127.0.0.1:4161); 127.0.0.1 is a real nsqlookupd, 127.0.0.2 is nothing
2: produce 1 msg; HandleMessage returns nil; everything is ok
3: produce 1 msg; HandleMessage returns an error; everything is ok
4: after MaxAttempts, I kill nsqlookupd and nsqd, then restart nsqlookupd and nsqd
5: produce 1 msg; the consumer cannot get any messages. I check the admin web UI and the messages are saved in nsq.

If the handler returns nil, everything is ok.

This is the debug log:

2015/04/07 18:02:22 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:02:22 DBG    1 [test/ch] starting Handler
2015/04/07 18:02:22 INF    1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 0)
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e000
2015/04/07 18:02:47 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:47 ERR    1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:02:47 DBG    1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:02:47 WRN    1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:02:49 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:02:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:03:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:17 ERR    1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:04:17 DBG    1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:04:17 WRN    1 [test/ch] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
2015/04/07 18:04:21 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:04:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:04:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:05:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:06:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:17 WRN    1 [test/ch] msg 081fb125edf2e001 attempted 3 times, giving up
2015/04/07 18:07:17 DBG    1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e001
2015/04/07 18:07:17 WRN    1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:07:19 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:07:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:07:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:08:27 ERR    1 [test/ch] (zhangdl-pc:4150) IO error - EOF 
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) beginning close
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) readLoop exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) breaking out of writeLoop
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) writeLoop exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) finished draining, cleanup exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) clean close complete
2015/04/07 18:08:27 WRN    1 [test/ch] there are 0 connections left alive
2015/04/07 18:08:27 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:08:27 ERR    1 [test/ch] error querying nsqlookupd (http://127.0.0.2:4161/lookup?topic=test) - Get http://127.0.0.2:4161/lookup?topic=test: dial tcp 127.0.0.2:4161: connection refused 
2015/04/07 18:09:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:09:25 INF    1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:09:25 DBG    1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:09:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:10:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:11:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:12:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:13:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:14:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:15:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:15:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received

This is my code:

package main

import (
    "bytes"
    "config"
    "errors"
    "flag"
    "fmt"
    "github.com/bitly/go-nsq"
    "github.com/bitly/go-simplejson"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "plog"
    "strings"
    //"time"
)

type MyTestHandler struct {
    q        *nsq.Consumer
    tsdbAddr string
}

//handle msg
func (h *MyTestHandler) HandleMessage(message *nsq.Message) error {
    //send to opentsdb
    httpclient := &http.Client{}
    a := message.Body
    req, _ := http.NewRequest("POST", h.tsdbAddr, bytes.NewBuffer(a))
    response, err := httpclient.Do(req)
    if err != nil {
        plog.Errorf(" http_to_tsdb_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
        return err
    }
    defer response.Body.Close()

    //check tsdb resp
    code := response.StatusCode
    if code != 204 && code != 200 {
        body, _ := ioutil.ReadAll(response.Body)
        respdata, err := simplejson.NewJson(body)
        if err != nil {
            plog.Errorf(" tsdb_resp_json_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
            return err
        }
        respMessage, _ := respdata.Get("error").Get("message").String()
        respDetails, _ := respdata.Get("error").Get("details").String()
        err = errors.New(fmt.Sprintf("tsdb retrun err code: %d", code))
        plog.Errorf(" tsdb_resp_code_err, MsgId: %s, TsAddr: %s, Attempts %d, code: %d, message: %s, details: %s", message.ID, h.tsdbAddr, message.Attempts, code, respMessage, strings.Replace(respDetails, "\n", " ", -1))
        return err
    }

    plog.Infof(" success_to_tsdb, MsgId: %s, TsAddr: %s, Attempts %d", message.ID, h.tsdbAddr, message.Attempts)
    return nil
}

func main() {
    //get config
    var inputConfigFile = flag.String("c", "./xx.conf", "nsqlookupd ip and port")
    flag.Parse()
    proxyConfig, err := config.New(*inputConfigFile)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(proxyConfig)
    fmt.Println("NsqlookupAddr: ", proxyConfig.NsqlookupAddr)
    fmt.Println("NsqTopic: ", proxyConfig.NsqTopic)
    fmt.Println("NsqChan: ", proxyConfig.NsqChan)
    fmt.Println("TsdbAddr: ", proxyConfig.TsdbAddr)
    fmt.Println("AccessLog: ", proxyConfig.AccessLog)
    fmt.Println("ErrorLog: ", proxyConfig.ErrorLog)
    fmt.Println("GoNsqLibLog: ", proxyConfig.GoNsqLibLog)
    fmt.Println("HandlerMaxRetry: ", proxyConfig.MaxAttempts)
    fmt.Println("MaxFlight: ", proxyConfig.MaxFlight)
    fmt.Println("HeartbeatInterval: ", proxyConfig.HeartbeatInterval)
    fmt.Println("ReadTimeout: ", proxyConfig.ReadTimeout)
    fmt.Println("LookupdPollInterval: ", proxyConfig.LookupdPollInterval)
    fmt.Println("RequeueFlag: ", proxyConfig.RequeueFlag)

    //open log
    plog.StartLog(proxyConfig.AccessLog, proxyConfig.ErrorLog)

    //check config
    if proxyConfig.MaxAttempts <= 0 {
        fmt.Printf("err: HandlerMaxRetry: %d <= 0", proxyConfig.MaxAttempts)
        return
    }
    if proxyConfig.MaxFlight <= 0 {
        fmt.Printf("err: MaxFlight: %d <= 0", proxyConfig.MaxFlight)
        return
    }
    if proxyConfig.ReadTimeout < proxyConfig.HeartbeatInterval {
        fmt.Printf("err: ReadTimeout %d < HeartbeatInterval %d", proxyConfig.ReadTimeout, proxyConfig.HeartbeatInterval)
        return
    }
    if proxyConfig.LookupdPollInterval < proxyConfig.HeartbeatInterval {
        fmt.Printf("err: LookupdPollInterval %d < HeartbeatInterval %d", proxyConfig.LookupdPollInterval, proxyConfig.HeartbeatInterval)
        return
    }

    //set config
    config := nsq.NewConfig()
    config.MaxInFlight = int(proxyConfig.MaxFlight)
    config.MaxAttempts = uint16(proxyConfig.MaxAttempts)
    //config.HeartbeatInterval = time.Duration(proxyConfig.HeartbeatInterval * 1000000)
    //config.ReadTimeout = time.Duration(proxyConfig.ReadTimeout * 1000000)
    //config.LookupdPollInterval = time.Duration(proxyConfig.LookupdPollInterval * 1000000)

    //consumer
    q, _ := nsq.NewConsumer(proxyConfig.NsqTopic, proxyConfig.NsqChan, config)
    //set nsq lib log
    nsqLibLog, err_log := os.OpenFile(proxyConfig.GoNsqLibLog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err_log != nil {
        fmt.Println(err_log)
        return
    }
    q.SetLogger(log.New(nsqLibLog, "", log.Ldate|log.Ltime), 0)
    fmt.Println(config)
    //start
    h := &MyTestHandler{
        q:        q,
        tsdbAddr: "http://" + proxyConfig.TsdbAddr + "/api/put",
    }
    q.AddHandler(h)
    err = q.ConnectToNSQLookupds(proxyConfig.NsqlookupAddr)
    if err != nil {
        plog.Error("ConnectToNSQLookupd err:", err)
        os.Exit(-1)
    }

    <-q.StopChan
}

LICENSE file

I'd like to swipe some code, but this repository lacks a license file.

Performance Question

Hey there,
I'm using this code to make a producer that sends messages to NSQ. I'm using
PublishAsync(), with a "responder" that basically does nothing:

var respChan chan *nsq.ProducerTransaction

func responder(respChan chan *nsq.ProducerTransaction) {
    for _ = range respChan {
        //Get rid of the messages or something... should probably respond, but not sure
        //how to do that yet...
    }
}

I'm sending as many messages as possible, calling PublishAsync() in a loop reading from memory. My code maxes out the CPU and manages about 14,000 messages per second (the messages are quite small: a msgpack'd struct containing a < 16-char string).

In doing a code profile, my code apparently spends 50.5% of its time in "ExternalCode" (no idea what that is).

I've attached my CPU profile: [screenshot "cpu copy"]

Any idea what is going on here?
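
If the bottleneck is per-message overhead (protocol framing, channel hops, argument boxing) rather than the network, batching with MultiPublishAsync usually helps, since it sends a single MPUB command per batch. A rough sketch of the publishing loop, where topic, batchSize and the source channel are placeholder assumptions and respChan is the responder channel from the snippet above:

batch := make([][]byte, 0, batchSize)
for body := range source {
    batch = append(batch, body)
    if len(batch) == batchSize {
        // one MPUB for the whole batch instead of one PUB per message
        if err := producer.MultiPublishAsync(topic, batch, respChan); err != nil {
            log.Fatal(err)
        }
        batch = make([][]byte, 0, batchSize)
    }
}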

panic: runtime error: send on closed channel

panic: runtime error: send on closed channel

goroutine 10818 [running]:
runtime.panic(0xd11480, 0x15748fe)
    /usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/bitly/go-nsq.(*Consumer).onConnMessage(0xc208688000, 0xc209069200, 0xc208bba000)
    /root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/consumer.go:632 +0x90
github.com/bitly/go-nsq.(*consumerConnDelegate).OnMessage(0xc208fa45e0, 0xc209069200, 0xc208bba000)
    /root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/delegates.go:111 +0x3e
github.com/bitly/go-nsq.(*Conn).readLoop(0xc209069200)
    /root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/conn.go:518 +0xb9b
created by github.com/bitly/go-nsq.(*Conn).Connect
    /root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/conn.go:181 +0x6af

Could this be an issue in go-nsq?

high cpu+mem usage in r.log calls in redistributeRDY

just noticed this while profiling a production app.

in cpu profile:

(pprof) top30 -cum
5.43s of 34.68s total (15.66%)
Dropped 447 nodes (cum <= 0.17s)
Showing top 30 nodes out of 197 (cum >= 2.47s)
      flat  flat%   sum%        cum   cum%
     0.23s  0.66%  0.66%     27.29s 78.69%  runtime.aeshashbody
         0     0%  0.66%     12.52s 36.10%  github.com/nsqio/go-nsq.(*Consumer).redistributeRDY
     0.15s  0.43%  1.10%     12.27s 35.38%  main.(*Handler).HandleMessage
     0.12s  0.35%  1.44%      7.35s 21.19%  fmt.(*buffer).WriteRune
     0.11s  0.32%  1.76%      6.14s 17.70%  type..eq.struct { lock runtime.mutex; lockOwner *runtime.g; enabled bool; shutdown bool; headerWritten bool; footerWritten bool; shutdownSema uint32; seqStart uint64; ticksStart int64; ticksEnd int64; timeStart int64; timeEnd int64; reading *runtime.traceBuf; empty *runtime.traceBuf; fullHead *runtime.traceBuf; fullTail *runtime.traceBuf; reader *runtime.g; stackTab runtime.traceStackTable; bufLock runtime.mutex; buf *runtime.traceBuf }
     0.03s 0.087%  1.85%         6s 17.30%  runtime.schedule
     0.22s  0.63%  2.48%      5.98s 17.24%  github.com/raintank/raintank-metric/metricdef.GetMetrics
     0.08s  0.23%  2.71%      5.81s 16.75%  runtime.injectglist
     0.54s  1.56%  4.27%      5.73s 16.52%  fmt.(*pp).doPrintf
     0.13s  0.37%  4.64%      4.92s 14.19%  type..hash.[15]runtime.dbgVar
     0.05s  0.14%  4.79%      4.78s 13.78%  main.(*cassandraStore).processWriteQueue
     0.40s  1.15%  5.94%      4.68s 13.49%  main.(*AggMetric).Add
     0.05s  0.14%  6.08%      4.52s 13.03%  github.com/grafana/grafana/pkg/log.(*FileLogWriter).DoRotate
     0.03s 0.087%  6.17%      4.47s 12.89%  github.com/grafana/grafana/pkg/log.(*Logger).StartLogger
     0.51s  1.47%  7.64%      4.13s 11.91%  runtime.findrunnable
     0.23s  0.66%  8.30%      4.08s 11.76%  fmt.(*pp).handleMethods
     0.07s   0.2%  8.51%      3.70s 10.67%  main.(*DefCache).Add
     1.70s  4.90% 13.41%      3.18s  9.17%  runtime.mallocgc
         0     0% 13.41%      3.01s  8.68%  main.(*cassandraStore).insertChunk
         0     0% 13.41%      2.83s  8.16%  net/http.(*ServeMux).Handle
         0     0% 13.41%      2.80s  8.07%  net/http.(*conn).serve
         0     0% 13.41%      2.76s  7.96%  net/http.(*ServeMux).match
     0.02s 0.058% 13.47%      2.75s  7.93%  main.Get
         0     0% 13.47%      2.75s  7.93%  main.get.func1
     0.02s 0.058% 13.52%      2.56s  7.38%  type..eq.github.com/gocql/gocql.resultSchemaChangeFrame
     0.04s  0.12% 13.64%      2.53s  7.30%  runtime.clearCheckmarks
     0.03s 0.087% 13.73%      2.53s  7.30%  runtime.gcmarknewobject_m
         0     0% 13.73%      2.50s  7.21%  github.com/gocql/gocql.(*resultSchemaChangeFrame).Header
         0     0% 13.73%      2.49s  7.18%  runtime.finishsweep_m
     0.67s  1.93% 15.66%      2.47s  7.12%  runtime.heapBitsSweepSpan
(pprof) 
(pprof) list buteRDY
Total: 34.68s
ROUTINE ======================== github.com/nsqio/go-nsq.(*Consumer).redistributeRDY in /home/dieter/go/src/github.com/nsqio/go-nsq/consumer.go
         0     12.52s (flat, cum) 36.10% of Total
         .          .    994:   possibleConns := make([]*Conn, 0, len(conns))
         .          .    995:   for _, c := range conns {
         .          .    996:       lastMsgDuration := time.Now().Sub(c.LastMessageTime())
         .          .    997:       rdyCount := c.RDY()
         .          .    998:       r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
         .       10ms    999:           c.String(), rdyCount, lastMsgDuration)
         .          .   1000:       if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
         .          .   1001:           r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
         .          .   1002:           r.updateRDY(c, 0)
         .          .   1003:       }
         .          .   1004:       possibleConns = append(possibleConns, c)
         .          .   1005:   }
         .          .   1006:
         .          .   1007:   availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&r.totalRdyCount)
         .          .   1008:   if r.inBackoff() {
         .          .   1009:       availableMaxInFlight = 1 - atomic.LoadInt64(&r.totalRdyCount)
         .          .   1010:   }
         .          .   1011:
         .          .   1012:   for len(possibleConns) > 0 && availableMaxInFlight > 0 {
         .          .   1013:       availableMaxInFlight--
         .          .   1014:       r.rngMtx.Lock()
         .          .   1015:       i := r.rng.Int() % len(possibleConns)
         .          .   1016:       r.rngMtx.Unlock()
         .          .   1017:       c := possibleConns[i]
         .          .   1018:       // delete
         .          .   1019:       possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
         .     12.51s   1020:       r.log(LogLevelDebug, "(%s) redistributing RDY", c.String())
         .          .   1021:       r.updateRDY(c, 1)
         .          .   1022:   }
         .          .   1023:}
         .          .   1024:
         .          .   1025:// Stop will initiate a graceful stop of the Consumer (permanent)
(pprof) 

memory profile:

(pprof) top30 -cum
2856.55MB of 3014.07MB total (94.77%)
Dropped 347 nodes (cum <= 15.07MB)
Showing top 30 nodes out of 50 (cum >= 61.35MB)
      flat  flat%   sum%        cum   cum%
         0     0%     0%  3013.07MB   100%  runtime.aeshashbody
         0     0%     0%  2602.61MB 86.35%  github.com/nsqio/go-nsq.(*Consumer).redistributeRDY
...
(pprof) list redistributeRDY
Total: 2.94GB
ROUTINE ======================== github.com/nsqio/go-nsq.(*Consumer).redistributeRDY in /home/dieter/go/src/github.com/nsqio/go-nsq/consumer.go
         0     2.54GB (flat, cum) 86.35% of Total
         .          .    994:   possibleConns := make([]*Conn, 0, len(conns))
         .          .    995:   for _, c := range conns {
         .          .    996:       lastMsgDuration := time.Now().Sub(c.LastMessageTime())
         .          .    997:       rdyCount := c.RDY()
         .          .    998:       r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
         .          .    999:           c.String(), rdyCount, lastMsgDuration)
         .          .   1000:       if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
         .          .   1001:           r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
         .          .   1002:           r.updateRDY(c, 0)
         .          .   1003:       }
         .          .   1004:       possibleConns = append(possibleConns, c)
         .          .   1005:   }
         .          .   1006:
         .          .   1007:   availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&r.totalRdyCount)
         .          .   1008:   if r.inBackoff() {
         .          .   1009:       availableMaxInFlight = 1 - atomic.LoadInt64(&r.totalRdyCount)
         .          .   1010:   }
         .          .   1011:
         .          .   1012:   for len(possibleConns) > 0 && availableMaxInFlight > 0 {
         .          .   1013:       availableMaxInFlight--
         .          .   1014:       r.rngMtx.Lock()
         .          .   1015:       i := r.rng.Int() % len(possibleConns)
         .          .   1016:       r.rngMtx.Unlock()
         .          .   1017:       c := possibleConns[i]
         .          .   1018:       // delete
         .          .   1019:       possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
         .     2.54GB   1020:       r.log(LogLevelDebug, "(%s) redistributing RDY", c.String())
         .          .   1021:       r.updateRDY(c, 1)
         .          .   1022:   }
         .          .   1023:}
         .          .   1024:
         .          .   1025:// Stop will initiate a graceful stop of the Consumer (permanent)
(pprof) 
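
For what it's worth, the cost lands here because the arguments are built before the log level is ever consulted: c.String() and the interface{} boxing happen on every pass through the loop even when debug output is disabled. A hedged caller-side sketch of one mitigation; the shouldLog helper is hypothetical, not part of the exported API:

// hypothetical guard: only build the debug arguments when debug logging is on,
// so c.String() and the argument boxing disappear from the hot loop
if r.shouldLog(LogLevelDebug) {
    r.log(LogLevelDebug, "(%s) redistributing RDY", c.String())
}
r.updateRDY(c, 1)

Simply raising the level passed to SetLogger does not avoid this, since the arguments are still evaluated at the call site.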

consumer: trigger backoff via E_FIN_FAILED

A consumer can become overwhelmed with messages such that, by the time it completes them, they have already timed out server-side. In this case the client thinks everything is OK, keeps FIN'ing messages and receiving more, and may never be able to work off its backlog (the situation is worse the higher the max-in-flight used).

go-nsq could use the presence of E_FIN_FAILED to trigger backoff, just like a handler failure does. That would trigger RDY changes so that message flow drops off and the consumer can recover.
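
Until the client grows that behavior, the usual mitigations are consumer-side: lower max-in-flight and/or raise the per-message timeout negotiated at IDENTIFY so completions land before nsqd gives up on them. A sketch with placeholder values:

// placeholder values; tune to the real handler latency and throughput
cfg := nsq.NewConfig()
cfg.MaxInFlight = 50             // fewer messages outstanding per consumer
cfg.MsgTimeout = 5 * time.Minute // ask nsqd for a longer server-side msg_timeout
consumer, err := nsq.NewConsumer("some_topic", "some_channel", cfg)
if err != nil {
    log.Fatal(err)
}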
