
go-workers's Introduction


Sidekiq compatible background workers in golang.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • supports custom middleware
  • customizable concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

Example usage:

package main

import (
	"github.com/jrallison/go-workers"
)

func myJob(message *workers.Msg) {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // do something before each message is processed
  acknowledge = next()
  // do something after each message is processed
  return
} 

func main() {
  workers.Configure(map[string]string{
    // location of redis instance
    "server":  "localhost:6379",
    // instance of the database
    "database":  "0",
    // number of connections to keep open with redis
    "pool":    "30",
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })

  workers.Middleware.Append(&myMiddleware{})

  // pull messages from "myqueue" with concurrency of 10
  workers.Process("myqueue", myJob, 10)

  // pull messages from "myqueue2" with concurrency of 20
  workers.Process("myqueue2", myJob, 20)

  // Add a job to a queue
  workers.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // stats will be available at http://localhost:8080/stats
  go workers.StatsServer(8080)

  // Blocks until process is told to exit via unix signal
  workers.Run()
}

Initial development sponsored by Customer.io

go-workers's People

Contributors

andviro, bdotdub, biasedbit, deet, dmathieu, dukex, earnaway, gingray, hamin, idawes, jakeaustwick, jonnii, jrallison, lxfontes, mattn, meatballhat, nono, soulou, wppurking

go-workers's Issues

Custom logging middleware

I'm building a Golang app that implements a Sidekiq-compatible jrallison/go-workers work queue and a custom logging wrapper around Sirupsen/logrus for marshaled JSON logs.

Now, all of my app (except for go-workers so far) uses my logger-wrapper in a central place to ensure that 100% of its output is JSON compatible.

Note that lines #1 and #2 are proper JSON from our central logger, but when go-workers initializes we see line #3 come from the wrong logger in plain text.

{"db":{"Mapper":{}},"instance_id":"1","level":"info","msg":"Db: Connected to MySQL","time":"2015-05-27T04:15:15-04:00"}
{"concurrency":10,"instance_id":"1","level":"info","msg":"Worker: Commencing work","time":"2015-05-27T04:15:15-04:00"}
workers: 2015/05/27 04:15:15.211217 processing queue contact with 10 workers.

And when we send the signal to close the program, we see first on line #1 the wrong logger in plain text, followed by the proper JSON from our central logger on line #2.

^C
workers: 2015/05/27 04:15:17.197504 quitting queue contact (waiting for 0 / 10 workers).
{"instance_id":"1","level":"info","msg":"Closed correctly","time":"2015-05-27T04:15:17-04:00"}

I cannot seem to get this custom MiddlewareLogging to replace the go-workers default logging middleware.

func (a *App) ConfigureWorkers() {
  workers.Middleware = workers.NewMiddleware(
    &WorkMiddleware{ App: a },
    )
}

type WorkMiddleware struct{
  App *App
}

func (m *WorkMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // before each message is processed:
  job_json := message.Args().GetIndex(0).MustString()
  job := ContactJob{}
  err := json.Unmarshal([]byte(job_json), &job)
  if err != nil {
    m.App.Log.WithFields(log.Fields{
      "job_json": job_json,
    }).Fatal("Worker: Could not Unmarshal job JSON")
    return
  }
  SetMessageJob(message, job)
  start := time.Now()
  m.App.Log.WithFields( log.Fields{
    "job_id": message.Jid(),
    "queue": queue,
    "args": message.Args(),
    }).Print("Work: Job Starting")
  defer func() {
    if e := recover(); e != nil {
      buf := make([]byte, 4096)
      buf = buf[:runtime.Stack(buf, false)]
      m.App.Log.WithFields( log.Fields{
          "job_id": message.Jid(),
          "queue": queue,
          "duration": time.Since(start),
          "error": e,
          "stack": buf,
        }).Fatal("Work: Job Failed")
    }
  }()
  acknowledge = next()
  result := GetMessageResult(message)
  m.App.Log.WithFields( log.Fields{
    "job_id": message.Jid(),
    "result": result,
    "queue": queue,
    "duration": time.Since(start),
    }).Print("Work: Job Done")
  return
}

Is it actually possible to replace the default go-workers logging middleware for those lines?
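
The two plain-text lines ("processing queue ..." and "quitting queue ...") look like they come from the library's own queue management rather than from middleware, which would explain why replacing workers.Middleware has no effect on them. One possible direction, assuming the library writes those lines through a package-level logger that only needs Println and Printf (I believe go-workers exposes one as workers.Logger, but check that against the version you vendor), is to plug in a small adapter. The sketch below uses only the standard library; printLogger, jsonLogger and jsonLine are hypothetical names, and a real adapter would forward to the logrus wrapper instead of writing to stdout.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

// printLogger is a hypothetical stand-in for the small logging interface
// a library might accept: only Println and Printf.
type printLogger interface {
	Println(v ...interface{})
	Printf(format string, v ...interface{})
}

// jsonLine renders one log message as a single-line JSON object.
func jsonLine(msg string) string {
	line, err := json.Marshal(map[string]string{"level": "info", "msg": msg})
	if err != nil {
		return `{"level":"error","msg":"could not encode log line"}`
	}
	return string(line)
}

// jsonLogger satisfies printLogger and writes JSON lines to stdout.
type jsonLogger struct{}

func (jsonLogger) Println(v ...interface{}) {
	// Sprintln joins operands with spaces and appends a newline; drop the newline.
	fmt.Fprintln(os.Stdout, jsonLine(strings.TrimSuffix(fmt.Sprintln(v...), "\n")))
}

func (jsonLogger) Printf(format string, v ...interface{}) {
	fmt.Fprintln(os.Stdout, jsonLine(fmt.Sprintf(format, v...)))
}

func main() {
	var logger printLogger = jsonLogger{}
	logger.Println("processing queue", "contact", "with", 10, "workers.")
	logger.Printf("quitting queue %s", "contact")
}
```

If the assumption holds, assigning such an adapter to the library's logger before calling workers.Run() should route those lines through the same JSON format as the rest of the app.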

How to use enqueue_in (from ruby TO go)

Hi there,

In some examples I have seen how to use enqueue_in from Go to Ruby, but not the other way around.

I tried

Sidekiq::Client.push({
          "queue" => "queue123",
          "class" => "Job123",
          "args" => [1,2,3],
          "enqueued_in" => 10.seconds,
        })

But no luck :(
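
For what it's worth, Sidekiq schedules a job when the pushed hash carries an "at" key holding a Unix timestamp in seconds (for example Time.now.to_f + 10 on the Ruby side); "enqueued_in" is not a key I know Sidekiq to recognise. The Go sketch below only illustrates the shape of such a payload so the two sides can be compared. The job struct and scheduledPayload are hypothetical helpers, not part of go-workers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// job mirrors the handful of Sidekiq payload keys used in this thread.
type job struct {
	Queue string        `json:"queue"`
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
	At    float64       `json:"at"` // Unix time in seconds at which the job should run
}

// scheduledPayload builds the JSON for a job that should run delay seconds after now.
func scheduledPayload(queue, class string, args []interface{}, now, delay float64) (string, error) {
	b, err := json.Marshal(job{Queue: queue, Class: class, Args: args, At: now + delay})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	now := float64(time.Now().UnixNano()) / 1e9
	payload, err := scheduledPayload("queue123", "Job123", []interface{}{1, 2, 3}, now, 10)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload)
}
```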

Stalled Jobs

I have two instances of go-workers running on a couple of VPS servers (1GB RAM -> 25 workers and 3GB RAM -> 40 workers) that simply fetch queued URLs and do some light scraping.

They start out great but quickly stall and make no more progress. I don't see any errors in my logs, just the last item pulled from the queue. The processes remain running, but no further progress is made.

Any suggestions on ways to debug this?

Application server concurrency

Hello, I've run into a bit of an issue when running several application servers.

We use the worker concurrency as described in the docs (workers.Process("myqueue2", myJob, 20)), but then launch several instances of the worker application to distribute the load across multiple machines.

I'm seeing an issue where the separate worker applications are pulling down duplicate jobs. Do we need to add a lock when pulling a job from the queue so there isn't a race condition with other applications reading from the same queue?
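
One thing worth checking first: the README describes the "process" option as a unique process id per instance, used to recover in-progress jobs after a crash. If every instance is started with the same value, a plausible explanation (an assumption on my part, not something confirmed from the source) is that a starting instance treats another instance's in-progress jobs as its own crashed work and requeues them. A minimal sketch of deriving a distinct but stable id per instance follows; processID and the slot number are hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// processID builds an id that differs between machines and between
// several worker processes on the same machine, yet stays the same
// across restarts so crash recovery can still find its own jobs.
func processID(host string, slot int) string {
	return fmt.Sprintf("%s-%d", host, slot)
}

func main() {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown-host"
	}
	// Same option keys as the README example; only "process" changes.
	options := map[string]string{
		"server":   "localhost:6379",
		"database": "0",
		"pool":     "30",
		"process":  processID(host, 1),
	}
	fmt.Println("process id:", options["process"])
}
```

A process id based on the OS pid would also be unique, but it changes on every restart, which would defeat the recovery the option is meant for.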

Retry Jobs Not Showing in Sidekiq Retry List in Web UI

Hey everyone,

I was wondering if I could get any input on this. I'm using the default retry middleware in this library and calling the panic method within my jobs when an error occurs. The jobs are being retried, but they are not being placed in the retry queue on the Sidekiq UI. I'm a bit of a Golang beginner and I've tried to look into the retry middleware source code to find answers, without much luck. Are these jobs not being put in Sidekiq's retry queue because they're simply being requeued within this library, or am I just goofing something up?

I look forward to any input that you may have! 😄

Thanks!

Error & Retry

The job func does not return an error value, so how does the library judge whether a job succeeded or should be retried?
Does a panic in the job func indicate failure and trigger a retry?
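
Judging by other reports in this tracker (jobs are retried when the job function panics under the default retry middleware), a panic does seem to be the failure signal. The general pattern behind that is a wrapper that recovers the panic and turns it into an error the caller can act on. The sketch below shows the pattern in isolation; runJob is a hypothetical helper, not the library's middleware.

```go
package main

import (
	"errors"
	"fmt"
)

// runJob calls job and converts a panic into an error, which a caller
// could treat as "failed, eligible for retry". A normal return means success.
func runJob(job func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	job()
	return nil
}

func main() {
	fmt.Println("success:", runJob(func() {}))
	fmt.Println("failure:", runJob(func() { panic(errors.New("upstream timeout")) }))
}
```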

Failed and Processed values on /stats never change

I noticed that the values for failed and processed never change in the stats server. It would be nice if this updated with the proper numbers.

{
    "failed": 0,
    "jobs": {
        "myqueue": [ ]
    },
    "processed": 0
}

Run jobs synchronously

Hi @jrallison ,

could we find a way to run all jobs synchronously when, for example, an ENV flag is set (useful for testing scenarios and easier debugging)?

I'm guessing this should be doable via the middleware but can you help me along?

Worker sometimes taking and acknowledging jobs after Quit(), but not finishing them

Hello @jrallison,

I'm using your library for my go project and it's an awesome lib. However, I have a small problem with it. Sometimes it takes and acknowledges a new job after Quit() has been called, but the job doesn't finish. I'm not sure how to reproduce it since it only happens sometimes, in my experience, more often with some longer running tasks.

I've created a small fix for it, but I'm pretty new to both go and your lib so I'm sure you could do it in some better way.

I added global Running bool variable in workers.go (https://gist.github.com/tpiha/b73bbaadeb4ada3e6f37#file-gistfile1-txt-L20) which I'm then setting to true right in the beginning of the Run function (https://gist.github.com/tpiha/b73bbaadeb4ada3e6f37#file-gistfile1-txt-L33) and same with Quit() function.

I'm also checking that variable in middleware.go:

https://gist.github.com/tpiha/0a9ec8644ea400aac83e#file-gistfile1-txt-L27

I've fixed this for my project, so it's not an issue for me, but I'm still submitting this bug report in case you want to address it in the official code.
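
A plain global bool read and written from different goroutines can itself be reported by the race detector, so one alternative worth considering is a stop channel that is closed exactly once and checked before a job is handed out. This is a minimal sketch of that idea with hypothetical names (fetcher, next, Quit); it is not the library's fetcher.

```go
package main

import (
	"fmt"
	"sync"
)

// fetcher hands out jobs until Quit is called.
type fetcher struct {
	stop chan struct{}
	once sync.Once
}

func newFetcher() *fetcher {
	return &fetcher{stop: make(chan struct{})}
}

// Quit is safe to call more than once and from any goroutine.
func (f *fetcher) Quit() {
	f.once.Do(func() { close(f.stop) })
}

// next returns the next job, or false once the fetcher has been stopped.
func (f *fetcher) next(jobs <-chan string) (string, bool) {
	// Check the stop signal first so a pending job is never taken after Quit has returned.
	select {
	case <-f.stop:
		return "", false
	default:
	}
	select {
	case <-f.stop:
		return "", false
	case job := <-jobs:
		return job, true
	}
}

func main() {
	jobs := make(chan string, 2)
	jobs <- "job-1"
	jobs <- "job-2"

	f := newFetcher()
	job, ok := f.next(jobs)
	fmt.Println(job, ok)

	f.Quit()
	job, ok = f.next(jobs)
	fmt.Println(job, ok)
}
```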

Data race detected in Fetch()

==================
WARNING: DATA RACE
Write by goroutine 37:
  .../Godeps/_workspace/src/github.com/jrallison/go-workers/fetcher.go:90 +0x1f8

Previous read by goroutine 59:
  github.com/jrallison/go-workers.func·005()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/fetcher.go:60 +0x6b

Goroutine 37 (running) created at:
  github.com/jrallison/go-workers.(*manager).manage()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:51 +0x2fb

Goroutine 59 (running) created at:
  github.com/jrallison/go-workers.(*fetch).Fetch()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/fetcher.go:83 +0xdf

Retry options

Hi,

I've been looking at the retry options and I've come across something strange. Maybe I just misunderstood.

In https://github.com/jrallison/go-workers/blob/master/enqueue.go#L24 you can specify whether to retry the task and the maximum number of retries.
But in https://github.com/jrallison/go-workers/blob/master/middleware_retry.go#L58 the retry_count option is used as the actual number of times the job has been tried.
Because of this, we can't configure a custom retry count, and I don't really see the point of RetryCount in EnqueueOptions.

Thanks
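
For comparison, in Sidekiq's payload convention "retry" is either a boolean or a number giving the maximum number of retries (25 by default when it is just true), while "retry_count" tracks how many retries have already happened. A sketch of a decision function that keeps the two apart is below. It illustrates the convention only and is not the logic in middleware_retry.go; maxRetries and shouldRetry are hypothetical names.

```go
package main

import "fmt"

// defaultMaxRetries is Sidekiq's default when "retry" is simply true.
const defaultMaxRetries = 25

// maxRetries interprets the "retry" field of a job payload.
func maxRetries(retry interface{}) int {
	switch v := retry.(type) {
	case bool:
		if v {
			return defaultMaxRetries
		}
		return 0
	case int:
		return v
	case float64: // numbers decoded from JSON arrive as float64
		return int(v)
	default:
		return 0
	}
}

// shouldRetry compares the attempts made so far with the configured maximum.
func shouldRetry(retry interface{}, retryCount int) bool {
	return retryCount < maxRetries(retry)
}

func main() {
	fmt.Println(shouldRetry(true, 3))  // default limit, few attempts so far
	fmt.Println(shouldRetry(5, 5))     // custom limit already reached
	fmt.Println(shouldRetry(false, 0)) // retries disabled
}
```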

need more guides

thanks for your excellent work!

but I have to read the source code to learn how to use it

Maybe the wiki needs more content?

Found two bugs.

config.go:36

if options["poll_interval"] != "" {

The inequality check is wrong. It should be an equality check:

options["poll_interval"] == ""

scheduled.go:55

s.poll(true)

That's a recursive function call. It will keep growing the stack until it exceeds the limit.
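
On the first point, a check of the form != "" usually reads as "only parse the option when it was provided", so it may be intentional; that depends on what the body of the if does. On the second point, a loop avoids unbounded stack growth. Both ideas are sketched below with hypothetical helpers (pollInterval, pollLoop); the fallback of 15 is an assumption, and a real poller would sleep for the interval between rounds.

```go
package main

import (
	"fmt"
	"strconv"
)

// pollInterval parses the option only when it is set and valid,
// and otherwise keeps the fallback.
func pollInterval(options map[string]string, fallback int) int {
	if raw := options["poll_interval"]; raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 {
			return n
		}
	}
	return fallback
}

// pollLoop calls poll repeatedly until stop is closed. Being a loop,
// it uses constant stack space no matter how long it runs.
func pollLoop(stop <-chan struct{}, poll func()) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		poll()
	}
}

func main() {
	fmt.Println(pollInterval(map[string]string{"poll_interval": "5"}, 15))
	fmt.Println(pollInterval(map[string]string{}, 15))

	stop := make(chan struct{})
	rounds := 0
	pollLoop(stop, func() {
		rounds++
		if rounds == 3 {
			close(stop)
		}
	})
	fmt.Println("rounds:", rounds)
}
```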

Go complains about the return paths when you return from an if else

github.com/jrallison/go-workers

/usr/lib/go/src/pkg/github.com/jrallison/go-workers/msg.go:25: function ends without a return statement
/usr/lib/go/src/pkg/github.com/jrallison/go-workers/msg.go:53: function ends without a return statement
/usr/lib/go/src/pkg/github.com/jrallison/go-workers/msg.go:61: function ends without a return statement

Maybe return nil before the final curly brace

gospec moved to github.com/customerio/gospec/src/gospec

go get github.com/customerio/gospec fails on Travis-CI since it doesn’t contain any buildable Go source files. The (pretty awkward) location of the package is github.com/customerio/gospec/src/gospec

Sidenote: Why is this unmaintained dependency needed?

queue is not FIFO, but LIFO actually..

Version of go-workers

v0.0.0-20180112190529-dbf81d0b75bb

Details

  • line 86 in fetcher.go (package workers)
  • line 81 in enqueue.go (package workers)

My concern

I don't know the Sidekiq spec, but I suspect we should use lpush instead of rpush to be FIFO.
What do you think, @jrallison?

It looks like Sidekiq is actually FIFO: sidekiq/sidekiq#3420.
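
The ordering argument can be checked without Redis. BRPOPLPUSH takes elements from the right (tail) end of the source list, so the order a worker sees depends on which end the producer pushes to. The sketch below simulates a list with a slice; popOrder is a hypothetical helper, and whether enqueue.go really pushes on the right is taken from this report, not verified here.

```go
package main

import (
	"fmt"
	"strings"
)

// popOrder pushes items onto an in-memory list and then pops them all from
// the right, returning the order in which a consumer would see them.
func popOrder(pushLeft bool, items []string) string {
	var list []string // index 0 is the left (head) end
	for _, it := range items {
		if pushLeft {
			list = append([]string{it}, list...) // like LPUSH
		} else {
			list = append(list, it) // like RPUSH
		}
	}
	var out []string
	for len(list) > 0 {
		last := len(list) - 1
		out = append(out, list[last]) // like RPOP / BRPOPLPUSH on the source
		list = list[:last]
	}
	return strings.Join(out, ",")
}

func main() {
	jobs := []string{"first", "second", "third"}
	fmt.Println("push left, pop right: ", popOrder(true, jobs))
	fmt.Println("push right, pop right:", popOrder(false, jobs))
}
```

Pushing on the left and popping on the right yields the jobs in enqueue order (FIFO); pushing and popping on the same end reverses them (LIFO).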

EnqueueIn method error

I use golang to set up a Sidekiq client, together with a Ruby Sidekiq worker.

It works well when I use the Enqueue method like this:

 workers.Enqueue("example", "SomeWorker", "some_string")

But I get an error when I use the EnqueueIn method for a delayed job,
like this:

 workers.EnqueueIn("example", "SomeWorker", 10, "some_string")
error: 
  Message args must be an Array
  /.rvm/gems/ruby-2.2.2/gems/sidekiq-3.2.6/lib/sidekiq/client.rb:212:in `normalize_item'

Am I doing something wrong?

Sorry for my poor English!
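
The Ruby error says the "args" field of the payload must be an array, so a caller-side workaround that seems reasonable is to always pass a slice, even for a single argument. Why Enqueue and EnqueueIn behave differently here is not something I can confirm from the report. The helper below, asArray, is hypothetical and only shows the wrapping.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// asArray returns args unchanged in content if it is already a slice or
// array, and wraps any other value in a one-element slice.
func asArray(args interface{}) []interface{} {
	if args == nil {
		return []interface{}{}
	}
	v := reflect.ValueOf(args)
	if v.Kind() == reflect.Slice || v.Kind() == reflect.Array {
		out := make([]interface{}, v.Len())
		for i := range out {
			out[i] = v.Index(i).Interface()
		}
		return out
	}
	return []interface{}{args}
}

func main() {
	for _, args := range []interface{}{"some_string", []int{1, 2}} {
		b, err := json.Marshal(asArray(args))
		if err != nil {
			fmt.Println("encode failed:", err)
			continue
		}
		fmt.Println(string(b))
	}
}
```

With the calls from this report, that would mean passing []string{"some_string"} instead of the bare string.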

Multiple servers pulling same JID from queue

After I have enqueued multiple jobs, I am running multiple servers to pull jobs from the queue and process them. Occasionally, the same JID is pulled by two servers which I had assumed could not happen. I am following the simple example from the README:

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())

    workers.Process("LogParser", parseLog, 1)
    workers.Run()

}

The workers are configured in the init() specifying the server, database, namespace, pool and process.

Is there additional configuration/code to ensure this does not happen?

Make Config.namespace public

I am trying to write some custom middleware that needs access to the namespace. Please make Config.namespace public so that it is accessible in custom middleware.

question: support for go modules

Hey! First of all, thanks for this amazing package. Have used it a lot at my job and on personal projects and runs awesome.

My question is if support for go modules will be available any time soon.

Cheers!

Pass Redis connection Configure

I'm having difficulties connecting to SSL-enabled Redis instances because on the Configure() function the Redis client is instantiated without a way to pass Redis Connection options:

	Config = &config{
		options["process"],
		namespace,
		pollInterval,
		&redis.Pool{
			MaxIdle:     poolSize,
			IdleTimeout: 240 * time.Second,
			Dial: func() (redis.Conn, error) {
				c, err := redis.Dial("tcp", options["server"])
...

Instead, the Configure() method should accept a redis.Conn which can be set up outside go-workers. This would be the most flexible and decoupled way to set up a customized Redis connection.

Is anyone here still managing this Repo and accepting Pull requests?

Use Redis URI scheme with auth to address the server

It would be nice if go-workers could be configured via a redis-uri, in addition to the current way of server address, port.

This would be useful in Heroku where the Redis addons set a redis URI in the environment.

Thanks for this excellent package.

For example: redis://username:password@hostname:6389
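
Until something like this exists in the library, the URI can be split on the caller's side with the standard library and mapped onto the option keys the README already documents ("server" and "database"). This is a sketch under assumptions: optionsFromURI is a hypothetical helper, the host name in the usage is a placeholder, and how the password should be handed to go-workers depends on the version in use, so it is returned separately.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// optionsFromURI turns a redis:// URI into the "server" and "database"
// options from the README, plus the password if one is present.
func optionsFromURI(raw string) (map[string]string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, "", err
	}
	if u.Scheme != "redis" {
		return nil, "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	// The path component, if any, selects the database number.
	db := strings.Trim(u.Path, "/")
	if db == "" {
		db = "0"
	} else if _, convErr := strconv.Atoi(db); convErr != nil {
		return nil, "", fmt.Errorf("invalid database %q", db)
	}
	password := ""
	if u.User != nil {
		password, _ = u.User.Password()
	}
	return map[string]string{"server": u.Host, "database": db}, password, nil
}

func main() {
	opts, password, err := optionsFromURI("redis://username:secret@redis.example.com:6389/2")
	if err != nil {
		fmt.Println("bad URI:", err)
		return
	}
	fmt.Println(opts["server"], opts["database"], password != "")
}
```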

workers.jobFunc type errors

Trying out the example code, I'm getting this weird error -

cannot use myJob (type func(*workers.Msg)) as type workers.jobFunc in function argument

which doesn't make any sense, as job.go defines

type jobFunc func(message *Msg)

Any ideas? I'm at a loss as to what could be causing this

Excessive file descriptors?

Hi,

When I'm running go-workers I see a constantly growing number of file descriptors. I don't think I'm manually opening fd's without closing them, so I wonder if there's any cleanup work needed for go-workers to close file descriptors (e.g. redis connections)?

Thanks!

Data race in quitManagers()

WARNING: DATA RACE
Read by goroutine 770:
  github.com/jrallison/go-workers.(*manager).processing()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:73 +0xb0
  github.com/jrallison/go-workers.(*manager).quit()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:33 +0xab
  github.com/jrallison/go-workers.func·013()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:66 +0x34

Previous write by goroutine 20:
  github.com/jrallison/go-workers.(*manager).loadWorkers()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:66 +0x118
  github.com/jrallison/go-workers.(*manager).manage()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:49 +0x2a6

Goroutine 770 (running) created at:
  github.com/jrallison/go-workers.quitManagers()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:66 +0xbf
  github.com/jrallison/go-workers.Quit()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:43 +0x2b
  github.com/jrallison/go-workers.handleSignals()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/signals_posix.go:18 +0x2c1

Goroutine 20 (running) created at:
  github.com/jrallison/go-workers.(*manager).start()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/manager.go:23 +0x6b
  github.com/jrallison/go-workers.startManagers()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:60 +0xb6
  github.com/jrallison/go-workers.Start()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:39 +0x4d
  github.com/jrallison/go-workers.Run()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:32 +0x2b

Support for sidekiq/redis namespaces

As per the sidekiq configs it's recommended to set a namespace on your queue:

https://github.com/mperham/sidekiq/wiki/Advanced-Options

Sidekiq.configure_server do |config|
  config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' }
end

I spent about 3 hours yesterday trying to work out why my go-workers weren't getting called in production... so adding support for namespaces is something that should help people who have existing sidekiq workers.

Sidekiq uses redis-namespace to namespace all the keys that it stores/reads in redis. I believe the commands used in go-workers are:

brpoplpush
lrem
lrange
zrangebyscore
zrem
lpush

Namespacing these is pretty simple: you can just prefix the first argument with namespace:, with the exception of zrangebyscore, which requires all arguments except the last to be namespaced.

I'm pretty new to go and want to give this a try. Does the above sound reasonable? Do you have any advice as to how to proceed?
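
The approach sounds reasonable. The core of it is a single helper that every key passes through before it reaches Redis. A minimal sketch is below; namespaced is a hypothetical name and the key names are the usual Sidekiq ones. As far as the Redis command signatures go, brpoplpush is the command that takes two keys (source and destination) followed by a timeout, so both of its key arguments would need the prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// namespaced prefixes key with the namespace and a colon, the way
// redis-namespace does. An empty namespace leaves the key untouched.
func namespaced(ns, key string) string {
	if ns == "" {
		return key
	}
	return strings.TrimSuffix(ns, ":") + ":" + key
}

func main() {
	ns := "mynamespace"
	fmt.Println(namespaced(ns, "queue:myqueue"))
	fmt.Println(namespaced(ns, "schedule"))
	fmt.Println(namespaced("", "retry"))
}
```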

Data race detected in waitForExit()

WARNING: DATA RACE
Write by goroutine 24:
  runtime.mapdelete()
      /usr/local/go/src/pkg/runtime/hashmap.goc:948 +0x0
  github.com/jrallison/go-workers.waitForExit()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:73 +0x13f
  github.com/jrallison/go-workers.Quit()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:45 +0x6c
  github.com/jrallison/go-workers.handleSignals()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/signals_posix.go:18 +0x2c1

Previous read by goroutine 15:
  runtime.mapiterinit()
      /usr/local/go/src/pkg/runtime/hashmap.goc:1011 +0x0
  github.com/jrallison/go-workers.waitForExit()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:71 +0x72
  github.com/jrallison/go-workers.Run()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:34 +0x3f

Goroutine 24 (running) created at:
  github.com/jrallison/go-workers.Run()
      .../Godeps/_workspace/src/github.com/jrallison/go-workers/workers.go:33 +0x39

Scheduled jobs being "lost"

Hi there,

I'm using your package (with revision 287ed37)

I'm encountering some worrisome behavior (both in development and in production).
When I'm quickly enqueuing scheduled jobs using workers.EnqueueIn(...), for some jobs (let's say 1 in 100) I do not get an error but a nice hash string of the created job ID. However, the job is nowhere to be found. It is not in "scheduled" and never arrives in "queued".

Did you already encounter this behavior? I'm only noticing it with scheduled jobs, not with directly enqueued jobs using workers.Enqueue(...)

It would be great if you could help me get this stability bug fixed.

Contributors

@jrallison you've done an amazing job with this project. We rely on it a lot for our infrastructure and it works amazingly. We do around 20,000 jobs per minute on very commodity hardware which is so cool.

Looking at this project, it seems a bit stale: the most recent merged PRs and commits are from 2016. Would you be willing to add some collaborators so we can kick some life into this project?

ActiveJob Issues

We pass Sidekiq jobs through go-workers to our Rails app. This works fine for vanilla Sidekiq workers. However, when we pair Sidekiq with ActiveJob we get the following error on the Rails app.

WARN: NoMethodError: undefined method `jid=' for #<FooBarJob:0x00007fc656cc45d8>

Has anyone else had similar issues passing jobs through to ActiveJob at all?

Scheduled jobs

Hello,

I'm looking to implement scheduled jobs to get something like:

workers.Enqueue(queue, class, map[string]interface{}{"at": time.Now().Unix(), "arg1": "value1"})

I tried with a middleware:

func (r *AtMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
    at := message.Args().Get("at")
    atTimestamp, err := at.Int64()
    if err != nil {
        acknowledge = next()
        return
    }

    if time.Now().Unix() > atTimestamp {
        acknowledge = next()
    } else {
        class := message.Json.Get("class").MustString()
        args := message.Args().MustMap()
        time.Sleep(15 * time.Second)
        workers.Enqueue(queue, class, args)
    }
    return
}

What I really don't like is the fact that I need to recreate a job, and that I need to wait, because the worker is polling far too often and hundreds of jobs are created each second, which is not what I want.

How would you implement that?
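
One common way to avoid the sleep-and-re-enqueue loop is the design Sidekiq itself uses: future jobs sit in a sorted set scored by their run time, and a poller periodically moves only the due ones into the real queue. Other issues in this tracker mention workers.EnqueueIn, which suggests the library already has something along these lines. The in-memory sketch below shows just the data structure; schedule, add and popDue are hypothetical names standing in for the sorted-set operations.

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	at      float64 // Unix time in seconds when the job becomes due
	payload string
}

// schedule keeps entries ordered by their due time.
type schedule struct {
	entries []entry
}

// add inserts a job and keeps the slice sorted by due time.
func (s *schedule) add(at float64, payload string) {
	s.entries = append(s.entries, entry{at: at, payload: payload})
	sort.SliceStable(s.entries, func(i, j int) bool {
		return s.entries[i].at < s.entries[j].at
	})
}

// popDue removes and returns every job whose due time is <= now.
func (s *schedule) popDue(now float64) []string {
	n := sort.Search(len(s.entries), func(i int) bool {
		return s.entries[i].at > now
	})
	due := make([]string, 0, n)
	for _, e := range s.entries[:n] {
		due = append(due, e.payload)
	}
	s.entries = s.entries[n:]
	return due
}

func main() {
	var s schedule
	s.add(30, "job-c")
	s.add(10, "job-a")
	s.add(20, "job-b")
	fmt.Println(s.popDue(20)) // jobs due at or before t=20
	fmt.Println(s.popDue(20)) // nothing new is due yet
}
```

With this shape a job is created once, and no worker slot is blocked while waiting for its time to come.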

EnqueueIn and EnqueueWithOptions with At

Can somebody please explain to me how these functions work for scheduling a job?

Sorry if this doesn't qualify as an issue, but there is no documentation describing this kind of operation.

I have a list of job handlers that implement jobFunc, so initially I just enqueue different tasks to run periodically:

// Start jobs 
jobs := buildJobsList(c.String("jobs"))
if len(jobs) == 0 {
	jobs = Jobs
}
for _, jobName := range jobs {
	var job Consumer
	var params interface{}
	var operation string
	switch jobName {
	case "amazon-inventory":
		job = NewAmazonInventory()
		consumers["list-inventory-supply-by-next-token"] = job
	}
	if err := job.Initialize(errors); err != nil {
		return err
	}
	consumers[operation] = job
	workers.Enqueue(redisConfig.Queue, operation, params)
}
workers.Process(redisConfig.Queue, handleJob, redisConfig.Concurrency)
go func() {
	for err := range errors {
		message.Error("an error occurred: %v", err)
	}
}()
go workers.StatsServer(8080)
workers.Run()

The handleJob function just calls the Handle function in the Job interface with the corresponding params:

// handle job function
args, err := msg.Args().Map()
if err != nil {
	message.Error(err.Error())
}
operation := args["Operation"].(string)
job := consumers[operation]
job.Handle(msg)

Now, different jobs need to run periodically at different times, so I need to schedule the next run for some time in the future: it might be a couple of seconds, five minutes, etc. For instance, the job configured above could have this implementation in the Handle function:

params := &AmazonInventoryParams{}
if err := json.Unmarshal([]byte(msg.Args().ToJson()), &params); err != nil {
	j.errors <- err
	return
}
nextPerform := time.Now().UTC().Add(time.Second * 2)
workers.EnqueueAt(redisConfig.Queue, params.Operation, nextPerform, params)

The problem is that I don't know exactly how to schedule the next perform time either using EnqueueAt or EnqueueIn functions.

This is the output after running the code above, and as you guys can see, the jobs are scheduled to run after fifteen seconds:

workers: 2018/03/20 18:24:40.093782 processing queue osom with 10 workers.
workers: 2018/03/20 18:24:40.105344 osom JID-e00d508285d6241a2173fa79 start
workers: 2018/03/20 18:24:40.105413 osom JID-e00d508285d6241a2173fa79 args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:24:40.106765 osom JID-e00d508285d6241a2173fa79 done: 1.417078ms
workers: 2018/03/20 18:24:55.106479 osom JID-37e2b86789ba8734ab3073bc start
workers: 2018/03/20 18:24:55.106531 osom JID-37e2b86789ba8734ab3073bc args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:24:55.113703 osom JID-37e2b86789ba8734ab3073bc done: 7.221384ms
workers: 2018/03/20 18:25:10.114261 osom JID-b1b8c1d52badaad8ede83e0e start
workers: 2018/03/20 18:25:10.114299 osom JID-b1b8c1d52badaad8ede83e0e args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:25:10.116502 osom JID-b1b8c1d52badaad8ede83e0e done: 2.238734ms
^Cworkers: 2018/03/20 18:25:21.669964 quitting queue osom (waiting for 0 / 10 workers).

I already tried scheduling the jobs using the EnqueueIn function using something like float64((time.Second * 2).Nanoseconds()), but I think EnqueueAt would be more convenient and easy to figure out.

Thanks in advance. I think it would be really nice if we could have a wiki describing these kinds of scenarios.
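
Two observations that may help, both hedged. First, if EnqueueIn takes its delay as a float64 number of seconds (an assumption based on the EnqueueIn("example", "SomeWorker", 10, ...) call in another issue), then passing Nanoseconds() asks for a delay of two billion seconds; Duration.Seconds() gives the intended value. Second, the steady 15-second gaps in the log would be consistent with the scheduled set being polled on a fixed interval, so a job due in 2 seconds is only picked up at the next poll. The sketch shows the unit difference; delays is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// delays returns the same two-second duration expressed in the two units
// discussed above: seconds (intended) and nanoseconds (far too large).
func delays() (seconds, nanoseconds float64) {
	d := 2 * time.Second
	return d.Seconds(), float64(d.Nanoseconds())
}

func main() {
	seconds, nanoseconds := delays()
	fmt.Println("as seconds:    ", seconds)
	fmt.Println("as nanoseconds:", nanoseconds)

	// An absolute time for an EnqueueAt-style call, as used in the Handle function.
	nextPerform := time.Now().UTC().Add(2 * time.Second)
	fmt.Println("next perform:", nextPerform.Format(time.RFC3339))
}
```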

Doesn't include a License file

Hey John,

Thanks for your work on this library. Could you please choose an appropriate license model and include a License file in this repository informing users about the chosen license?

Thanks John!

Example of not acknowledging a message?

There appears to be an Acknowledge() function on the Fetcher and some acknowledgement options on middleware, but not on "myJob".

If my job can't be processed yet and I would like to leave it on the inProgressQueue, how do I do that?

It's also not clear to me when those inProgress jobs are retried. Is that dependent on retry options sent from the Ruby client? Does it use the same backoff algorithm as the Sidekiq server?
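
The README's middleware signature returns an acknowledge bool, so a middleware placed in front of the job looks like the natural place to decide this. The standalone sketch below models such a chain with plain functions to show the control flow: a middleware that does not call next() skips the job and reports false. The names (middleware, run, holdUntil) are hypothetical, and what the library then does with an unacknowledged message, including when it is picked up again, is not something this sketch can answer.

```go
package main

import "fmt"

// middleware mirrors the shape of Call in the README, with the message
// reduced to a string for illustration.
type middleware func(queue, msg string, next func() bool) (acknowledge bool)

// run threads msg through the chain and finally calls the job itself.
func run(chain []middleware, queue, msg string, job func()) bool {
	var call func(i int) bool
	call = func(i int) bool {
		if i == len(chain) {
			job()
			return true
		}
		return chain[i](queue, msg, func() bool { return call(i + 1) })
	}
	return call(0)
}

// holdUntil skips the job and declines to acknowledge while ready reports false.
func holdUntil(ready func(msg string) bool) middleware {
	return func(queue, msg string, next func() bool) bool {
		if !ready(msg) {
			return false // job not run, message left unacknowledged
		}
		return next()
	}
}

func main() {
	notYet := holdUntil(func(string) bool { return false })
	ack := run([]middleware{notYet}, "myqueue", "payload", func() {
		fmt.Println("job ran")
	})
	fmt.Println("acknowledged:", ack)
}
```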

Enqueue using a Redis pipeline?

I have a requirement to enqueue a large number of jobs in a short period of time. I think it probably makes sense to do so using a Redis pipeline, but EnqueueWithOptions as currently written doesn't appear to be easily adaptable to pipeline support.

Do you have any thoughts on how to add support for that?

Can't enqueue jobs

I'm writing software that has three components: "enqueuer", "queue" and "qworkers":

  src/
    application/
      qworkers.go (app package)
    enqueuer/
      enqueuer.go (main package)
    queue/
      queue.go (main package)

Application is where my jobs are living, exported as app.MyJob.

Queue, is based on README example, but it doesn't enqueue within main(). This generates an executable which is supposed to run forever.

Enqueuer is responsible for enqueuing a single app.MyJob for each database record that needs to be processed. This component should be scheduled with Cron, execute its task and quit.

Now the problem is: when I enqueue as in your README's example, it works, but when I do it from the outside (the Enqueuer component, not Queue) it generates an error like this:

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x20 pc=0x4024d54]

goroutine 1 [running]:
github.com/jrallison/go-workers.Enqueue(0x42d4b00, 0x5, 0x42cc940, 0x3, 0x41eee60, ...)
    /Users/luca/Code/app/src/github.com/jrallison/go-workers/enqueue.go:13 +0x44
main.enqueueFeed(0x1, 0x0, 0x0)
    /Users/luca/Code/app/src/enqueuer/enqueuer.go:23 +0xba
main.enqueueFeeds(0xc2000e0120)
    /Users/luca/Code/app/src/enqueuer/enqueuer.go:18 +0x98
main.main()
    /Users/luca/Code/app/src/enqueuer/enqueuer.go:13 +0x4d

func enqueueRecord(record app.Record) {
  workers.Enqueue("queue1", "Add", []int{record.Id}) /* Line 23 */
}

What's wrong with my code? What's the meaning of "Add" as second argument?

Monitoring worker

Is there a way to monitor the worker in runtime? What I'm looking for is to be able to know if something weird is going on. I'm not sure if I know what I'm looking for.

I've a cron job which gets data from someone else's API, and stores in db. Normally that takes around 5 seconds, but every now and then the API takes forever to load, is there a handy way to know in real time, how long this is taking? So I can also log that in real time?

The reason I'm asking is because I'm not sure if I created something that's totally doable using this package.
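
Measuring a job after it finishes is straightforward with time.Since, but to hear about a slow job while it is still running you need a timer that fires in the background. A small watchdog wrapper is one way to get both. The sketch below uses only the standard library; watch is a hypothetical helper that could be called from inside a job function or a custom middleware.

```go
package main

import (
	"fmt"
	"time"
)

// watch runs job and returns how long it took. If the job is still running
// after warnAfter, onSlow is called from a separate goroutine while the job
// continues, so the slowness can be logged in real time.
func watch(job func(), warnAfter time.Duration, onSlow func()) time.Duration {
	start := time.Now()
	timer := time.AfterFunc(warnAfter, onSlow)
	defer timer.Stop() // a job that finishes in time never triggers onSlow
	job()
	return time.Since(start)
}

func main() {
	elapsed := watch(
		func() { time.Sleep(50 * time.Millisecond) }, // stands in for the slow API call
		10*time.Millisecond,
		func() { fmt.Println("warning: job still running after 10ms") },
	)
	fmt.Println("job finished after", elapsed.Round(time.Millisecond))
}
```

The README's stats server (workers.StatsServer) is the other built-in option mentioned for seeing which jobs are currently running.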
