
rmq's Introduction


Overview

rmq is short for Redis message queue. It's a message queue system written in Go and backed by Redis.

Basic Usage

Let's take a look at how to use rmq.

Import

Of course you need to import rmq wherever you want to use it.

import "github.com/adjust/rmq/v5"

Connection

Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port:

connection, err := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1, errChan)

It's also possible to access a Redis instance listening on a Unix socket:

connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan)

For more flexible setup you can pass Redis options or create your own Redis client:

connection, err := rmq.OpenConnectionWithRedisOptions("my service", redisOptions, errChan)
connection, err := rmq.OpenConnectionWithRedisClient("my service", redisClient, errChan)

If the Redis instance can't be reached you will receive an error indicating this.

Please also note the errChan parameter. There is some rmq logic running in the background which can run into Redis errors. If you pass an error channel to the OpenConnection() functions rmq will send those background errors to this channel so you can handle them asynchronously. For more details about this and handling suggestions see the section about handling background errors below.

Connecting to a Redis cluster

In order to connect to a Redis cluster please use OpenClusterConnection():

redisClusterOptions := &redis.ClusterOptions{ /* ... */ }
redisClusterClient := redis.NewClusterClient(redisClusterOptions)
connection, err := rmq.OpenClusterConnection("my service", redisClusterClient, errChan)

Note that such an rmq cluster connection uses a different Redis data layout than rmq connections opened by OpenConnection() or similar. If you have used a Redis instance with OpenConnection() then it is NOT SAFE to reuse that rmq system by connecting to it via OpenClusterConnection(). The cluster state won't be compatible and this will likely lead to data loss.

If you've previously used OpenConnection() or similar you should only consider using OpenClusterConnection() with a fresh Redis cluster.

Queues

Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":

taskQueue, err := connection.OpenQueue("tasks")

Again, Redis errors might be returned.

Producers

An empty queue is boring, let's add some deliveries! Internally all deliveries are saved to Redis lists as strings. This is how you can publish a string payload to a queue:

delivery := "task payload"
err := taskQueue.Publish(delivery)

In practice, however, it's more common to have instances of some struct that we want to publish to a queue. Assuming task is of some type like Task, this is how to publish the JSON representation of that task:

// create task
taskBytes, err := json.Marshal(task)
if err != nil {
    // handle error
}

err = taskQueue.PublishBytes(taskBytes)

For a full example see example/producer.

Consumers

Now that our queue starts filling, let's add a consumer. After opening the queue as before, we need it to start consuming before we can add consumers.

err := taskQueue.StartConsuming(10, time.Second)

This sets the prefetch limit to 10 and the poll duration to one second. This means the queue will fetch up to 10 deliveries at a time before giving them to the consumers. To avoid idling consumers while the queues are full, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue gets empty, the poll duration sets how long rmq will wait before checking for new deliveries in Redis.

Once this is set up, we can actually add consumers to the consuming queue.

taskConsumer := &TaskConsumer{}
name, err := taskQueue.AddConsumer("task-consumer", taskConsumer)

To uniquely identify each consumer internally rmq creates a random name with the given prefix. For example in this case name might be task-consumer-WB1zaq. This name is only used in statistics.

In our example above the injected taskConsumer (of type *TaskConsumer) must implement the rmq.Consumer interface. For example:

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
    var task Task
    if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        // handle json error
        if err := delivery.Reject(); err != nil {
            // handle reject error
        }
        return
    }

    // perform task
    log.Printf("performing task %s", task)
    if err := delivery.Ack(); err != nil {
        // handle ack error
    }
}

First we unmarshal the JSON payload of the delivery. If this fails we reject the delivery. Otherwise we perform the task and ack the delivery.

If you don't actually need a consumer struct you can use AddConsumerFunc instead and pass a consumer function which handles an rmq.Delivery:

name, err := taskQueue.AddConsumerFunc(func(delivery rmq.Delivery) {
    // handle delivery and call Ack() or Reject() on it
})

Please note that delivery.Ack() and similar functions have a built-in retry mechanism which will block your consumers in some cases. This is because failing to acknowledge a delivery is potentially dangerous. For details see the section about background errors below.

For a full example see example/consumer.

Consumer Lifecycle

As described above you can add consumers to a queue. For each consumer rmq takes one of the prefetched unacked deliveries from the delivery channel and passes it to the consumer's Consume() function. The next delivery will only be passed to the same consumer once the prior Consume() call returns. So each consumer will only be consuming a single delivery at any given time.

Furthermore each Consume() call is expected to call either delivery.Ack(), delivery.Reject() or delivery.Push() (see below). If that's not the case these deliveries will remain unacked and the prefetch goroutine won't make progress after a while. So make sure you always call exactly one of those functions in your Consume() implementations.

Background Errors

It's recommended to inject an error channel into the OpenConnection() functions. This section describes its purpose and how you might use it to monitor rmq background Redis errors.

There are three sources of background errors which rmq detects (and handles internally):

  1. The OpenConnection() functions spawn a goroutine which keeps a heartbeat Redis key alive. This is important so that the cleaner (see below) can tell which connections are still alive and must not be cleaned yet. If the heartbeat goroutine fails to update the heartbeat Redis key repeatedly for too long, the cleaner might clean up the connection prematurely. To avoid this the connection will automatically stop all consumers after 45 consecutive heartbeat errors. This magic number is based on the details of the heartbeat key: the heartbeat tries to update the key every second with a TTL of one minute, so only after 60 failed attempts would the heartbeat key expire.

    Every time this goroutine runs into a Redis error it is sent to the error channel as a HeartbeatError.

  2. The StartConsuming() function spawns a goroutine which is responsible for prefetching deliveries from the Redis ready list and moving them into a delivery channel. This delivery channel feeds into your consumers' Consume() functions. If the prefetch goroutine runs into Redis errors, no new deliveries will be sent to your consumers until it can fetch new ones. So these Redis errors are not dangerous; they just mean that your consumers will idle until the Redis connection recovers.

    Every time this goroutine runs into a Redis error it is sent to the error channel as a ConsumeError.

  3. The delivery functions Ack(), Reject() and Push() have a built-in retry mechanism. This is because failing to acknowledge a delivery is potentially dangerous. The consumer has already handled the delivery, so if it can't ack it the cleaner might eventually move it back to the ready list and another consumer might consume it again in the future, leading to double delivery.

    So if a delivery failed to be acked because of a Redis error the Ack() call will block and retry once a second until it either succeeds or until consuming gets stopped (see below). In the latter case the Ack() call will return rmq.ErrorConsumingStopped which you should handle in your consume function. For example you might want to log about the delivery so you can manually remove it from the unacked or ready list before you start new consumers. Or at least you can know which deliveries might end up being consumed twice.

    Every time one of these functions runs into a Redis error it is sent to the error channel as a DeliveryError.

Each of those error types has a field Count which tells you how often the operation failed consecutively. This indicates for how long the affected Redis instance has been unavailable. One general way of using this information might be to have metrics about the error types including the error count so you can keep track of how stable your Redis instances and connections are. By monitoring this you might learn about instabilities before they affect your services in significant ways.

Below is some more specific advice on handling the different error cases outlined above. Keep in mind though that all of those errors are likely to happen at the same time, as Redis tends to be up or down completely. But if you're using a multi-instance Redis setup like nutcracker you might see some of them in isolation from the others.

  1. HeartbeatErrors: Once err.Count equals HeartbeatErrorLimit you should know that the consumers of this connection will stop consuming. And they won't restart consuming on their own. This is a condition you should closely monitor because this means you will have to restart your service in order to resume consuming. Before restarting you should check your Redis instance.

  2. ConsumeError: These are mostly informational. As long as those errors keep happening the consumers will effectively be paused. But once these operations start succeeding again the consumers will resume consuming on their own.

  3. DeliveryError: When you see deliveries failing to ack repeatedly, your consumers won't make progress either, as they will keep retrying to ack pending deliveries before starting to consume new ones. As long as this keeps happening you should avoid stopping the service if you can. That is because the already consumed but not yet acked deliveries would be returned to the ready list by the cleaner afterwards, which leads to double delivery. So ideally you try to get the Redis connection up again while the deliveries are still trying to ack. Once acking works again it's safe to restart the service.

    More realistically, if you still need to stop the service when Redis is down, keep in mind that calling StopConsuming() will make the blocking Ack() calls return with ErrorConsumingStopped, so you can handle that case to make an attempt to either avoid the double delivery or at least track it for future investigation.

Advanced Usage

Batch Consumers

Sometimes it's useful to have consumers work on batches of deliveries instead of individual ones. For example for bulk database inserts. In those cases you can use AddBatchConsumer():

batchConsumer := &MyBatchConsumer{}
name, err := taskQueue.AddBatchConsumer("my-consumer", 100, time.Second, batchConsumer)

In this example we create a batch consumer which will receive batches of up to 100 deliveries. We set the batchTimeout to one second, so if there are fewer than 100 deliveries per second we will still consume at least one batch per second (which would then contain fewer than 100 deliveries).

The rmq.BatchConsumer interface is very similar to rmq.Consumer.

func (consumer *MyBatchConsumer) Consume(batch rmq.Deliveries) {
    payloads := batch.Payloads()
    // handle payloads
    if errors := batch.Ack(); len(errors) > 0 {
        // handle ack errors
    }
}

Note that batch.Ack() acknowledges all deliveries in the batch. It's also possible to ack some of the deliveries and reject the rest. It uses the same retry mechanism per delivery as discussed above. If some of the deliveries continue to fail to ack when consuming gets stopped (see below), then batch.Ack() will return an error map of type map[int]error. For each entry in this map the key is the index of the delivery which failed to ack and the value is the error it ran into. That way you can map the errors back to the deliveries to know which deliveries are at risk of being consumed again in the future, as discussed above.

For a full example see example/batch_consumer.

Push Queues

Another thing which can be useful is a mechanism for retries. Let's say you have tasks which can fail for external reasons but you'd like to retry them a few times after a while before you give up. In that case you can set up a chain of push queues like this:

incomingQ -> pushQ1 -> pushQ2

In the queue setup code it would look like this (error handling omitted for brevity):

incomingQ, err := connection.OpenQueue("incomingQ")
pushQ1, err := connection.OpenQueue("pushQ1")
pushQ2, err := connection.OpenQueue("pushQ2")
incomingQ.SetPushQueue(pushQ1)
pushQ1.SetPushQueue(pushQ2)
_, err := incomingQ.AddConsumer("incomingQ", NewConsumer())
_, err := pushQ1.AddConsumer("pushQ1", NewConsumer())
_, err := pushQ2.AddConsumer("pushQ2", NewConsumer())

If you have set up your queues like this, you can now call delivery.Push() in your Consume() function to push the delivery from the consuming queue to the associated push queue. So if consumption fails on incomingQ, then the delivery would be moved to pushQ1 and so on. If you have the consumers wait until the deliveries have a certain age you can use this pattern to retry after certain durations.

Note that delivery.Push() has the same effect as delivery.Reject() if the queue has no push queue set up. So in our example above, if the delivery fails in the consumer on pushQ2, then the Push() call will reject the delivery.

Stop Consuming

If you want to stop consuming from the queue, you can call StopConsuming():

finishedChan := taskQueue.StopConsuming()

When StopConsuming() is called, it will immediately stop fetching more deliveries from Redis and won't send any more of the already prefetched deliveries to consumers.

In the background it will make pending Ack() calls return rmq.ErrorConsumingStopped if they still run into Redis errors (see above) and wait for all consumers to finish consuming their current delivery before closing the returned finishedChan. So while StopConsuming() returns immediately, you can wait on the returned channel until all consumers are done:

<-finishedChan

You can also stop consuming on all queues in your connection:

finishedChan := connection.StopAllConsuming()

Wait on the finishedChan to wait for all consumers on all queues to finish.

This is useful to implement a graceful shutdown of a consumer service. Please note that after calling StopConsuming() the queue might not be in a state where you can add consumers and call StartConsuming() again. If you have a use case where you actually need that sort of flexibility, please let us know. Currently for each queue you are only supposed to call StartConsuming() and StopConsuming() at most once.

Also note that StopAllConsuming() will stop the heartbeat for this connection. It's advised to also not publish to any queue opened by this connection anymore.

Return Rejected Deliveries

Even if you don't have a push queue set up there are cases where you need to consume previously failed deliveries again. For example an external dependency might have an issue, or you might have deployed a broken consumer service which rejects all deliveries for some reason.

In those cases you would wait for the external party to recover, or fix your mistake, before getting ready to reprocess the deliveries. Then you can return the deliveries by opening the affected queue and calling ReturnRejected():

returned, err := queue.ReturnRejected(10000)

In this case we ask rmq to return up to 10k deliveries from the rejected list to the ready list. To return all of them you can pass math.MaxInt64.

If there was no error it returns the number of deliveries that were moved.

If you find yourself doing this regularly on some queues consider setting up a push queue to automatically retry failed deliveries regularly.

See example/returner.

Purge Rejected Deliveries

You might run into the case where you have rejected deliveries which you don't intend to retry again for one reason or another. In those cases you can clear the full rejected list by calling PurgeRejected():

count, err := queue.PurgeRejected()

It returns the number of purged deliveries.

Similarly, there's a function to clear the ready list of deliveries:

count, err := queue.PurgeReady()

See example/purger.

Cleaner

You should regularly run a queue cleaner to make sure no unacked deliveries are stuck in the queue system. The background is that a consumer service prefetches deliveries by moving them from the ready list to an unacked list associated with the queue connection. If the consumer dies by crashing or even by being gracefully shut down by calling StopConsuming(), the unacked deliveries will remain in that Redis list.

If you run a queue cleaner regularly it will detect queue connections whose heartbeat expired and will clean up all their consumer queues by moving their unacked deliveries back to the ready list.

Although it should be safe to run multiple cleaners, it's recommended to run exactly one instance per queue system and have it trigger the cleaning process regularly, like once a minute.

See example/cleaner.

Header

The Redis protocol does not define a specific way to pass additional data like headers. However, there is often a need to pass them (for example for trace propagation).

This implementation injects optional header values, marked with a signature, into the payload body during publishing. When the message is consumed and the signature is present, the header and original payload are extracted from the augmented payload.

The header is defined as http.Header for better interoperability with existing libraries, for example with propagation.HeaderCarrier.

 // ....
 
 h := make(http.Header)
 h.Set("X-Baz", "quux")

 // You can add header to your payload during publish.
 _ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h))

 // ....

 _, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) {
     // And receive header back in consumer.
     delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux"
     
     // ....
 })

Adding a header is an explicit opt-in operation, so it does not affect the library's backwards compatibility by default (when not used).

Please note that adding a header may lead to compatibility issues if:

  • the consumer is built with an older version of rmq while the publisher has already started using headers; this can be avoided by upgrading consumers before publishers;
  • the consumer is not using rmq (other libraries, low-level tools like redis-cli) and is not aware of the payload format extension.

Testing Included

To simplify testing of queue producers and consumers we include test mocks.

Test Connection

As before, we first need a queue connection, but this time we use an rmq.TestConnection that doesn't need any connection settings.

testConn := rmq.NewTestConnection()

If you are using a testing framework that uses test suites, you can reuse that test connection by setting it up once for the suite and resetting it with testConn.Reset() before each test.

Producer Tests

Now let's say we want to test the function publishTask() that creates a task and publishes it to a queue from that connection.

// call the function that should publish a task
publishTask(testConn)

// check that the task is published
assert.Equal(t, "task payload", suite.testConn.GetDelivery("tasks", 0))

The assert.Equal part is from testify, but it will look similar for other testing frameworks. Given an rmq.TestConnection, we can check the deliveries that were published to its queues (since the last Reset() call) with GetDelivery(queueName, index). In this case we want to extract the first (and possibly only) delivery that was published to queue tasks and just check the payload string.

If the payload is JSON again, the unmarshalling and check might look like this:

var task Task
err := json.Unmarshal([]byte(suite.testConn.GetDelivery("tasks", 0)), &task)
assert.NoError(t, err)
assert.NotNil(t, task)
assert.Equal(t, "value", task.Property)

If you expect a producer to create multiple deliveries you can use different indexes to access them all.

assert.Equal(t, "task1", suite.testConn.GetDelivery("tasks", 0))
assert.Equal(t, "task2", suite.testConn.GetDelivery("tasks", 1))

For convenience there's also a function GetDeliveries that returns all deliveries published to a queue as a string slice.

assert.Equal(t, []string{"task1", "task2"}, suite.testConn.GetDeliveries("tasks"))

These examples assume that you inject the rmq.Connection into your testable functions. If you inject instances of rmq.Queue instead, you can use rmq.TestQueue instances in tests and access their LastDeliveries (since Reset()) directly.

Consumer Tests

Testing consumers is a bit easier because consumers must implement the rmq.Consumer interface. In the tests just create an rmq.TestDelivery and pass it to your Consume() function. This example creates a test delivery from a string and then checks that the delivery was acked.

consumer := &TaskConsumer{}
delivery := rmq.NewTestDeliveryString("task payload")

consumer.Consume(delivery)

assert.Equal(t, rmq.Acked, delivery.State)

The State field will always be one of these values:

  • rmq.Acked: The delivery was acked
  • rmq.Rejected: The delivery was rejected
  • rmq.Pushed: The delivery was pushed (see below)
  • rmq.Unacked: Nothing of the above

If your payloads are JSON marshalled objects, then you can create test deliveries out of those like this:

task := Task{Property: "bad value"}
delivery := rmq.NewTestDelivery(task)

Integration Tests

If you want to write integration tests which exercise both producers and consumers at the same time, you can use the rmq.OpenConnectionWithTestRedisClient constructor. It returns a real rmq.Connection instance which is backed by an in-memory Redis client implementation. That way it behaves exactly as in production, just without the durability of a real Redis client. Don't use this in production!

Statistics

Given a connection, you can call connection.CollectStats() to receive rmq.Stats about all open queues, connections and consumers. If you run example/handler you can see what's available.

In that example you see 5 connections consuming task_kind1, each with 5 consumers. They have a total of 1007 packages unacked. Below the marker you see connections which are not consuming. One of the handler connections died because the handler was stopped. Running the cleaner would clean that up (see above).

Prometheus

If you are using Prometheus, rmqprom collects statistics about all open queues and exposes them as Prometheus metrics.


rmq's Issues

launchpad.net/gocheck is deprecated

github.com/adjust/rmq depends on github.com/adjust/gocheck which depends on the now deprecated launchpad.net/gocheck (which also uses bzr instead of git causing additional issues).

Any chance you could change the dependency here to the new github.com/go-check/check?

See adjust/gocheck#2 for some extra information.

error while getting rmq package into docker image.

Getting below error while getting rmq package into docker image. Let me know if more information is required.

$docker build --no-cache  -f Dockerfile-rmq -t test test
....
Step 4/4 : RUN go get -u github.com/adjust/rmq
 ---> Running in f3280e2aa42d
package github.com/go-redis/redis/v7: cannot find package "github.com/go-redis/redis/v7" in any of:
	/usr/local/go/src/github.com/go-redis/redis/v7 (from $GOROOT)
	/go/src/github.com/go-redis/redis/v7 (from $GOPATH)

$ docker --version
Docker version 19.03.8, build afacb8b7f0

Dockerfile-rmq
FROM golang:1.14.2-alpine3.11

ENV GOBIN=/go/bin

RUN apk update && apk upgrade && \
    apk add --no-cache git openssh

RUN go get -u github.com/adjust/rmq

Inconsistency between redis clients

Hello,

The go redis client recommends using package: "github.com/go-redis/redis", but this project uses: "gopkg.in/redis.v3".

Trying to use those two packages together I get the following error:

cannot use (type *"github.com/foo/bar/vendor/github.com/go-redis/redis".Client) as type *"github.com/foo/bar/vendor/gopkg.in/redis.v3".Client in argument to rmq.OpenConnectionWithRedisClient

I think this project dependencies should be updated to redis v6 and use github.com/go-redis/redis.

Unable to resume consuming once stopped

Hi,

I don't know if this is by design - but after stopping to consume a Queue via StopConsuming, it is impossible to resume consuming via StartConsuming. The problem is that the latter function checks for deliveryChan being not nil. However, this will always be the case after consumption has started.

It would probably more correct to check for the consumingStopped value. However, I have no idea to gracefully handle the channel renewal (since the prefetchLimit might have changed).

How you you safely close the connection?

What is the correct way to safely close the connection to redis while letting all the consumers (or producers) gracefully shutdown?

	connection := rmq.OpenConnection("email queue", "tcp", config.Redis.Address, 1)
	emailQueue := connection.OpenQueue("emails")
	emailQueue.StartConsuming(100, time.Microsecond)

	consumer := &queue.EmailConsumer{}
	emailQueue.AddConsumer("email consumer", consumer)

	done := make(chan bool)

	// Abort when we press CTRL+C (go run...) or send a kill -9 (go build...)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		for _ = range c {
			emailQueue.StopConsuming()
			connection.Close()
			done <- true
			log.Println("Interrupt / SIGTERM received. Stopping...")
		}
	}()

	<-done

Task not always taken by a consumer

I have a consumer function called Consume:

type TaskConsumer struct {
}

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {

And I have several instances of my applications running.
On each instance, I add a consumer and start consuming tasks:

taskQueue := rmqConnection.OpenQueue("scheduleTasks")
taskQueue.StartConsuming(10, time.Second)
taskConsumer := &TaskConsumer{}
taskQueue.AddConsumer(uuid.NewV4().String(), taskConsumer)

And only one instance is publishing tasks:

taskQueue.Publish(UserId)

Am I doing something wrong ?

query on consumer

Just some questions on the usage:

  • For StartConsuming if i give prefetchLimit a value greater than 1 and my consumer program crash does it mean those prefetched items will be lost?
  • When I call AddConsumer, will each consumer be run in a separate goroutine?
  • Also what will happen to those rejected? Go back to the queue?

Best practices

Could you please let us know what are the best practices we need to follow when using this package like number of connections, should we create a new connection every time or reuse, when to stop the heartbeat, when to call cleaner etc.

Also do you see any issues if common queues are being listened to from set of kubernetes pods distributed across different nodes ?

ReturnAllUnacked

Hi,

Why wouldn't you expose ReturnAllUnacked via Queue ? Just thinking of how do I handle a consumer's death, there is alway an option of the unpacked elements to be left which means data loss.

Thanks.

Question on Queue prefetchLimit

I have a question on the implementation of prefetchLimit. Suppose there are enough ready deliveries then -

  1. queue/connection consumes in batches of prefetchLimit and waits for the consumer to consume the batch and then pulls the next batch from the ready delivery ?
  2. or queue/connection continuously fills delivery channel as soon as there is a slot available ?

It seems that prefetchLimit is implemented as 2. So even in case of prefetchLimit 1, connection has 2 unAck deliveries, one in the process of consumption and one in the delivery chain. Is there any workaround this, I want to use rmq for long running CPU tasks(varied length) and only want to pull from ready queue when one delivery consumption has finished. Support for batch prefetchLimit may be ?

Error in consumer is not fetching data from Redis queue after working for a while

Hello , and thank you everybody who is working or contributing with this great project

I'm trying to run rmq to build a distributed job load system, I have a master building jobs and right now only one agent running jobs.

The master has processed from buildID 1259 to 1270, but the agent has consumed and acked only the 1259,1260,1261,1262,1263,1266, all the other still in the queue as you can see in my "Redis DeskTop Manager" screenshot

image

I have configured the agent to fetch data for each second

	redisClient = redis.NewClient(&redis.Options{
		Network:  "tcp",
		Addr:     conf.RedisEndpoint,
		DB:       0,
		Password: conf.RedisPassword,
	})
	//check error?
	_, err := redisClient.Ping().Result()
	if err != nil {
		log.Errorf("Can no connect to redis %s", err)
		return err
	}

	con := rmq.OpenConnectionWithRedisClient("sya", redisClient)
	jobReqQ = con.OpenQueue("job_request")
	jobRespQ = con.OpenQueue("job_responses")

	jobReqQ.StartConsuming(10, time.Second)
	jobReqQ.AddConsumer("consumer-"+conf.NodeID, NewAgentConsumer())

I've restarted the agent but nothing happens; the data is still in the queue and is not fetched anymore (the producer master is stopped right now).

  • how can I get the old data from the queue?
  • why is it still there, and why can't I fetch and ack it?

Thank you in advance

Queues named non-deterministically

I'm trying to send messages from a Rails app to a Go app, so the producing side needs to know the queue name. But there's this line that uses a random string:

name := fmt.Sprintf("%s-%s", tag, uniuri.NewLen(6))

Is this just not a supported use case?

Consider using BRPOPLPUSH

rmq/queue.go

Line 332 in 507bf31

value, ok := queue.redisClient.RPopLPush(queue.readyKey, queue.unackedKey)

consume currently calls LLEN and RPOPLPUSH to fetch queue messages from Redis.
BRPOPLPUSH would get rid of the LLEN call and should be more efficient.
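For context, here is a stdlib-only toy model of what RPOPLPUSH does (not rmq's actual code): the delivery moves from the ready list to the unacked list in a single atomic step, so a crashing consumer never loses a delivery "in flight". BRPOPLPUSH has the same effect but blocks server-side until an element arrives or a timeout expires, which is what would make the separate LLEN check unnecessary.

```go
package main

import "fmt"

// rpopLpush models the non-blocking RPOPLPUSH command on in-memory slices.
func rpopLpush(ready, unacked []string) (newReady, newUnacked []string, value string, ok bool) {
	if len(ready) == 0 {
		return ready, unacked, "", false // non-blocking RPOPLPUSH returns nil here
	}
	last := len(ready) - 1
	value = ready[last] // RPOP from the ready list
	newReady = ready[:last]
	newUnacked = append([]string{value}, unacked...) // LPUSH onto the unacked list
	return newReady, newUnacked, value, true
}

func main() {
	ready := []string{"d3", "d2", "d1"}
	var unacked []string
	ready, unacked, v, ok := rpopLpush(ready, unacked)
	fmt.Println(v, ok, len(ready), len(unacked)) // prints "d1 true 2 1"
}
```

In the blocking variant the `ok == false` branch disappears: the server holds the connection open until a delivery shows up, so the client neither polls the length nor sleeps.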

How to handle OOM error in rmq?

I got this kind of error:

rmq redis error is not nil OOM command not allowed when used memory > 'maxmemory'.
github.com/adjust/rmq.checkErr(0xfd75c0, 0xc420175670, 0x2a)

It seems like finished tasks are not being released?

Should we care about closing the connection

I note from #40 that redisConnection is deliberately not exported, but the Connection interface does not have a Close() method. There is therefore no way outside of the local declaration scope to close a connection. Should I care?

Data race

I am trying to figure out what I'm doing wrong that causes a data race with redisQueue.StopConsuming():

WARNING: DATA RACE
Write at 0x00c4201960a8 by goroutine 104:
  github.com/adjust/rmq.(*redisQueue).StopConsuming()
      /Users/owner/go/src/github.com/adjust/rmq/queue.go:222 +0x9b
  main.main.func2()

Previous read at 0x00c4201960a8 by goroutine 19:
  github.com/adjust/rmq.(*redisQueue).consume()
      /Users/owner/go/src/github.com/adjust/rmq/queue.go:290 +0x68

Goroutine 104 (running) created at:
  main.main()

Goroutine 19 (running) created at:
  github.com/adjust/rmq.(*redisQueue).StartConsuming()
      /Users/owner/go/src/github.com/adjust/rmq/queue.go:213 +0x18b
  main.main()

Code:

	connection := rmq.OpenConnection("email queue", "tcp", Address, 1)
	emailQueue := connection.OpenQueue("emails")
	emailQueue.StartConsuming(100, time.Microsecond)

	consumer := &queue.EmailConsumer{}
	emailQueue.AddConsumer("consumer", consumer)

	emailQueue.StopConsuming()

Prefetch limit & poll duration

Hi, I want to use this library for push notifications so they don't vanish from memory if the service crashes. Do you think this is a sensible use case (also considering production use)?

When setting up a small example project I noticed that the prefetch limit and poll duration somehow don't align. Setting a limit of 2 and a poll duration of 10 seconds still pulls in chunks of 7-10 deliveries at once, depending on how fast I add items to the queue.

func main() {
    // Setup redis and notification queue
    redis := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1)
    queue := redis.OpenQueue("notifications")

    c := Consumer{}

    // Start consuming
    queue.StartConsuming(2, time.Second*10)
    queue.AddConsumer("consumer", &c)

    e := echo.New()
    e.Run(standard.New(":" + port))
}

func (c *Consumer) Consume(delivery rmq.Delivery) {
    log.Println(delivery.Payload())
    delivery.Ack()
}

You can see that they all got the same second in the timestamp:

2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla
2016/10/22 00:07:01 bla

NewClient Error

cannot use "github.com/go-redis/redis".NewClient(&"github.com/go-redis/redis".Options literal) (type *"github.com/go-redis/redis".Client) as type *"gopkg.in/redis.v3".Client in argument to rmq.OpenConnectionWithRedisClient

Why a `uniuri` for each new consumer

So I planned to use this library for some notification stuff, but each time I restart the script my client gets a new name; the name consists of the tag plus random characters, which prevents using the same queue again after a restart.

So what's the reason behind getting a new unique name for each new connection?

And could we make it an optional parameter?

Redis Error on Version < 2.6.12

I don't know whether this should be reported here, as I have yet to dive into the mechanics of this package, but I wanted to report that due to the changed interface of the SET command in Redis 2.6.12 (see here), OpenConnection will panic immediately with the error:

rmq redis error is not nil ERR wrong number of arguments for 'set' command

if used with any Redis version below 2.6.12. A note about that in the README (or a fix, if possible) would be really helpful.

Cheers

PS: A first glance at the package tells me that this panic is due to the heartbeat mechanics. I haven't fully understood the package yet, though, so take my analysis with a saltshaker's worth of salt.

Batch consuming + Push

Hi,

I'm probably missing something again, please advise. Why not expose a wrapper for Delivery.Push() for batch consuming through rmq.Deliveries? It only has Ack and Reject.

Thanks!

P.S. Just to mention, using panic makes the code a bit tricky to use. I am using the lib as an add-on; it should not interrupt the runtime. For now I have wrapped some calls (e.g. OpenConnection) in defer+recover, but it still doesn't look right.

Please don't panic

Note: rmq panics on Redis connection errors. Your producers and consumers will crash if Redis goes down. Please let us know if you would see this handled differently.

My producers and consumers are part of my main API; I don't want it to be taken down because of a Redis error.

I can understand a fail fast design for separate worker services but that is a decision that is better made by the application rather than one of its libraries.

The most idiomatic Go would probably be to return errors everywhere, but that would be quite a big change to your API. One option would be to coalesce errors together and put the onus on the library user to check. This would work for me.

To make this a non-breaking change we could do the following:

  • Make an option panicOnErrors defaulting to true. I like the following pattern for options (also non-breaking):

type option func(*redisConnection)

func PanicOnError(value bool) option {
	// ...
}

func OpenConnection(tag, network, address string, db int, options ...option) *redisConnection
  • Exploit the fact that we are meant to be running cleaner.Clean() regularly: it already returns an error, so any aggregate (or first) error could be surfaced there, letting the library user take whatever recovery action is appropriate.

Alternatively maybe it is cleaner (no pun intended) to have a separate Error() error method.

Another possibility would be a PanicHandler(func(error) error) so that where you would panic you instead run the provided handler function. The semantics could be that if the handler itself errors, then you do panic. Hmm, I think I like that one best: it means you can act immediately but keeps the API intact.

Support for batch publish ?

Hi,
Was wondering if there could be support for BulkPublishBytes and the like for the other Publish methods, which would use the underlying Redis client's actual LPush signature as-is (which is LPush(key string, values ...interface{}))? If it were string in place of interface{}, that'd be awesome too!
If interested, I'm more than willing to help create a PR for this within the week.
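One Go detail such a wrapper would have to handle: a []string cannot be passed directly to a ...interface{} parameter, so a conversion step is needed before forwarding to LPush. A stdlib-only sketch (toInterfaces is a hypothetical helper, not part of rmq):

```go
package main

import "fmt"

// toInterfaces converts a string slice into the []interface{} form that
// variadic LPush-style APIs expect; Go does not do this conversion implicitly.
func toInterfaces(payloads []string) []interface{} {
	values := make([]interface{}, len(payloads))
	for i, p := range payloads {
		values[i] = p
	}
	return values
}

func main() {
	values := toInterfaces([]string{"a", "b", "c"})
	fmt.Println(len(values)) // prints "3"
	// with a real client, a bulk publish would then be:
	//   client.LPush(key, values...)
}
```

Publishing all payloads in one LPush is what would make a bulk publish cheaper than N separate Publish calls: one round trip instead of N.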

How do you scale throughput?

What is the suggested approach to scale for throughput?

I am using rmq for a project which needs high throughput. If I need to increase throughput from producer -> Q -> consumer, what is the recommended approach? I am hoping for linear scalability: throw in 2x more hardware and make it go 2x as fast (assuming I have sufficient network bandwidth).

To give you an idea of the kind of throughput I hope to achieve: about 100K msgs/sec for a single Q. Overall I'm looking at about 10M msgs/sec in the system, so I already know I'll have 100 Q's. Do I need to shard further?

  1. Add more boxes to the redis cluster. Will that make rmq go faster? This is the preferred approach since I can have a single, large Q. My application semantics prefer a single, large, fast Q.

  2. Create N Redis clusters and rmq queues. Shard the Q traffic by the modulus of a hash of the txn ID. I think I could get linear scalability here by simply adding more Redis clusters.

  3. Some other approach I haven't thought of.

  4. Are there some performance benchmarks you can point me to?

Add documentation

Hi,
I stumbled upon your lib recently and it's been very helpful; I am very grateful that you have put such tremendous effort into making it. However, I find the documentation lacking: for example, I don't know how to assign a key to a payload and receive its appropriate response from a worker using the assigned ID.
This is only one of the many problems I have stumbled upon.
I would be very grateful if you could add more documentation to the README discussing the various other functions.
Regards,
Sarim

How to modify a delivery using your API?

I have a task queue storing tasks, but a task may fail sometimes. I want to modify some fields before calling delivery.Reject(), for example to increment a retry field; when retry grows greater than some number, we give up on the task. Is it possible to do that with your API?
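Since a delivery's payload is just a string, one approach is to keep the retry counter inside the payload itself and re-publish an updated payload rather than mutating the delivery. A stdlib-only sketch (the task fields and retry limit here are illustrative, not rmq API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// task is a hypothetical payload shape carrying its own retry counter.
type task struct {
	ID    string `json:"id"`
	Retry int    `json:"retry"`
}

const maxRetries = 3 // illustrative limit

// nextPayload increments the retry counter and reports whether the task
// should be given up because the retry limit was exceeded.
func nextPayload(payload string) (string, bool, error) {
	var t task
	if err := json.Unmarshal([]byte(payload), &t); err != nil {
		return "", false, err
	}
	t.Retry++
	if t.Retry > maxRetries {
		return "", true, nil // give up: ack and drop (or move to a dead-letter queue)
	}
	b, err := json.Marshal(t)
	return string(b), false, err
}

func main() {
	p, giveUp, _ := nextPayload(`{"id":"42","retry":1}`)
	fmt.Println(p, giveUp) // prints `{"id":"42","retry":2} false`
}
```

Inside a consumer you would then ack the old delivery and publish the new payload back onto the queue, instead of rejecting, so the retry count survives the round trip.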

Error [cannot use redisClient (type *"github.com/go-redis/redis/v7".Client) as type *"github.com/go-redis/redis".Client in argument to rmq.OpenConnectionWithRedisClient]

Hi to everybody here, and thank you for this project.

I'm trying to connect to my Redis (with a password) using rmq.OpenConnectionWithRedisClient().

I'm working with Go modules and I've imported the latest rmq commit with:

go get github.com/adjust/rmq@f9bb31b720096e775f1e83f2c194bbe3409e8688

I'm also importing the Redis client as described in https://github.com/go-redis/redis:

go get github.com/go-redis/redis/v7

and

import (
	"github.com/adjust/rmq"
	"github.com/go-redis/redis/v7"
)

But this error happens.

pkg/agent/rqm/rqm.go:63:43: cannot use redisClient (type *"github.com/go-redis/redis/v7".Client) as type *"github.com/go-redis/redis".Client in argument to rmq.OpenConnectionWithRedisClient

Could somebody tell me what I did wrong?

scaling questions

Does this work when I have multiple machines reading from the same queue? Based on my examination of the code, it looks like it might not.

Please provide a function to pause queue receiving

For a graceful worker shutdown I need to pause receiving messages from the queue first, but I found that queue.Close() purges the queue's messages. So I would like rmq to provide a function to pause message receiving. Thanks.

Can't connect to remote Redis via Redis URL

Hi, I can't connect to my Heroku Redis.

url := redis://{user}:{password}@{host}:{port}

connection := rmq.OpenConnection("", "tcp", url, 1)

got this error

2020/03/15 14:33:40 rmq redis error is not nil dial tcp: address {the url}: too many colons in address
panic: rmq redis error is not nil dial tcp: address {the url}: too many colons in address

Is there a bug here, and how can I fix this?
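The error suggests that OpenConnection expects a plain host:port address, not a redis:// URL, which is why the URL form fails with "too many colons in address". A stdlib-only sketch of extracting the address and password from such a URL before configuring a Redis client (hostAndPassword is a hypothetical helper; the Redis client's own ParseURL, if your version has one, may do this for you):

```go
package main

import (
	"fmt"
	"net/url"
)

// hostAndPassword pulls the host:port and password out of a redis:// URL so
// they can be passed to a Redis client as separate Addr/Password options.
func hostAndPassword(rawURL string) (addr, password string, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		password, _ = u.User.Password()
	}
	return u.Host, password, nil
}

func main() {
	addr, password, _ := hostAndPassword("redis://user:secret@example.com:6379")
	fmt.Println(addr, password) // prints "example.com:6379 secret"
}
```

With the pieces separated, the connection can then be opened via a pre-configured Redis client (OpenConnectionWithRedisClient) instead of the raw address form.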

Thank you

Tons of connections being opened, cleaner is no help :(

I've been using rmq for a little under a week now, and unfortunately, I've started to run into some issues.

[screenshot]

I'm currently sharing one connection across 4-5 queues. In the past I've had little to no issues doing this, but as shown by the screenshot above, the single connection seems to be spawning far more connections than expected; over time they build up and cause Redis to time out. Below is a snippet of how I'm opening and sharing the connection, alongside how the cleaner is running. If you have any suggestions on how to improve this, or what may be causing the mass amount of clients, please let me know, thanks!

var connection rmq.Connection

func OpenConnection(url string) {
	opt, _ := redis.ParseURL(url)
	opt.MaxRetries = 5
	redisConn := rmq.OpenConnectionWithRedisClient("heartbeat", redis.NewClient(opt))
	connection = redisConn
	go func() {
		cleaner := rmq.NewCleaner(redisConn)
		for range time.Tick(time.Second) {
			err := cleaner.Clean()
			if err != nil {
				sentry.CaptureException(err)
			}
		}
	}()
}

Any reason not to use go-redis/redis?

Hi,

Is there any reason for using github.com/adjust/redis instead of github.com/go-redis/redis? The native client is under constant development and many improvements have been added since then.

Dead connection, consumer and handler in overview page I don't need

Hello,

I'm using Docker for development. If I restart Docker, the previous connections, consumers and handlers are no longer needed, but the overview page still shows them as closed. I don't think this is user-friendly.

In my configuration I only set one consumer. Each time I docker-compose down and docker-compose up -d, there is one more dead connection, consumer and handler that I don't need.

Is there a better way to handle this? Thanks.

[screenshot]

Getting rmq fails: wrapper.rawClient.FlushDb undefined

Hi,
I am running Go 1.13.1 here (go version go1.13.1 darwin/amd64). When getting/downloading rmq, I get an error that wrapper.rawClient.FlushDb is undefined. I guess it's because something in the Redis lib changed.

$ go get github.com/adjust/rmq
# github.com/adjust/rmq
/Users/robin/go/src/github.com/adjust/rmq/redis_wrapper.go:85:19: wrapper.rawClient.FlushDb undefined (type *redis.Client has no field or method FlushDb)

If there is anything else you need from me, just let me know.

Thanks,
Robin

Prometheus metrics

Any plans on exposing queue metrics via Prometheus endpoint? If not, I'd like to contribute.

panic on redis timeouts

Hi,

We get panics when Redis times out. The Redis instance is alive but under pressure. How can we raise the limit or retry gracefully?

Attached stack trace:

2018/07/13 15:06:06 rmq redis error is not nil write tcp 172.17.0.4:51830->someip:6379: i/o timeout
panic: rmq redis error is not nil write tcp 172.17.0.4:51830->someip:6379: i/o timeout

goroutine 40 [running]:
log.Panicf(0xa2fd38, 0x1d, 0xc4861f5e00, 0x1, 0x1)
  /usr/local/go/src/log/log.go:333 +0xda
/my/go/path/vendor/github.com/adjust/rmq.checkErr(0xa8c540, 0xc479161f40, 0x5a)
  /my/go/path/vendor/github.com/adjust/rmq/redis_wrapper.go:96 +0xcc
/my/go/path/vendor/github.com/adjust/rmq.RedisWrapper.LRem(0xc4200b27d0, 0xc4202a81e0, 0x5a, 0x1, 0xc4f95f0000, 0xafa3827, 0x28, 0x79bb15)
  /my/go/path/vendor/github.com/adjust/rmq/redis_wrapper.go:51 +0xc4
/my/go/path/vendor/github.com/adjust/rmq.(*wrapDelivery).Ack(0xc47b168e10, 0xa46ae8)
  /my/go/path/vendor/github.com/adjust/rmq/delivery.go:43 +0x60
/my/go/path/queue.(*dequeuer).Consume(0xc420190030, 0xa92600, 0xc47b168e10)
  /my/go/pathqueue/consumer.go:103 +0xf9
/my/go/path/vendor/github.com/adjust/rmq.(*redisQueue).consumerConsume(0xc4202e2000, 0xa8c220, 0xc420190030)
  /my/go/path/vendor/github.com/adjust/rmq/queue.go:326 +0x6c
created by /my/go/path/vendor/github.com/adjust/rmq.(*redisQueue).AddConsumer
  /my/go/path/vendor/github.com/adjust/rmq/queue.go:229 +0x8d

Don't panic when Redis is unavailable

Would it be possible for your functions not to panic when Redis is unavailable? Especially in goroutines started by functions like queue.StartConsuming: since there's no way to recover from a panic in a "child" goroutine, we are unable to handle cases where our Redis instance becomes temporarily unavailable, and our consumers die.

Maybe queue.consume() could recover from redisErrIsNil's panic and return an error to StartConsuming (through a channel)? Or better yet, redisErrIsNil could not panic at all and simply return errors that your users can handle :)
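The recover-and-forward idea can be sketched with plain Go (consumeLoop and the names here are illustrative, not rmq's internals): the consuming goroutine recovers from a panic and reports it as an error on a channel instead of crashing the process.

```go
package main

import "fmt"

// consumeLoop runs work and converts any panic into an error on the channel,
// so the caller can decide whether to retry, back off, or shut down.
func consumeLoop(errors chan<- error, work func()) {
	defer func() {
		if r := recover(); r != nil {
			errors <- fmt.Errorf("consume panicked: %v", r)
		}
	}()
	work()
}

func main() {
	errors := make(chan error, 1)
	consumeLoop(errors, func() { panic("redis error is not nil") })
	fmt.Println(<-errors) // prints "consume panicked: redis error is not nil"
}
```

The key point is that recover() only works inside the same goroutine that panicked, which is exactly why the library, not the caller, would have to install this handler.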
