adjust / rmq
Message queue system written in Go and backed by Redis
License: MIT License
I have a consumer function called Consume:
type TaskConsumer struct {
}
func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
// process the delivery here, then Ack or Reject it
}
And I have several instances of my applications running.
On each instance, I add a consumer and start consuming tasks:
taskQueue := rmqConnection.OpenQueue("scheduleTasks")
taskQueue.StartConsuming(10, time.Second)
taskConsumer := &TaskConsumer{}
taskQueue.AddConsumer(uuid.NewV4().String(), taskConsumer)
And only one instance is publishing tasks:
taskQueue.Publish(UserId)
Am I doing something wrong?
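For reference, here is a hedged sketch of what a complete Consume might look like under the v1-era API shown above. The small `delivery` interface and `fakeDelivery` stub are illustrative stand-ins, not rmq types; method signatures differ across rmq versions (v4 returns error instead of bool):

```go
package main

import "fmt"

// delivery covers the subset of rmq.Delivery (v1-era API) this consumer
// uses; rmq's own interface would satisfy it. This is a sketch, not the
// library's actual contract.
type delivery interface {
	Payload() string
	Ack() bool
	Reject() bool
}

type TaskConsumer struct{}

// Consume handles one delivery: Ack on success, Reject otherwise.
// Unacked deliveries stay in the connection's unacked list until a
// cleaner returns them to the ready list.
func (consumer *TaskConsumer) Consume(d delivery) {
	userID := d.Payload()
	if userID == "" {
		d.Reject() // hypothetical failure case for illustration
		return
	}
	fmt.Println("scheduling tasks for user", userID)
	d.Ack()
}

// fakeDelivery is a local stub standing in for a real rmq delivery,
// so the consumer can be exercised without Redis.
type fakeDelivery struct {
	payload         string
	acked, rejected bool
}

func (f *fakeDelivery) Payload() string { return f.payload }
func (f *fakeDelivery) Ack() bool       { f.acked = true; return true }
func (f *fakeDelivery) Reject() bool    { f.rejected = true; return true }

func main() {
	f := &fakeDelivery{payload: "user-1"}
	(&TaskConsumer{}).Consume(f)
	fmt.Println("acked:", f.acked)
}
```

With multiple instances each adding a consumer like this, rmq distributes ready deliveries among them; only one consumer receives any given payload.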
Hi,
I stumbled upon your lib recently. It has been very helpful, and I am very grateful that you have put such tremendous effort into making it. However, I find the documentation lacking; for example, I don't know how to assign a key to a payload and receive its corresponding response from a worker using the assigned ID.
This is only one of the many problems I have stumbled upon.
I would be very grateful if you could add more documentation to the README discussing the various other functions.
Regards,
Sarim
Can I use this queue from Node.js (as a consumer)? If so, which Node module should I use?
I'm trying to send messages from a Rails app to a Go app, so the producing side needs to know the queue name. But there's this line that uses a random string.
name := fmt.Sprintf("%s-%s", tag, uniuri.NewLen(6))
Is this just not a supported use case?
Hi,
I am running Go 1.13.1 here (go version go1.13.1 darwin/amd64). When getting/downloading rmq, I get an error that wrapper.rawClient.FlushDb is not defined. I guess it's because something in the redis lib changed.
$ go get github.com/adjust/rmq
# github.com/adjust/rmq
/Users/robin/go/src/github.com/adjust/rmq/redis_wrapper.go:85:19: wrapper.rawClient.FlushDb undefined (type *redis.Client has no field or method FlushDb)
If there is anything else you need from me, just let me know.
Thanks,
Robin
github.com/adjust/rmq depends on github.com/adjust/gocheck, which depends on the now-deprecated launchpad.net/gocheck (which also uses bzr instead of git, causing additional issues).
Any chance you could change the dependency here to the new github.com/go-check/check?
See adjust/gocheck#2 for some extra information.
I got this kind of error:
rmq redis error is not nil OOM command not allowed when used memory > 'maxmemory'.
github.com/adjust/rmq.checkErr(0xfd75c0, 0xc420175670, 0x2a)
It seems that finished tasks are not being released?
I'm trying to insert into the queue from a client not written in Go and having problems figuring out the keys to use.
LPUSH rmq::queue::[demo]::ready
Doesn't seem to be enough.
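For what it's worth, rmq's v1 source suggests a non-Go producer also needs to register the queue in a known-queues set, not just LPUSH to the ready list. A small sketch; the key names below are assumptions read from that source, so verify them against your rmq version before relying on them:

```go
package main

import "fmt"

// readyKey builds the Redis list key that rmq's v1 layout appears to
// use for a queue's ready deliveries. Treat the template as an
// assumption to check against your rmq version's source.
func readyKey(queue string) string {
	return fmt.Sprintf("rmq::queue::[%s]::ready", queue)
}

func main() {
	// From redis-cli or any client, a non-Go producer would then run:
	//   LPUSH rmq::queue::[demo]::ready "payload"
	// and register the queue so consumers can discover it (rmq keeps
	// known queues in a set; "rmq::queues" is its name in the v1 source):
	//   SADD rmq::queues demo
	fmt.Println(readyKey("demo"))
}
```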
I run batch_consumer.go and it regularly reports: panic: rmq redis error is not nil EOF
https://redis.io/topics/streams-intro
Just a question, is support for redis streams planned?
Hello,
How can I make a broadcast to all consumers?
While gracefully shutting down a worker, I should first pause the queue from receiving messages. But I found that queue.Close purges the queue's messages, so I would like rmq to provide a function to pause message receiving. Thanks.
What is the correct way to safely close the connection to redis while letting all the consumers (or producers) gracefully shutdown?
connection := rmq.OpenConnection("email queue", "tcp", config.Redis.Address, 1)
emailQueue := connection.OpenQueue("emails")
emailQueue.StartConsuming(100, time.Microsecond)
consumer := &queue.EmailConsumer{}
emailQueue.AddConsumer("email consumer", consumer)
done := make(chan bool)
// Abort when we press CTRL+C (go run...) or send a kill -9 (go build...)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
for range c {
log.Println("Interrupt / SIGTERM received. Stopping...")
emailQueue.StopConsuming()
connection.Close()
done <- true
}
}()
<-done
I am trying to figure out what I'm doing wrong in causing a data race with redisQueue.StopConsuming()
WARNING: DATA RACE
Write at 0x00c4201960a8 by goroutine 104:
github.com/adjust/rmq.(*redisQueue).StopConsuming()
/Users/owner/go/src/github.com/adjust/rmq/queue.go:222 +0x9b
main.main.func2()
Previous read at 0x00c4201960a8 by goroutine 19:
github.com/adjust/rmq.(*redisQueue).consume()
/Users/owner/go/src/github.com/adjust/rmq/queue.go:290 +0x68
Goroutine 104 (running) created at:
main.main()
Goroutine 19 (running) created at:
github.com/adjust/rmq.(*redisQueue).StartConsuming()
/Users/owner/go/src/github.com/adjust/rmq/queue.go:213 +0x18b
main.main()
Code:
connection := rmq.OpenConnection("email queue", "tcp", Address, 1)
emailQueue := connection.OpenQueue("emails")
emailQueue.StartConsuming(100, time.Microsecond)
consumer := &queue.EmailConsumer{}
emailQueue.AddConsumer("consumer", consumer)
emailQueue.StopConsuming()
Hi,
I'm probably missing something again; please advise. Why not expose a wrapper for Delivery.Push() when it comes to batch consuming through rmq.Deliveries? It only has Ack and Reject.
Thanks!
P.S. Just to mention, using panic makes the code a bit tricky to use. I am using the lib as an add-on; it should not interrupt the runtime. For now I have wrapped some calls (e.g. OpenConnection) in defer+recover, but it still does not look right.
Was this done on purpose? :)
What is the suggested approach to scale for throughput?
I am using rmq for a project which needs high throughput. If I need to increase throughput from producer -> Q -> consumer, what is the recommended approach? I am hoping for linear scalability - throw 2X more hardware, make it go 2X as fast (assuming I have a sufficient network bandwidth).
To give you an idea of the kind of throughput I hope to achieve: about 100K msgs/sec for a single Q. Overall, I'm looking at about 10M msgs/sec in the system, so I already know I'll have 100 Q's. Do I need to shard further?
1. Add more boxes to the Redis cluster. Will that make rmq go faster? This is the preferred approach, since I can have a single, large Q. My application semantics prefer a single, large, fast Q.
2. Create N Redis clusters and rmq queues, and shard the Q traffic by the modulus of a hash of the txn ID. I think I could get linear scalability here by simply having more Redis clusters.
3. Some other approach I haven't thought of.
Are there some performance benchmarks you can point me to?
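A hedged sketch of the sharding in option 2, assuming a stable hash over the transaction ID (FNV-1a here is an arbitrary choice, not something rmq prescribes):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex deterministically picks one of n queues for a transaction
// ID. Any stable hash with reasonable distribution works; FNV-1a is
// just a cheap stdlib option.
func shardIndex(txnID string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(txnID))
	return int(h.Sum32() % uint32(n))
}

func main() {
	// With, say, 4 Redis instances each backing one rmq queue, a
	// producer would publish to queues[shardIndex(id, 4)].
	for _, id := range []string{"txn-1", "txn-2", "txn-3"} {
		fmt.Println(id, "->", shardIndex(id, 4))
	}
}
```

Because the index depends only on the ID, producers and any replaying tooling agree on which queue holds a given transaction without coordination.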
I have a question on the implementation of prefetchLimit. Suppose there are enough ready deliveries then -
It seems that prefetchLimit effectively behaves as one more than specified: even with prefetchLimit 1, a connection has 2 unacked deliveries, one in the process of consumption and one waiting in the delivery channel. Is there any workaround for this? I want to use rmq for long-running CPU tasks (of varied length) and only want to pull from the ready queue once one delivery has finished being consumed. Maybe support for a batch prefetchLimit?
Note: rmq panics on Redis connection errors. Your producers and consumers will crash if Redis goes down. Please let us know if you would see this handled differently.
My producers and consumers are part of my main API - I don't want it to be taken down because there is a redis error.
I can understand a fail fast design for separate worker services but that is a decision that is better made by the application rather than one of its libraries.
The most idiomatic go would probably be to put errors everywhere, but that would be quite a big change to your API. One option would be to coalesce errors together and put the onus on the library user to check. This would work for me.
To make this a non-breaking change, we could add a panicOnErrors option with a default of true. I like the following pattern for options (also non-breaking):
type option func(*redisConnection)
func PanicOnError(value bool) option {
...
}
func OpenConnection(tag, network, address string, db int, options... option) *redisConnection
cleaner.Clean() already returns an error; it could return an aggregate (or the first) error there so that the library user can take whatever recovery action is appropriate. Alternatively, maybe it is cleaner (no pun intended) to have a separate Error() error method.
Another possibility would be to have a PanicHandler(func(error) error), so that where you would panic you instead run the provided handler function. The semantics could be that if the handler itself errors, then you do panic. Hmm, I think maybe I like that one best: it means you can act immediately but keeps the API intact.
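Putting the pieces of the suggestion together, the functional-options plumbing could look like the sketch below. The redisConnection fields and defaults here are assumptions for illustration, not rmq's actual internals:

```go
package main

import "fmt"

// redisConnection stands in for rmq's unexported connection type; the
// fields below exist only to demonstrate the option plumbing.
type redisConnection struct {
	panicOnError bool
	errHandler   func(error) error
}

type option func(*redisConnection)

// PanicOnError toggles the current fail-fast behaviour.
func PanicOnError(value bool) option {
	return func(c *redisConnection) { c.panicOnError = value }
}

// PanicHandler installs a handler to run where rmq would panic; the
// suggested semantics are that rmq still panics if the handler itself
// returns an error.
func PanicHandler(h func(error) error) option {
	return func(c *redisConnection) { c.errHandler = h }
}

// OpenConnection keeps the old signature working: zero options means
// the historical panic-on-error behaviour.
func OpenConnection(tag, network, address string, db int, options ...option) *redisConnection {
	conn := &redisConnection{panicOnError: true} // default preserves old behaviour
	for _, opt := range options {
		opt(conn)
	}
	return conn
}

func main() {
	conn := OpenConnection("tag", "tcp", "localhost:6379", 1,
		PanicOnError(false),
		PanicHandler(func(err error) error {
			fmt.Println("rmq error:", err)
			return nil // handled; no panic
		}),
	)
	fmt.Println("panicOnError:", conn.panicOnError)
}
```

Because options are variadic, existing call sites compile unchanged, which is what makes the pattern non-breaking.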
Hi,
Was wondering if there would be any support for BulkPublishBytes and the like for the other Publish methods, which would use the underlying Redis client's actual LPush signature as is (which is LPush(key string, values ...interface{}))? If it's string in place of interface{}, that'd be awesome too!
If interested, I'm more than willing to help create a PR for this within the week.
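A minimal sketch of the proposed batching, with the client's variadic LPush abstracted behind a function type so it can be exercised without a live Redis. bulkPublishBytes is the proposed helper, not an existing rmq method, and the key name is illustrative:

```go
package main

import "fmt"

// lpushFunc abstracts the underlying client's variadic LPush
// (go-redis: LPush(key string, values ...interface{})), so the batch
// helper can be tested with a stub.
type lpushFunc func(key string, values ...interface{}) error

// bulkPublishBytes pushes all payloads in one round trip instead of
// one LPUSH per payload, which is the point of the proposal.
func bulkPublishBytes(lpush lpushFunc, readyKey string, payloads ...[]byte) error {
	values := make([]interface{}, len(payloads))
	for i, p := range payloads {
		values[i] = p
	}
	return lpush(readyKey, values...)
}

func main() {
	fake := func(key string, values ...interface{}) error {
		fmt.Printf("LPUSH %s with %d values\n", key, len(values))
		return nil
	}
	bulkPublishBytes(fake, "rmq::queue::[demo]::ready", []byte("a"), []byte("b"))
}
```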
Does this work when I have multiple different machines reading from the same queue? It looks like it might not, based on an examination of the code.
I don't know whether this should be reported here, as I have yet to dive into the mechanics of this package, but I wanted to report that, due to the changed interface of the SET command in Redis since 2.6.12 (see here), OpenConnection will panic immediately with the error:
rmq redis error is not nil ERR wrong number of arguments for 'set' command
if used with any Redis version below 2.6.12. A note about that in the README (or a fix if possible, pretty please maybe?) would be really helpful.
Cheers
PS: A first glance at the package tells me that this panic is due to the mechanics of heartbeating. I haven't fully understood the package yet, though, so take my analysis with a saltshaker's worth of salt.
Hello,
I'm using Docker for development. If I restart Docker, the previous connections, consumers, and handlers are no longer needed, but the overview page still shows them as closed. I don't think this is user-friendly.
In my configuration I only set one consumer. Each time I run docker-compose down and docker-compose up -d, there is one more dead connection, consumer, and handler that I don't need.
Is there a better way to handle this? Thanks.
Line 332 in 507bf31
consume now calls LLEN and RPOPLPUSH to fetch queue messages from Redis. BRPOPLPUSH could get rid of that LLEN call and should be more efficient.
Hello,
Since this is a queue based on Redis, should I see anything in Redis when I check with KEYS *? I didn't see anything, which confused me.
cannot use "github.com/go-redis/redis".NewClient(&"github.com/go-redis/redis".Options literal) (type *"github.com/go-redis/redis".Client) as type *"gopkg.in/redis.v3".Client in argument to rmq.OpenConnectionWithRedisClient
Getting the below error while pulling the rmq package into a Docker image. Let me know if more information is required.
$docker build --no-cache -f Dockerfile-rmq -t test test
....
Step 4/4 : RUN go get -u github.com/adjust/rmq
---> Running in f3280e2aa42d
package github.com/go-redis/redis/v7: cannot find package "github.com/go-redis/redis/v7" in any of:
/usr/local/go/src/github.com/go-redis/redis/v7 (from $GOROOT)
/go/src/github.com/go-redis/redis/v7 (from $GOPATH)
$ docker --version
Docker version 19.03.8, build afacb8b7f0
Dockerfile-rmq
FROM golang:1.14.2-alpine3.11
ENV GOBIN=/go/bin
RUN apk update && apk upgrade && \
apk add --no-cache git openssh
RUN go get -u github.com/adjust/rmq
So I planned to use this library for some notification stuff, but each time I restart the script my client gets a new name (the name consists of the tag plus 4 random characters), which prevents using the same queue again after a restart.
So what's the reason behind each new connection getting a new unique name? And could we make it an optional parameter?
Hi, I want to use this library for push notifications so they don't vanish from memory once the service crashes. Do you think this is a sensible use-case (also considering production use)?
When setting up a small example project I figured that the prefetch limit and poll duration somehow don't align. Setting a limit of 2 and a timeout of 10 seconds still sucks up random chunks of 7-10 deliveries at once, depending how fast I add items to the queue.
func main() {
// Setup redis and notification queue
redis := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1)
queue := redis.OpenQueue("notifications")
c := Consumer{}
// Start consuming
queue.StartConsuming(2, time.Second*10)
queue.AddConsumer("consumer", &c)
e := echo.New()
e.Run(standard.New(":" + port))
}
func (c *Consumer) Consume(delivery rmq.Delivery) {
log.Println(delivery.Payload())
delivery.Ack()
}
You can see that they got all the same second in the timestamp:
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
Hi, I can't connect to my Heroku Redis:
url := redis://{user}:{password}@{host}:{port}
connection := rmq.OpenConnection("", "tcp", url, 1)
I get this error:
2020/03/15 14:33:40 rmq redis error is not nil dial tcp: address {the url}: too many colons in address
panic: rmq redis error is not nil dial tcp: address {the url}: too many colons in address
Is this a bug, and how can I fix it?
Thank you
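One likely cause: rmq.OpenConnection expects a plain host:port address, not a full redis:// URL, hence "too many colons in address". A hedged sketch of splitting the URL first; parseRedisURL is an illustrative helper, not part of rmq:

```go
package main

import (
	"fmt"
	"net/url"
)

// parseRedisURL splits a redis:// URL into the host:port address and
// password that rmq.OpenConnection / redis.Options expect.
func parseRedisURL(raw string) (addr, password string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		password, _ = u.User.Password()
	}
	return u.Host, password, nil
}

func main() {
	addr, pass, _ := parseRedisURL("redis://user:secret@example.com:6379")
	fmt.Println(addr, pass)
	// Alternatively, with go-redis one can use redis.ParseURL and pass
	// the resulting client to rmq.OpenConnectionWithRedisClient, which
	// handles the URL form for you.
}
```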
Would it be possible for your functions to not panic when Redis is unavailable? Especially in goroutines that are started by functions like queue.StartConsuming. Since there's no way to recover from a panicked "child" goroutine, we are unable to handle cases when our Redis instance becomes temporarily unavailable and our consumers begin dying.
May be queue.consume() could recover from redisErrIsNil's panic and return an error to StartConsuming (through a channel)? Or better yet, redisErrIsNil doesn't panic at all and simply returns errors that your users can handle :)
Hi,
Thanks a lot for this package. Could you expose the queue.RemoveConsumer function through the Queue interface?
Thanks!
Please add Go modules support
Can we have persistence and durability of the message queue so it can be used for logging?
Any plans on exposing queue metrics via Prometheus endpoint? If not, I'd like to contribute.
I've been using rmq for a little under a week now, and unfortunately, I've started to run into some issues.
I'm currently sharing one connection across 4-5 queues. In the past I've had little to no issue doing this, but as shown by the screenshot above, the single connection seems to be spawning a lot more clients than expected; over time the connections build up and cause Redis to time out. Below is a snippet of how I'm opening and sharing the connection, alongside how the cleaner is running. If you have any suggestions on how to improve this, or on what may be causing the mass of clients, please let me know. Thanks!
var connection rmq.Connection
func OpenConnection(url string) {
opt, _ := redis.ParseURL(url)
opt.MaxRetries = 5
redisConn := rmq.OpenConnectionWithRedisClient("heartbeat", redis.NewClient(opt))
connection = redisConn
go func() {
cleaner := rmq.NewCleaner(redisConn)
for range time.Tick(time.Second) {
err := cleaner.Clean()
if err != nil {
sentry.CaptureException(err)
}
}
}()
}
Just some questions on the usage:
1. StartConsuming: if I give prefetchLimit a value greater than 1 and my consumer program crashes, does that mean those prefetched items will be lost?
2. AddConsumer: will each consumer be run in a separate goroutine?
The official Redis command states the response is an int of the list length.
The RedisClient interface here returns a bool instead:
https://github.com/adjust/rmq/blob/master/redis_wrapper.go#L37
Hi,
Why wouldn't you expose ReturnAllUnacked via Queue? Just thinking of how to handle a consumer's death: there is always the possibility of unacked elements being left behind, which means data loss.
Thanks.
Hi to everybody here, and thank you for this project.
I'm trying to connect to my Redis with a password using rmq.OpenConnectionWithRedisClient().
I'm working with Go modules and have imported the latest rmq commit with:
go get github.com/adjust/rmq@f9bb31b720096e775f1e83f2c194bbe3409e8688
Also importing redis client as described in https://github.com/go-redis/redis
go get github.com/go-redis/redis/v7
and
import (
"github.com/adjust/rmq"
"github.com/go-redis/redis/v7"
)
But this error happens.
pkg/agent/rqm/rqm.go:63:43: cannot use redisClient (type *"github.com/go-redis/redis/v7".Client) as type *"github.com/go-redis/redis".Client in argument to rmq.OpenConnectionWithRedisClient
Could somebody tell me what I did wrong?
I note from #40 that redisConnection is deliberately not exported, but the Connection interface does not have the Close() method. There is therefore no way outside of local declaration scope to close a connection. Should I care?
Is there any particular reason these two methods are missing from the interface? If not, can I do a PR to add them in?
The Queue Interface
https://github.com/adjust/rmq/blob/master/queue.go#L32:L46
Hi,
Is there any reason for using github.com/adjust/redis instead of github.com/go-redis/redis? The native client is under constant development, and many improvements have been added since.
Hi,
We get panics when Redis times out. The Redis instance is alive but under pressure. How can we raise the limit or gracefully retry?
Attached stack
2018/07/13 15:06:06 rmq redis error is not nil write tcp 172.17.0.4:51830->someip:6379: i/o timeout
panic: rmq redis error is not nil write tcp 172.17.0.4:51830->someip:6379: i/o timeout
goroutine 40 [running]:
log.Panicf(0xa2fd38, 0x1d, 0xc4861f5e00, 0x1, 0x1)
/usr/local/go/src/log/log.go:333 +0xda
/my/go/path/vendor/github.com/adjust/rmq.checkErr(0xa8c540, 0xc479161f40, 0x5a)
/my/go/path/vendor/github.com/adjust/rmq/redis_wrapper.go:96 +0xcc
/my/go/path/vendor/github.com/adjust/rmq.RedisWrapper.LRem(0xc4200b27d0, 0xc4202a81e0, 0x5a, 0x1, 0xc4f95f0000, 0xafa3827, 0x28, 0x79bb15)
/my/go/path/vendor/github.com/adjust/rmq/redis_wrapper.go:51 +0xc4
/my/go/path/vendor/github.com/adjust/rmq.(*wrapDelivery).Ack(0xc47b168e10, 0xa46ae8)
/my/go/path/vendor/github.com/adjust/rmq/delivery.go:43 +0x60
/my/go/path/queue.(*dequeuer).Consume(0xc420190030, 0xa92600, 0xc47b168e10)
/my/go/pathqueue/consumer.go:103 +0xf9
/my/go/path/vendor/github.com/adjust/rmq.(*redisQueue).consumerConsume(0xc4202e2000, 0xa8c220, 0xc420190030)
/my/go/path/vendor/github.com/adjust/rmq/queue.go:326 +0x6c
created by /my/go/path/vendor/github.com/adjust/rmq.(*redisQueue).AddConsumer
/my/go/path/vendor/github.com/adjust/rmq/queue.go:229 +0x8d
Hello,
The go redis client recommends using the package github.com/go-redis/redis, but this project uses gopkg.in/redis.v3.
Trying to use those two packages together I get the following error:
cannot use (type *"github.com/foo/bar/vendor/github.com/go-redis/redis".Client) as type *"github.com/foo/bar/vendor/gopkg.in/redis.v3".Client in argument to rmq.OpenConnectionWithRedisClient
I think this project's dependencies should be updated to redis v6 and use github.com/go-redis/redis.
I have a task queue storing tasks, but a task may sometimes fail. I want to modify some fields before calling delivery.Reject(), for example incrementing a retry field, and when retry is greater than some number, we give up on the task. Is it possible to do that with your API?
Hi,
I don't know if this is by design, but after stopping consumption of a queue via StopConsuming, it is impossible to resume consuming via StartConsuming. The problem is that the latter function checks whether deliveryChan is nil; however, it will always be non-nil after consumption has started once.
It would probably be more correct to check the consumingStopped value. However, I have no idea how to gracefully handle the channel renewal (since the prefetchLimit might have changed).
Hello, and thank you to everybody who is working on or contributing to this great project.
I'm trying to use rmq to build a distributed job load system. I have a master building jobs and, right now, only one agent running jobs.
The master has processed buildIDs 1259 to 1270, but the agent has consumed and acked only 1259, 1260, 1261, 1262, 1263, and 1266; all the others are still in the queue, as you can see in my Redis Desktop Manager screenshot.
I have configured the agent to fetch data every second.
redisClient = redis.NewClient(&redis.Options{
Network: "tcp",
Addr: conf.RedisEndpoint,
DB: 0,
Password: conf.RedisPassword,
})
//check error?
_, err := redisClient.Ping().Result()
if err != nil {
log.Errorf("Cannot connect to redis %s", err)
return err
}
con := rmq.OpenConnectionWithRedisClient("sya", redisClient)
jobReqQ = con.OpenQueue("job_request")
jobRespQ = con.OpenQueue("job_responses")
jobReqQ.StartConsuming(10, time.Second)
jobReqQ.AddConsumer("consumer-"+conf.NodeID, NewAgentConsumer())
I've restarted the agent but nothing happens; the data is still in the queue and is not fetched anymore (the producer "master" agent is stopped right now).
Thank you in advance
Could you please let us know the best practices to follow when using this package: number of connections, whether to create a new connection every time or reuse one, when to stop the heartbeat, when to call the cleaner, etc.
Also, do you see any issues if common queues are being listened to from a set of Kubernetes pods distributed across different nodes?