nsqio / go-nsq — The official Go package for NSQ
License: MIT License
I use the following to start a node:
./nsqlookupd
./nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=127.0.0.1:4150
and then I get this log:
2014/07/29 20:08:57 TCP: new client(127.0.0.1:62823)
2014/07/29 20:08:57 CLIENT(127.0.0.1:62823): desired protocol magic ' V1'
2014/07/29 20:08:57 CLIENT(127.0.0.1:62823): IDENTIFY Address:Nicol.local TCP:4150 HTTP:4151 Version:0.2.29
The address is not [127.0.0.1] but [Nicol.local].
So, when I use the go-nsq package as a consumer, I get an error:
2014/07/29 20:02:16 INF 1 [crmmsg/msg#ephemeral]
querying nsqlookupd http://127.0.0.1:4161/lookup?topic=crmmsg
2014/07/29 20:02:16 INF 1 [crmmsg/msg#ephemeral] starting Handler
2014/07/29 20:02:16 INF 1 crmmsg/msg#ephemeral connecting to nsqd
2014/07/29 20:02:16 ERR 1 crmmsg/msg#ephemeral error connecting to nsqd - dial tcp: lookup Nicol.local: no such host
I read the code, in conn.go, line 130, in the Connect() method:
conn, err := net.DialTimeout("tcp", c.addr, time.Second)
I think this line errors because of the wrong address.
Is this a bug? How can I resolve it?
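For context: nsqd advertises the machine hostname (here Nicol.local) to nsqlookupd by default, and nsqd's --broadcast-address flag overrides what it registers, so consumers get back a resolvable address. A minimal sketch for local development, reusing the flag values from the commands above:

```shell
./nsqlookupd
# --broadcast-address overrides the hostname nsqd registers with lookupd,
# so consumers resolve 127.0.0.1 instead of the unresolvable Nicol.local
./nsqd --lookupd-tcp-address=127.0.0.1:4160 \
       --tcp-address=127.0.0.1:4150 \
       --broadcast-address=127.0.0.1
```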
How to connect to multiple NSQ servers (distributed servers) and produce messages
A few days ago, due to a provisioning error, I accidentally added a bad lookupd to the list. My app eventually queried a working lookupd and began processing messages, but it took about a minute (which is what the poll interval is set to). I know there is a balance between finding new nodes and not overloading the lookupds, but would it make sense to try all of the lookupds on startup until one returns success? I'm happy to submit a pull request if that's the case.
maybe?
2014/08/05 05:20:31 INF 1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF 1 [red-events/red-buffer] starting Handler
2014/08/05 05:20:31 INF 1 [red-events/red-buffer] starting Handler
.... (the same line repeated dozens of times)
It gets a little crazy haha :D
Is there a suggested way to report on discards? Or should I just check manually in my handler? Maybe an optional HandleDiscard function would be cool.
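For the "manually check in your handler" option: nsq.Message exposes an Attempts delivery counter, so a handler can report a message on its final attempt before it would be discarded. A minimal self-contained sketch; the local message struct and maxAttempts constant stand in for nsq.Message and the value configured via nsq.Config.MaxAttempts:

```go
package main

import "fmt"

// message mirrors the fields of nsq.Message relevant here;
// Attempts is a per-message delivery counter maintained by nsqd.
type message struct {
	ID       string
	Attempts uint16
	Body     []byte
}

// maxAttempts would mirror the consumer's configured MaxAttempts
// (hypothetical value for this sketch).
const maxAttempts = 5

// handle reports and drops messages on their final delivery attempt,
// approximating an optional HandleDiscard hook inside the handler itself.
func handle(m *message) error {
	if m.Attempts >= maxAttempts {
		fmt.Printf("discarding %s after %d attempts\n", m.ID, m.Attempts)
		return nil // returning nil FINs the message instead of requeueing it
	}
	// ... normal processing ...
	return nil
}

func main() {
	handle(&message{ID: "abc", Attempts: 5})
}
```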
The go-nsq README.md indicates:
The latest stable release is 1.0.4
NOTE: The public API has been refactored as of v1.0.0 and is not backwards compatible with previous releases. 0.3.7 is the last stable release compatible with the legacy API. Please read the UPGRADING guide.
Q1 - With respect to this (0.3.7 and 1.0.4) versioning scheme for go-nsq, are these versions specific to go-nsq and independent of the nsq version?
Q2 - Does the go-nsq version with the refactored 1.0.4 API require a particular matched version of nsq to run against? Do I need to be running against a particular minimum version of nsq?
Q3 - Are the HEADs of the git repos for nsq and go-nsq in sync with each other? E.g., if I submit a PR against one, should I be checking it against the latest code of the other?
Thank you.
Jason
Consumer can't receive any messages after running for a period of time, and the duration is not deterministic.
The code is at https://github.com/dlutxx/hiapns
I need help.
Thank you.
See: /consumer.go#L21
According to https://code.google.com/p/go/issues/detail?id=3611, rand.Rand is not goroutine-safe.
Each xxxxloop() function needs its own random number generator.
It seems that the default RDY for the consumer connection is always 1, in spite of the concurrency set by the AddConcurrentHandlers method. Consequently, messages can only be sent to one handler goroutine at a time; the other goroutines have to wait until FIN is sent for the in-flight message.
To resolve this, you have to manually call the ChangeMaxInFlight method; then all of the handler goroutines can receive messages (if they're in the queue) at the same time.
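A sketch of the workaround described above, against the go-nsq API (topic/channel names and the count of 10 are placeholders; fragment, not runnable standalone):

```go
// Either size MaxInFlight up front so RDY can cover all handlers...
cfg := nsq.NewConfig()
cfg.MaxInFlight = 10 // match the count passed to AddConcurrentHandlers

consumer, err := nsq.NewConsumer("topic", "channel", cfg)
if err != nil {
	log.Fatal(err)
}
consumer.AddConcurrentHandlers(&myHandler{}, 10)

// ...or adjust it at runtime:
consumer.ChangeMaxInFlight(10)
```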
Looks like the producer is thread-safe, but adding a quick note about its thread-safety properties would be sweet! Would save some digging.
cheers
While I can make my messaging layer built on top of go-nsq behave consistently w.r.t. my consumer/producer stop methods, it would be nice if go-nsq's consumer/producer stop methods behaved similarly. Is there a particular reason that Consumer.Stop() is asynchronous and Producer.Stop() is synchronous?
Would a PR making them behave similarly be welcome? I'm honestly not sure which I prefer.
Hi, I was wondering what the use case would be for returning an error from nsq.HandlerFunc?
https://github.com/nsqio/go-nsq/blob/master/consumer.go#L38
It deviates slightly from the signature of http.HandlerFunc:
https://golang.org/src/net/http/server.go?s=41267:41314#L1408
I'm adding some middlewares to decorate the final message handler with some additional functionality. It works fine, but it feels a bit weird having an unused error in the handler. Should I be using this error for a certain use case?
func metricsHandler(next nsq.Handler) nsq.Handler {
	return nsq.HandlerFunc(func(msg *nsq.Message) error {
		fmt.Println("Push to DataDog, Librato, etc")
		// propagate the wrapped handler's error so requeue/backoff still work
		return next.HandleMessage(msg)
	})
}

func finalHandler(msg *nsq.Message) error {
	fmt.Println(string(msg.Body))
	return nil // deviates from http.Handler signature
}

func main() {
	// ...
	var h nsq.Handler
	h = nsq.HandlerFunc(finalHandler)
	h = metricsHandler(h)
	q.AddHandler(h)
	// ...
}
I want to publish and subscribe to nsq using nsqlookupd(s). That works fine for the consumer part but not for the producer part as it requires an address of a running nsqd at construction time.
Would it be possible for a producer to connect to nsqlookupd(s) instead of a single nsqd? In fact, why is the API of the producer so different from that of a consumer regarding how it connects to nsq? Or am I completely blind to some obvious solution?
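Until such an API exists, one workaround is to query nsqlookupd's HTTP /nodes endpoint yourself and feed a discovered nsqd address to nsq.NewProducer. A minimal sketch of the parsing half (field names follow the nsqlookupd HTTP API; depending on the nsq version the response may additionally be wrapped in a status_code/data envelope, and the caller would fetch the body via http.Get against one of its configured lookupd addresses):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodesResponse mirrors the un-wrapped shape of nsqlookupd's /nodes endpoint.
type nodesResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// pickNsqd extracts "host:port" candidates suitable for nsq.NewProducer
// from a /nodes response body.
func pickNsqd(body []byte) ([]string, error) {
	var r nodesResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	var addrs []string
	for _, p := range r.Producers {
		addrs = append(addrs, fmt.Sprintf("%s:%d", p.BroadcastAddress, p.TCPPort))
	}
	return addrs, nil
}

func main() {
	sample := []byte(`{"producers":[{"broadcast_address":"nsqd1.local","tcp_port":4150}]}`)
	addrs, _ := pickNsqd(sample)
	fmt.Println(addrs)
}
```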
I'm getting IO timeouts with nsqd v0.2.28 (built w/go1.2.1) for anything blocking for a second, which I'll need to fix anyway, but more importantly I didn't realize that Set() returns an error instead of panicking. I probably just don't have master with the msg_timeout support, but having a MustSet would be great for simple bootstrapping (it was a dev error in my case). Yay/nay?
I am using NSQ for communication between different microservices. The microservices do not have an HTTP transport at all.
So, I am looking for options to help with putting a layer on top of the microservices that supports HTTP and WS (WebSockets).
There are many possible ways to do it, and I am not quite sure which is best.
In nsqio/pynsq#37 and nsqio/pynsq#38 we greatly improved pynsq RDY handling. I think the following are the remaining differences between go-nsq and pynsq in regards to RDY handling:
max-in-flight value (bug)
I'm going to open separate issues to tackle these items. This can track the overall issue of parity (there are probably things I'm forgetting) and we can close it when we are satisfied.
cc: @mreiferson
Is it possible to provide a method for reducing the number of consumer handlers (goroutines)? Then it would be possible to create APIs for controlling message consumption speed dynamically.
To facilitate testing, I've found myself abstracting out the go-nsq API so I can swap in mock implementations.
nsq.Producer/nsq.Consumer should be interfaces so that users don't have to create their own abstraction layer.
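In the meantime, a small app-side interface is enough for mocking: *nsq.Producer's Publish(topic string, body []byte) error and Stop() satisfy an interface like this directly. A sketch (interface and type names here are hypothetical):

```go
package main

import "fmt"

// producer covers the subset of *nsq.Producer this app uses;
// *nsq.Producer satisfies it as-is, and a mock satisfies it in tests.
type producer interface {
	Publish(topic string, body []byte) error
	Stop()
}

// mockProducer records publishes instead of hitting nsqd.
type mockProducer struct {
	published map[string][][]byte
}

func (m *mockProducer) Publish(topic string, body []byte) error {
	if m.published == nil {
		m.published = map[string][][]byte{}
	}
	m.published[topic] = append(m.published[topic], body)
	return nil
}

func (m *mockProducer) Stop() {}

// send is application code written against the interface,
// not against the concrete go-nsq type.
func send(p producer, topic string, body []byte) error {
	return p.Publish(topic, body)
}

func main() {
	m := &mockProducer{}
	_ = send(m, "events", []byte("hello"))
	fmt.Println(len(m.published["events"]))
}
```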
2013/10/03 20:08:13 stopping reader
2013/10/03 20:08:13 exiting lookupdLoop
2013/10/03 20:08:43 closing incomingMessages
panic: runtime error: send on closed channel
goroutine 12 [running]:
github.com/bitly/nsq/nsq.(*Reader).readLoop(0xf8400a9160, 0xf8400f7000, 0x0, 0x0)
/usr/src/redhat/BUILD/bitly-nsq-0.2.22-15/src/github.com/bitly/nsq/nsq/reader.go:636 +0xd4d
created by github.com/bitly/nsq/nsq.(*Reader).ConnectToNSQ
/usr/src/redhat/BUILD/bitly-nsq-0.2.22-15/src/github.com/bitly/nsq/nsq/reader.go:565 +0x16c6
18:15:13 [aggregate]: finished 250 - 99th: 186.30ms - 95th: 184.76ms - avg: 184.38ms
18:19:23 [aggregate]: finished 250 - 99th: 187.25ms - 95th: 185.06ms - avg: 184.42ms
18:23:33 [aggregate]: finished 250 - 99th: 185.87ms - 95th: 184.81ms - avg: 184.38ms
18:28:38 ERR 2 (this.is.a.cname:4050) IO error - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR 2 (this.is.a.cname:4050) sending command - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR 2 (this.is.a.cname:4050) IO error - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR 2 (this.is.a.cname:4050) sending command - write tcp 123.45.67.890:4050: i/o timeout
18:28:38 ERR 2 (this.is.a.cname:4050) IO error - EOF
18:28:38 INF 2 (this.is.a.cname:4050) beginning close
18:28:38 INF 2 (this.is.a.cname:4050) readLoop exiting
18:28:38 INF 2 (this.is.a.cname:4050) breaking out of writeLoop
18:28:38 INF 2 (this.is.a.cname:4050) writeLoop exiting
18:28:38 INF 2 (this.is.a.cname:4050) finished draining, cleanup exiting
18:28:38 INF 2 (this.is.a.cname:4050) clean close complete
18:28:38 WRN 1 [my-logs/nsq_to_nsq] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
18:28:38 [aggregate]: finished 250 - 99th: 60153.95ms - 95th: 6481.75ms - avg: 2178.38ms
18:28:40 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
18:29:38 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - read tcp 127.0.0.1:4050: i/o timeout
18:29:38 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) delaying close, 24 outstanding messages
18:29:38 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) readLoop exiting
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending RDY 1 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 WRN 1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending command REQ 0811ec9736c5d000 90000 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) beginning close
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) error sending command REQ 0811ec983085d000 90000 - write tcp 127.0.0.1:4050: connection reset by peer
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) breaking out of writeLoop
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) writeLoop exiting
18:30:03 INF 2 exiting router
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) finished draining, cleanup exiting
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) clean close complete
18:30:03 WRN 1 [my-logs/nsq_to_nsq] there are 0 connections left alive
18:30:03 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) re-connecting in 15 seconds...
18:30:18 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) connecting to nsqd
18:30:18 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) upgrading to TLS
19:01:43 WRN 1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
19:01:47 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:01:47 INF 2 (this.is.a.cname:4050) connecting to nsqd
19:01:48 ERR 2 (this.is.a.cname:4050) error connecting to nsqd - dial tcp 123.45.67.890:4050: i/o timeout
19:01:48 ERR 1 [my-logs/nsq_to_nsq] Handler returned error (dial tcp 123.45.67.890:4050: i/o timeout) for msg 0811f33e490d7000
19:01:48 WRN 1 [my-logs/nsq_to_nsq] backing off for 8.0000 seconds (backoff level 3), setting all to RDY 0
19:01:56 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:01:56 INF 2 (this.is.a.cname:4050) connecting to nsqd
19:02:00 INF 2 (this.is.a.cname:4050) upgrading to TLS
19:02:05 ERR 2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:02:05 ERR 1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f33f428d7000
19:02:05 WRN 1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:02:20 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:02:20 INF 2 (this.is.a.cname:4050) connecting to nsqd
19:03:06 INF 2 (this.is.a.cname:4050) upgrading to TLS
19:03:06 ERR 2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:03:06 ERR 1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f3403d0d7000
19:03:06 WRN 1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:03:21 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:03:21 INF 2 (this.is.a.cname:4050) connecting to nsqd
19:04:10 INF 2 (this.is.a.cname:4050) upgrading to TLS
19:04:10 ERR 2 (this.is.a.cname:4050) error connecting to nsqd - failed to IDENTIFY - tls: oversized record received with length 19968
19:04:10 ERR 1 [my-logs/nsq_to_nsq] Handler returned error (failed to IDENTIFY - tls: oversized record received with length 19968) for msg 0811f341364d7000
19:04:10 WRN 1 [my-logs/nsq_to_nsq] backing off for 15.0000 seconds (backoff level 4), setting all to RDY 0
19:04:25 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:04:25 INF 2 (this.is.a.cname:4050) connecting to nsqd
19:04:25 INF 2 (this.is.a.cname:4050) upgrading to TLS
19:04:26 WRN 1 [my-logs/nsq_to_nsq] backing off for 8.0000 seconds (backoff level 3), setting all to RDY 0
19:04:34 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:04:34 WRN 1 [my-logs/nsq_to_nsq] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
19:06:30 ERR 2 (this.is.a.cname:4050) IO error - EOF
19:06:30 INF 2 (this.is.a.cname:4050) beginning close
19:06:30 INF 2 (this.is.a.cname:4050) readLoop exiting
19:06:30 INF 2 (this.is.a.cname:4050) breaking out of writeLoop
19:06:30 INF 2 (this.is.a.cname:4050) writeLoop exiting
19:06:30 ERR 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) IO error - EOF
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) beginning close
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) readLoop exiting
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) breaking out of writeLoop
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) writeLoop exiting
19:06:30 WRN 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) backoff timeout expired, sending RDY 1
19:06:30 INF 2 (this.is.a.cname:4050) finished draining, cleanup exiting
19:06:30 INF 2 (this.is.a.cname:4050) clean close complete
19:06:30 INF 2 exiting router
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) finished draining, cleanup exiting
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) clean close complete
19:06:30 WRN 1 [my-logs/nsq_to_nsq] there are 0 connections left alive
19:06:30 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) re-connecting in 15 seconds...
19:06:45 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) connecting to nsqd
19:06:45 INF 1 [my-logs/nsq_to_nsq] (127.0.0.1:4050) upgrading to TLS
In both cases, at the end, where it says "upgrading to TLS", the producer side was also still alive. I fixed it by restarting nsq_to_nsq, which then spammed "finished 250" logs, i.e., it had been down for hours.
My server started a consumer to consume messages. Then I stopped it for 3 days and started it again; the old messages make no sense to me (because I use nsq for realtime notifications). I just want to consume the new messages. How can I solve this without consuming from a new channel?
Hi,
I have started 2 nsqd, 1 nsqlookupd and 1 nsqadmin.
When I stop the 2 nsqd and the nsqlookupd and start them up again, nsqadmin does not seem to recognize that the other instances are back up.
The same happens when I stop the 2 nsqd, the nsqlookupd and the nsqadmin and start them all up again: nsqadmin still can't see that the other instances are back up.
I'm not sure what to do; does anyone know anything that could be related?
Note: the nsqd instances seem to work just fine and the whole system seems functional, but nsqadmin does not seem to notice and adapt to the new clients.
Thank you
Using go-nsq, if I have 100 consumers in the same process (A), each with a different topic, and one nsqd producer (B) has the topic data updates for these consumers, they will all get data from the same producer. How many TCP connections will be opened between A and B? Will the consumers multiplex one TCP connection to the same producer? Thanks a lot.
Hi, guys.
While reading the source in api_request.go, I found that it uses the "simplejson" package to unmarshal JSON in the method called apiRequestNegotiateV1.
Why not just define a struct and pass a pointer to it into json.Unmarshal?
I think that would improve the readability of the code and should not affect the performance too much.
Hi,
I hadn't updated go-nsq for a long time, maybe 3 weeks, but I just installed my program on an Ubuntu server (so every dependency was checked out fresh), and I get this weird error that I never saw before. As you can see, the error isn't really explicit.
I first thought it was coming from my code, and it may still be, but since I can't find why it would do that and go-nsq appears a lot in the trace, I prefer to post it here hoping that someone has an idea where it comes from.
It seems to appear only when there's concurrent nsq access (producing several messages and receiving several messages at the same time):
unexpected fault address 0x7f9000000011
fatal error: fault
[signal 0xb code=0x1 addr=0x7f9000000011]
goroutine 2087 [running]:
:0
(... the ":0" frame line repeated ~30 times; the trace contains no symbol information for this goroutine ...)
goroutine 1 [select]:
main.main
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:178
goroutine 8 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 9 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 140 [select]:
craft.getDataFromDatas
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121
goroutine 13 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176
goroutine 14 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177
goroutine 15 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 16 [chan receive]:
qlib.handleInternal
/home/ubuntu/go/src/github.com/testar-test/go-qlib/client.go:230
created by github.com_testar_test_go_qlib.run.pN38_github.com_testar_test_go_qlib.Client
/home/ubuntu/go/src/github.com/testar-test/go-qlib/client.go:224
goroutine 17 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176
goroutine 18 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177
goroutine 19 [select]:
github.com_bitly_go_nsq.router.pN32_github.com_bitly_go_nsq.Producer
/home/ubuntu/go/src/github.com/bitly/go-nsq/producer.go:261
created by github.com_bitly_go_nsq.connect.pN32_github.com_bitly_go_nsq.Producer
/home/ubuntu/go/src/github.com/bitly/go-nsq/producer.go:241
goroutine 20 [IO wait]:
github.com_gocql_gocql.recv.pN27_github.com_gocql_gocql.Conn
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:262
github.com_gocql_gocql.serve.pN27_github.com_gocql_gocql.Conn
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:240
created by github.com_gocql_gocql.Connect
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:174
goroutine 22 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 23 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 28 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 29 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 30 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 34 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176
goroutine 35 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177
goroutine 36 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 37 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 38 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 42 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 43 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 44 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 48 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 49 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 50 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 54 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 132 [select]:
craft.getDataFromDatas
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121
goroutine 55 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 56 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 60 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 61 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 62 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 66 [IO wait]:
github.com_bitly_go_nsq.Read.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:238
github.com_bitly_go_nsq.ReadResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:54
github.com_bitly_go_nsq.ReadUnpackedResponse
/home/ubuntu/go/src/github.com/bitly/go-nsq/protocol.go:91
github.com_bitly_go_nsq.readLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:475
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:176
goroutine 67 [select]:
github.com_bitly_go_nsq.writeLoop.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:541
created by github.com_bitly_go_nsq.Connect.pN28_github.com_bitly_go_nsq.Conn
/home/ubuntu/go/src/github.com/bitly/go-nsq/conn.go:177
goroutine 68 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 69 [select]:
github.com_bitly_go_nsq.rdyLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:861
created by github.com_bitly_go_nsq.NewConsumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:178
goroutine 70 [chan receive]:
github.com_bitly_go_nsq.handlerLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1056
created by github.com_bitly_go_nsq.AddConcurrentHandlers.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:1048
goroutine 74 [select]:
github.com_bitly_go_nsq.lookupdLoop.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:377
created by github.com_bitly_go_nsq.ConnectToNSQLookupd.pN32_github.com_bitly_go_nsq.Consumer
/home/ubuntu/go/src/github.com/bitly/go-nsq/consumer.go:324
goroutine 75 [select]:
github.com_testar_test_mount_of_doom_craft.HandleNewMarkers
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:145
created by main.main
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:175
goroutine 76 [IO wait]:
github.com_gocql_gocql.recv.pN27_github.com_gocql_gocql.Conn
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:262
github.com_gocql_gocql.serve.pN27_github.com_gocql_gocql.Conn
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:240
created by github.com_gocql_gocql.Connect
/home/ubuntu/go/src/github.com/gocql/gocql/conn.go:174
goroutine 151 [select]:
craft.getDataFromDatas
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121
goroutine 184 [select]:
craft.getDataFromDatas
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121
goroutine 2033 [select]:
github.com_AdRoll_goamz_s3.doHttpRequest.pN29_github.com_AdRoll_goamz_s3.S3
/home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:1144
github.com_AdRoll_goamz_s3.run.pN29_github.com_AdRoll_goamz_s3.S3
/home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:1180
github.com_AdRoll_goamz_s3.query.pN29_github.com_AdRoll_goamz_s3.S3
/home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:931
github.com_AdRoll_goamz_s3.PutReader.pN33_github.com_AdRoll_goamz_s3.Bucket
/home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:374
github.com_AdRoll_goamz_s3.Put.pN33_github.com_AdRoll_goamz_s3.Bucket
/home/ubuntu/go/src/github.com/AdRoll/goamz/s3/s3.go:334
craft.putMarkerOnS3
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:156
craft.downloadMarker
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:89
created by github.com_testar_test_mount_of_doom_craft.HandleNewMarkers
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/marker.go:147
goroutine 2040 [select]:
goroutine 2039 [IO wait]:
goroutine 419 [select]:
craft.getDataFromDatas
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:102
github.com_testar_test_mount_of_doom_craft.GetTestData
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft/craft_test_data.go:141
main.craftTestID
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/craft_test.go:232
main.handleCraftTestMessage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:95
main.$nested0
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:122
created by main.handlePackage
/home/ubuntu/go/src/github.com/testar-test/mount-of-doom/main.go:121
The go-nsq client is sometimes unstable: when the server side hits an I/O error and closes the connection, the go-nsq client can still wait there indefinitely.
A related log excerpt from nsqd (I modified the log a little for debugging):
[nsqd] 2015/08/13 18:30:14.640803 [127.0.0.1:27884] state rdy: 2000 inflt: 1998
[nsqd] 2015/08/13 18:30:14.640902 PROTOCOL(V2): writing msg(08c483c906801002) to client(127.0.0.1:27884) - {"type":"OutgoingMessageWrapper","track_id":"50217923020965941","enterprise_code":"yto_dev","employee_code":"00079738","from":"router.2","send_at":1439461814,"data_type":"WsOutMessage","data_inner_type":"notify.wait","data":"eyJpZCI6IjUwMjE3OTIzMDIwOTY1OTQxIiwidHlwZSI6Im5vdGlmeS53YWl0IiwiZGF0YSI6eyJjb3VudCI6MCwibm9kZV9jb2RlIjoiIn19","subscriber_id":0}
[nsqd] 2015/08/13 18:30:14.640944 [127.0.0.1:27884] state rdy: 2000 inflt: 1999
[nsqd] 2015/08/13 18:30:14.643325 PROTOCOL(V2): writing msg(08c483c907401000) to client(127.0.0.1:27884) - {"type":"OutgoingMessageWrapper","track_id":"50217923037415477","enterprise_code":"yto_dev","employee_code":"00079738","from":"router.1","send_at":1439461814,"data_type":"WsOutMessage","data_inner_type":"notify.servicecount","data":"eyJpZCI6IjUwMjE3OTIzMDM3NDE1NDc3IiwidHlwZSI6Im5vdGlmeS5zZXJ2aWNlY291bnQiLCJkYXRhIjp7ImNvdW50IjowfX0=","subscriber_id":0}
[nsqd] 2015/08/13 18:30:14.643358 [127.0.0.1:27884] state rdy: 2000 inflt: 2000
[nsqd] 2015/08/13 18:30:14.820049 [127.0.0.1:27884] state rdy: 2000 inflt: 2000
[nsqd] 2015/08/13 18:30:14.867511 PROTOCOL(V2): [127.0.0.1:27884] exiting ioloop
[nsqd] 2015/08/13 18:30:14.867732 ERROR: client(127.0.0.1:27884) - failed to read command - read tcp 127.0.0.1:27884: i/o timeout
[nsqd] 2015/08/13 18:30:14.867778 PROTOCOL(V2): [127.0.0.1:27884] exiting messagePump
A related log excerpt from go-nsq (I modified the log a little for debugging):
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (kefu-dev.hotpu.cn:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) on msgResponseChan chan"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) FIN 08c483c630001004"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e INF 4 [connector.1/default] (mysite:4150) WriteCommand2 FIN 08c483c630001004..."}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) on msgResponseChan chan"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) FIN 08c483c630801000"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e INF 4 [connector.1/default] (mysite:4150) WriteCommand2 FIN 08c483c630801000..."}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) writeLoop, cmdChan: 0, msgResponseChan: 0"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) skip sending RDY 2000 (675 remain out of last RDY 2000)"}
{"@timestamp":"2015-08-13T18:30:12+08:00","@version":1,"level":"debug","message":"NSQL \u003e DBG 4 [connector.1/default] (mysite:4150) skip sending RDY 2000 (674 remain out of last RDY 2000)"}
I find that in many cases go-nsq detects the I/O error and reconnects, but in a few cases it hangs indefinitely, as the logs above show.
How can I make the go-nsq client detect the I/O error earlier and trigger a reconnect?
And BTW, here is the config I am using:
if err := nsqConfig.Set("dial_timeout", "5s"); err != nil {
	return err
}
if err := nsqConfig.Set("read_timeout", "2s"); err != nil {
	return err
}
if err := nsqConfig.Set("heartbeat_interval", "1s"); err != nil {
	return err
}
if err := nsqConfig.Set("write_timeout", "1s"); err != nil {
	return err
}
if err := nsqConfig.Set("lookupd_poll_interval", "2s"); err != nil {
	return err
}
if err := nsqConfig.Set("deflate", "true"); err != nil {
	return err
}
if err := nsqConfig.Set("output_buffer_timeout", "250ms"); err != nil {
	return err
}
if err := nsqConfig.Set("max_in_flight", 2000); err != nil {
	return err
}
if err := nsqConfig.Set("msg_timeout", "2s"); err != nil {
	return err
}
Right now it's just closing the connection, causing spurious errors in nsqd logs (see nsqio/nsq#521).
It should send a CLS command and wait for the appropriate response before closing the connection, like Consumer already does.
cc @tj
Currently there are reader-opt TLS options, but no writer opts.
Adding writer-opt TLS would enable nsqd->nsq_to_nsq->nsqd TLS communication, avoiding the HTTP overhead of nsqd->nsq_to_http->nsqd.
I pulled the latest go-nsq and found that when running test.sh, TestProducerConnection fails. I'm on OS X 10.10.2, go 1.5.1, nsqd v0.3.6-alpha (built w/ go1.5.1).
=== RUN TestProducerConnection
--- FAIL: TestProducerConnection (0.00s)
producer_test.go:54: should lazily connect - dial tcp 127.0.0.2:0->127.0.0.1:4150: bind: can't assign requested address
To mirror nsqio/nsq#518, which changes -tls-min-version from the format "tls10" to "tls.10", and which also sets the TLS max version to get TLS_FALLBACK_SCSV.
I am running into a timeout issue with producers.
2015/02/15 18:38:48 INF 1 (x.x.x.x:14150) connecting to nsqd
2015/02/15 18:38:49 ERR 1 (x.x.x.x:14150) error connecting to nsqd - dial tcp x.x.x.x:14150: i/o timeout
Let me try and explain my setup:
I am running an NSQ cluster on Google Compute Engine using Container Engine. (The configuration is about the same as this setup: https://gist.github.com/smreed/e93fd74765caef99266a. The only thing I changed is that I set up load balancers for the nsq daemons.)
I wrote a helper to handle my producer logic ( See: https://gist.github.com/mickelsonm/8ab51cf6171f6aa3100d)
I set the environment variable to the load balancer:
NSQ_DAEMON_HOSTS= x.x.x.x:14150
Reader and Writer do not currently support:
heartbeat_interval
output_buffer_size
output_buffer_timeout
Additionally, make sure to document the tradeoffs of output_buffer_timeout as seen in nsqio/nsq@c0cac78
I was investigating the broken pipe issue, but the logs quickly fill up with thousands of these lines while it slowly waits for the remaining work. Is it possible to lower the log interval? Maybe just tick every second and/or when the in-flight count is reduced:
red-files 2014/10/30 15:52:10 ERR 1 [red-files/red-extract] (:5001) IO error - write tcp 127.0.0.1:5001: broken pipe
red-files 2014/10/30 15:52:10 ERR 1 [red-files/red-extract] (:5001) error sending command FIN 07526046660c5002 - write tcp 127.0.0.1:5001: broken pipe
red-files 2014/10/30 15:52:10 INF 1 [red-files/red-extract] (:5001) beginning close
red-files 2014/10/30 15:52:10 INF 1 [red-files/red-extract] (:5001) breaking out of writeLoop
red-files 2014/10/30 15:52:10 INF 1 [red-files/red-extract] (:5001) writeLoop exiting
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
2014/10/30 15:52:11 stats:
2014/10/30 15:52:11 events.published 2419.11/s tick=24,192 total=985,360
2014/10/30 15:52:11 files.processed 0.20/s tick=2 total=2,142
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 31 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) delaying close, 30 outstanding messages
red-files 2014/10/30 15:52:11 INF 1 [red-files/red-extract] (:5001) readLoop exiting
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 30 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:11 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:12 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:13 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
red-files 2014/10/30 15:52:13 WRN 1 [red-files/red-extract] (:5001) draining... waiting for 29 messages in flight
In #95 we documented a case where nsq_to_nsq (which uses both Consumer and Producer) would panic due to a bug in the exit timeout code path.
The bug is exacerbated by the fact that in rare cases the producer could block serially for 1s per PUB if the remote address is black-holing the connection attempts.
An alternative strategy might be to back off connection attempts and return errors instantly during backoff windows.
Thoughts?
what's the reference of the in-flight value?
nsq.Identify is way out of date in terms of supported options.
I ran into an issue where publishing messages to NSQ would eventually result in too many open files. I had code in a function that would publish a message like so:
func enqueue(queue string, message []byte) error {
	producer, err := nsq.NewProducer("...", config)
	if err != nil {
		return err
	}
	defer producer.Stop()
	err = producer.Publish(queue, message)
	return err
}
Every time I called that function, a new file descriptor (type = sock when monitoring with lsof) was created and would not go away.
I moved the creation of the producer outside the function, so the same instance is used each time a message is published, which has "fixed" the issue. My question: I thought defer producer.Stop() would clean up and thus not cause too many open files. Thoughts?
I didn't realize the message handler FINs / REQs for you, but it feels a little awkward for async stuff. I have to spawn a goroutine in there and then have a channel to feed errors back, for example this is what I originally had:
func (w *Worker) HandleMessage(msg *nsq.Message) error {
	go func() {
		_, err := w.Write(msg.Body)
		if err != nil {
			log.Printf("Error: %s (REQ %s)", err, msg.ID)
			msg.Requeue(1 * time.Minute)
			return
		}
		msg.Finish()
	}()
	return nil
}
Then I realized I was getting way too many messages, haha. It would be awesome to have access to the message channel (if it makes sense internally, not sure). Delegating FIN / REQ is clearer to me, at least, than tweaking REQ values at the config level and doing this handle-message stuff.
What do you guys think?
Oddly, I wasn't getting any errors for double-FINs :s not sure what's up there.
As per the title. Also, I cannot understand this code snippet: https://github.com/nsqio/go-nsq/blob/master/producer.go#L318-L321. Does the first command sent receive the first response or error? Otherwise, how does popTransaction guarantee that the popped transaction matches the response? Thanks ~
This picks up the logging improvements to include topic/channel, and supersedes #3 (cherry-picked commit).
It's not obvious from the go-doc documentation what the default config values are, or what type some are, and there is no way to introspect them at runtime. Both seem like they should exist.
AddConcurrentHandlers starts X handlers concurrently and has all X constantly check one channel for a message to handle.
It seems better to have one loop checking the channel that, when a message exists, grabs a spot from a semaphore and fires off a go handleMessage.
I use go-nsq to consume messages. HandleMessage returns nil, or an error if needed. I am not sure where the problem is; please help.
These are my steps:
1: set MaxAttempts = 2; ConnectToNSQLookupds gets (127.0.0.2:4161 and 127.0.0.1:4161); 127.0.0.1 is a real nsqlookupd, 127.0.0.2 is nothing
2: produce 1 msg, HandleMessage returns nil, everything is ok
3: produce 1 msg, HandleMessage returns err, everything is ok
4: after MaxAttempts, I kill nsqlookupd and nsqd, then restart nsqlookupd and nsqd
5: produce 1 msg, the consumer cannot get any msg. I check the admin web UI and the messages are saved in nsq.
If the handler returns nil, everything is ok.
This is the debug log:
2015/04/07 18:02:22 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:02:22 DBG 1 [test/ch] starting Handler
2015/04/07 18:02:22 INF 1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:02:22 DBG 1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:02:22 DBG 1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 0)
2015/04/07 18:02:22 DBG 1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:22 DBG 1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e000
2015/04/07 18:02:47 DBG 1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:47 ERR 1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:02:47 DBG 1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:02:47 WRN 1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:02:49 WRN 1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:02:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:03:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:17 ERR 1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:04:17 DBG 1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:04:17 WRN 1 [test/ch] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
2015/04/07 18:04:21 WRN 1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:04:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:04:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:05:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:06:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:17 WRN 1 [test/ch] msg 081fb125edf2e001 attempted 3 times, giving up
2015/04/07 18:07:17 DBG 1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e001
2015/04/07 18:07:17 WRN 1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:07:19 WRN 1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:07:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:07:52 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:22 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:08:27 ERR 1 [test/ch] (zhangdl-pc:4150) IO error - EOF
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) beginning close
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) readLoop exiting
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) breaking out of writeLoop
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) writeLoop exiting
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) finished draining, cleanup exiting
2015/04/07 18:08:27 INF 1 [test/ch] (zhangdl-pc:4150) clean close complete
2015/04/07 18:08:27 WRN 1 [test/ch] there are 0 connections left alive
2015/04/07 18:08:27 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:08:27 ERR 1 [test/ch] error querying nsqlookupd (http://127.0.0.2:4161/lookup?topic=test) - Get http://127.0.0.2:4161/lookup?topic=test: dial tcp 127.0.0.2:4161: connection refused
2015/04/07 18:09:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:09:25 INF 1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:09:25 DBG 1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:09:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:10:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:11:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:12:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:13:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:14:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:55 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:15:25 INF 1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:15:25 DBG 1 [test/ch] (zhangdl-pc:4150) heartbeat received
this is my code:
package main

import (
	"bytes"
	"config"
	"errors"
	"flag"
	"fmt"
	"github.com/bitly/go-nsq"
	"github.com/bitly/go-simplejson"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"plog"
	"strings"
	//"time"
)

type MyTestHandler struct {
	q        *nsq.Consumer
	tsdbAddr string
}

// handle msg
func (h *MyTestHandler) HandleMessage(message *nsq.Message) error {
	// send to opentsdb
	httpclient := &http.Client{}
	a := message.Body
	req, _ := http.NewRequest("POST", h.tsdbAddr, bytes.NewBuffer(a))
	response, err := httpclient.Do(req)
	if err != nil {
		plog.Errorf(" http_to_tsdb_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
		return err
	}
	defer response.Body.Close()
	// check tsdb resp
	code := response.StatusCode
	if code != 204 && code != 200 {
		body, _ := ioutil.ReadAll(response.Body)
		respdata, err := simplejson.NewJson(body)
		if err != nil {
			plog.Errorf(" tsdb_resp_json_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
			return err
		}
		respMessage, _ := respdata.Get("error").Get("message").String()
		respDetails, _ := respdata.Get("error").Get("details").String()
		err = errors.New(fmt.Sprintf("tsdb retrun err code: %d", code))
		plog.Errorf(" tsdb_resp_code_err, MsgId: %s, TsAddr: %s, Attempts %d, code: %d, message: %s, details: %s", message.ID, h.tsdbAddr, message.Attempts, code, respMessage, strings.Replace(respDetails, "\n", " ", -1))
		return err
	}
	plog.Infof(" success_to_tsdb, MsgId: %s, TsAddr: %s, Attempts %d", message.ID, h.tsdbAddr, message.Attempts)
	return nil
}
func main() {
	// get config
	var inputConfigFile = flag.String("c", "./xx.conf", "nsqlookupd ip and port")
	flag.Parse()
	proxyConfig, err := config.New(*inputConfigFile)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(proxyConfig)
	fmt.Println("NsqlookupAddr: ", proxyConfig.NsqlookupAddr)
	fmt.Println("NsqTopic: ", proxyConfig.NsqTopic)
	fmt.Println("NsqChan: ", proxyConfig.NsqChan)
	fmt.Println("TsdbAddr: ", proxyConfig.TsdbAddr)
	fmt.Println("AccessLog: ", proxyConfig.AccessLog)
	fmt.Println("ErrorLog: ", proxyConfig.ErrorLog)
	fmt.Println("GoNsqLibLog: ", proxyConfig.GoNsqLibLog)
	fmt.Println("HandlerMaxRetry: ", proxyConfig.MaxAttempts)
	fmt.Println("MaxFlight: ", proxyConfig.MaxFlight)
	fmt.Println("HeartbeatInterval: ", proxyConfig.HeartbeatInterval)
	fmt.Println("ReadTimeout: ", proxyConfig.ReadTimeout)
	fmt.Println("LookupdPollInterval: ", proxyConfig.LookupdPollInterval)
	fmt.Println("RequeueFlag: ", proxyConfig.RequeueFlag)
	// open log
	plog.StartLog(proxyConfig.AccessLog, proxyConfig.ErrorLog)
	// check config
	if proxyConfig.MaxAttempts <= 0 {
		fmt.Printf("err: HandlerMaxRetry: %d <= 0", proxyConfig.MaxAttempts)
		return
	}
	if proxyConfig.MaxFlight <= 0 {
		fmt.Printf("err: MaxFlight: %d <= 0", proxyConfig.MaxFlight)
		return
	}
	if proxyConfig.ReadTimeout < proxyConfig.HeartbeatInterval {
		fmt.Printf("err: ReadTimeout %d < HeartbeatInterval %d", proxyConfig.ReadTimeout, proxyConfig.HeartbeatInterval)
		return
	}
	if proxyConfig.LookupdPollInterval < proxyConfig.HeartbeatInterval {
		fmt.Printf("err: LookupdPollInterval %d < HeartbeatInterval %d", proxyConfig.LookupdPollInterval, proxyConfig.HeartbeatInterval)
		return
	}
	// set config
	config := nsq.NewConfig()
	config.MaxInFlight = int(proxyConfig.MaxFlight)
	config.MaxAttempts = uint16(proxyConfig.MaxAttempts)
	//config.HeartbeatInterval = time.Duration(proxyConfig.HeartbeatInterval * 1000000)
	//config.ReadTimeout = time.Duration(proxyConfig.ReadTimeout * 1000000)
	//config.LookupdPollInterval = time.Duration(proxyConfig.LookupdPollInterval * 1000000)
	// consumer
	q, _ := nsq.NewConsumer(proxyConfig.NsqTopic, proxyConfig.NsqChan, config)
	// set nsq lib log
	nsqLibLog, err_log := os.OpenFile(proxyConfig.GoNsqLibLog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err_log != nil {
		fmt.Println(err_log)
		return
	}
	q.SetLogger(log.New(nsqLibLog, "", log.Ldate|log.Ltime), 0)
	fmt.Println(config)
	// start
	h := &MyTestHandler{
		q:        q,
		tsdbAddr: "http://" + proxyConfig.TsdbAddr + "/api/put",
	}
	q.AddHandler(h)
	err = q.ConnectToNSQLookupds(proxyConfig.NsqlookupAddr)
	if err != nil {
		plog.Error("ConnectToNSQLookupd err:", err)
		os.Exit(-1)
	}
	<-q.StopChan
}
conf.Set("verbose", true) == &errors.errorString{s:"ERROR: invalid option verbose"}
heartbeat received
I'd like to swipe some code, but this repository lacks a license file.
Hey there,
I'm using this code to make a producer that sends messages to NSQ. I'm using
PublishAsync(), with a "responder" that basically does nothing:
var respChan chan *nsq.ProducerTransaction

func responder(respChan chan *nsq.ProducerTransaction) {
	for range respChan {
		// Get rid of the messages or something... should probably respond,
		// but not sure how to do that yet...
	}
}
I'm sending as many messages as possible, calling PublishAsync() in a loop reading from memory. My code maxes out the CPU and manages about 14,000 messages per second (the messages are quite small: a msgpack'd struct containing a <16-char string).
In doing a code profile, my code apparently spends 50.5% of its time in "ExternalCode" (no idea what that is).
I've attached my code profile:
Any idea what is going on here?
panic: runtime error: send on closed channel
goroutine 10818 [running]:
runtime.panic(0xd11480, 0x15748fe)
/usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/bitly/go-nsq.(*Consumer).onConnMessage(0xc208688000, 0xc209069200, 0xc208bba000)
/root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/consumer.go:632 +0x90
github.com/bitly/go-nsq.(*consumerConnDelegate).OnMessage(0xc208fa45e0, 0xc209069200, 0xc208bba000)
/root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/delegates.go:111 +0x3e
github.com/bitly/go-nsq.(*Conn).readLoop(0xc209069200)
/root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/conn.go:518 +0xb9b
created by github.com/bitly/go-nsq.(*Conn).Connect
/root/gopath/src/mysite.com.cn/kefu/portal/Godeps/_workspace/src/github.com/bitly/go-nsq/conn.go:181 +0x6af
Could this be an issue in go-nsq?
just noticed this while profiling a production app.
in cpu profile:
(pprof) top30 -cum
...
(pprof) top30 -cum
5.43s of 34.68s total (15.66%)
Dropped 447 nodes (cum <= 0.17s)
Showing top 30 nodes out of 197 (cum >= 2.47s)
flat flat% sum% cum cum%
0.23s 0.66% 0.66% 27.29s 78.69% runtime.aeshashbody
0 0% 0.66% 12.52s 36.10% github.com/nsqio/go-nsq.(*Consumer).redistributeRDY
0.15s 0.43% 1.10% 12.27s 35.38% main.(*Handler).HandleMessage
0.12s 0.35% 1.44% 7.35s 21.19% fmt.(*buffer).WriteRune
0.11s 0.32% 1.76% 6.14s 17.70% type..eq.struct { lock runtime.mutex; lockOwner *runtime.g; enabled bool; shutdown bool; headerWritten bool; footerWritten bool; shutdownSema uint32; seqStart uint64; ticksStart int64; ticksEnd int64; timeStart int64; timeEnd int64; reading *runtime.traceBuf; empty *runtime.traceBuf; fullHead *runtime.traceBuf; fullTail *runtime.traceBuf; reader *runtime.g; stackTab runtime.traceStackTable; bufLock runtime.mutex; buf *runtime.traceBuf }
0.03s 0.087% 1.85% 6s 17.30% runtime.schedule
0.22s 0.63% 2.48% 5.98s 17.24% github.com/raintank/raintank-metric/metricdef.GetMetrics
0.08s 0.23% 2.71% 5.81s 16.75% runtime.injectglist
0.54s 1.56% 4.27% 5.73s 16.52% fmt.(*pp).doPrintf
0.13s 0.37% 4.64% 4.92s 14.19% type..hash.[15]runtime.dbgVar
0.05s 0.14% 4.79% 4.78s 13.78% main.(*cassandraStore).processWriteQueue
0.40s 1.15% 5.94% 4.68s 13.49% main.(*AggMetric).Add
0.05s 0.14% 6.08% 4.52s 13.03% github.com/grafana/grafana/pkg/log.(*FileLogWriter).DoRotate
0.03s 0.087% 6.17% 4.47s 12.89% github.com/grafana/grafana/pkg/log.(*Logger).StartLogger
0.51s 1.47% 7.64% 4.13s 11.91% runtime.findrunnable
0.23s 0.66% 8.30% 4.08s 11.76% fmt.(*pp).handleMethods
0.07s 0.2% 8.51% 3.70s 10.67% main.(*DefCache).Add
1.70s 4.90% 13.41% 3.18s 9.17% runtime.mallocgc
0 0% 13.41% 3.01s 8.68% main.(*cassandraStore).insertChunk
0 0% 13.41% 2.83s 8.16% net/http.(*ServeMux).Handle
0 0% 13.41% 2.80s 8.07% net/http.(*conn).serve
0 0% 13.41% 2.76s 7.96% net/http.(*ServeMux).match
0.02s 0.058% 13.47% 2.75s 7.93% main.Get
0 0% 13.47% 2.75s 7.93% main.get.func1
0.02s 0.058% 13.52% 2.56s 7.38% type..eq.github.com/gocql/gocql.resultSchemaChangeFrame
0.04s 0.12% 13.64% 2.53s 7.30% runtime.clearCheckmarks
0.03s 0.087% 13.73% 2.53s 7.30% runtime.gcmarknewobject_m
0 0% 13.73% 2.50s 7.21% github.com/gocql/gocql.(*resultSchemaChangeFrame).Header
0 0% 13.73% 2.49s 7.18% runtime.finishsweep_m
0.67s 1.93% 15.66% 2.47s 7.12% runtime.heapBitsSweepSpan
(pprof)
(pprof) list buteRDY
Total: 34.68s
ROUTINE ======================== github.com/nsqio/go-nsq.(*Consumer).redistributeRDY in /home/dieter/go/src/github.com/nsqio/go-nsq/consumer.go
0 12.52s (flat, cum) 36.10% of Total
. . 994: possibleConns := make([]*Conn, 0, len(conns))
. . 995: for _, c := range conns {
. . 996: lastMsgDuration := time.Now().Sub(c.LastMessageTime())
. . 997: rdyCount := c.RDY()
. . 998: r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
. 10ms 999: c.String(), rdyCount, lastMsgDuration)
. . 1000: if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
. . 1001: r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
. . 1002: r.updateRDY(c, 0)
. . 1003: }
. . 1004: possibleConns = append(possibleConns, c)
. . 1005: }
. . 1006:
. . 1007: availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&r.totalRdyCount)
. . 1008: if r.inBackoff() {
. . 1009: availableMaxInFlight = 1 - atomic.LoadInt64(&r.totalRdyCount)
. . 1010: }
. . 1011:
. . 1012: for len(possibleConns) > 0 && availableMaxInFlight > 0 {
. . 1013: availableMaxInFlight--
. . 1014: r.rngMtx.Lock()
. . 1015: i := r.rng.Int() % len(possibleConns)
. . 1016: r.rngMtx.Unlock()
. . 1017: c := possibleConns[i]
. . 1018: // delete
. . 1019: possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
. 12.51s 1020: r.log(LogLevelDebug, "(%s) redistributing RDY", c.String())
. . 1021: r.updateRDY(c, 1)
. . 1022: }
. . 1023:}
. . 1024:
. . 1025:// Stop will initiate a graceful stop of the Consumer (permanent)
(pprof)
memory profile:
(pprof) top30 -cum
2856.55MB of 3014.07MB total (94.77%)
Dropped 347 nodes (cum <= 15.07MB)
Showing top 30 nodes out of 50 (cum >= 61.35MB)
flat flat% sum% cum cum%
0 0% 0% 3013.07MB 100% runtime.aeshashbody
0 0% 0% 2602.61MB 86.35% github.com/nsqio/go-nsq.(*Consumer).redistributeRDY
...
(pprof) list redistributeRDY
Total: 2.94GB
ROUTINE ======================== github.com/nsqio/go-nsq.(*Consumer).redistributeRDY in /home/dieter/go/src/github.com/nsqio/go-nsq/consumer.go
0 2.54GB (flat, cum) 86.35% of Total
. . 994: possibleConns := make([]*Conn, 0, len(conns))
. . 995: for _, c := range conns {
. . 996: lastMsgDuration := time.Now().Sub(c.LastMessageTime())
. . 997: rdyCount := c.RDY()
. . 998: r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
. . 999: c.String(), rdyCount, lastMsgDuration)
. . 1000: if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
. . 1001: r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
. . 1002: r.updateRDY(c, 0)
. . 1003: }
. . 1004: possibleConns = append(possibleConns, c)
. . 1005: }
. . 1006:
. . 1007: availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&r.totalRdyCount)
. . 1008: if r.inBackoff() {
. . 1009: availableMaxInFlight = 1 - atomic.LoadInt64(&r.totalRdyCount)
. . 1010: }
. . 1011:
. . 1012: for len(possibleConns) > 0 && availableMaxInFlight > 0 {
. . 1013: availableMaxInFlight--
. . 1014: r.rngMtx.Lock()
. . 1015: i := r.rng.Int() % len(possibleConns)
. . 1016: r.rngMtx.Unlock()
. . 1017: c := possibleConns[i]
. . 1018: // delete
. . 1019: possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
. 2.54GB 1020: r.log(LogLevelDebug, "(%s) redistributing RDY", c.String())
. . 1021: r.updateRDY(c, 1)
. . 1022: }
. . 1023:}
. . 1024:
. . 1025:// Stop will initiate a graceful stop of the Consumer (permanent)
(pprof)
A consumer can become so overwhelmed with messages that, by the time it completes them, they have already timed out server-side. In this case the client thinks everything is fine and keeps FIN'ing messages and receiving more, and may never work off its backlog (the situation is worse the higher the max-in-flight used).
go-nsq could use the presence of E_FIN_FAILED
to trigger backoff, just like a processing failure in the handler does. That would trigger RDY changes so message flow drops off and the consumer can recover.