
gue's People

Contributors

aeijdenberg, aohorodnyk, aravindc26, avries, bgentry, del-sama, dependabot[bot], janmentzel, jbowes, johto, karan-balu, omh, tie, titanous, vgarvardt, zapo


gue's Issues

Worker failed to lock a job

I ran into the problem shown below. What causes it? Thank you for your answer.
Details:
[gue-logger.go:26::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Debug()] Feb 23 19:09:36.946 [D] Tried to enqueue a job
[gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:36.946 [E] Worker failed to lock a job
......
[gue-logger.go:42::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Error()] Feb 23 19:09:46.954 [E] Worker failed to lock a job
[gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker finished
[gue-logger.go:34::github.com/ljg-cqu/core/logger/adapter.(*gueAdapter).Info()] Feb 23 19:10:26.946 [I] Worker pool finished

Trying to install 4.1.0 fails due to otel deps

Apparently otel removed some of their previously-working import paths (open-telemetry/opentelemetry-go#2897), so we might need to bump their versions in the v4 branch.

$ go get -u github.com/vgarvardt/gue/v4
go: github.com/vgarvardt/gue/v4 imports
	go.opentelemetry.io/otel/metric/instrument: cannot find module providing package go.opentelemetry.io/otel/metric/instrument
go: github.com/vgarvardt/gue/v4 imports
	go.opentelemetry.io/otel/metric/instrument/syncint64: cannot find module providing package go.opentelemetry.io/otel/metric/instrument/syncint64
go: github.com/vgarvardt/gue/v4 imports
	go.opentelemetry.io/otel/metric/unit: cannot find module providing package go.opentelemetry.io/otel/metric/unit

Add maximum retry

Is there a way to limit how many times a worker function is called if it always returns an error?
Something like a maximum number of retry attempts.

v5.1.0 error: cannot use unit.Dimensionless

After updating from v5.0.1 to v5.1.0 I have the following error:

# github.com/vgarvardt/gue/v5
../../../../go/pkg/mod/github.com/vgarvardt/gue/v5@v5.1.0/client.go:218:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/v5@v5.1.0/client.go:226:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/v5@v5.1.0/worker.go:276:23: cannot use unit.Dimensionless (constant "1" of type unit.Unit) as string value in argument to instrument.WithUnit
../../../../go/pkg/mod/github.com/vgarvardt/gue/v5@v5.1.0/worker.go:284:23: cannot use unit.Milliseconds (constant "ms" of type unit.Unit) as string value in argument to instrument.WithUnit

The versions of these packages also affect the error:

before (no errors)

go.opentelemetry.io/otel v1.13.0 // indirect
go.opentelemetry.io/otel/metric v0.36.0 // indirect
go.opentelemetry.io/otel/trace v1.13.0 // indirect

after (there is an error)

go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect

Keeping finished jobs in the database

Hi,
first of all thanks for this project, it's exactly what I was looking for.

I'm currently playing around with it and saw that finished jobs are removed afterwards from the table. Have you thought of adding an option to keep finished jobs in the database to get some sort of log?

robin

Manually rescheduling a job

I am trying to mutate the job.RunAt field inside a job to have it try to run itself at a later time. I want to control when to do this myself in a job and not as a policy.

Currently, when I do this it works, but I get the error:
2022/11/09 11:26:42 Got an error on deleting a job level=error worker-pool-id=8d9cd5 worker-id=8d9cd5/worker-1 job-id=3 job-type=ChallengeFinisher error=conn busy

I have a wrapper around a job that checks whether the jobs listed in its dependentIds have finished running; if they have not, the job should wait for a given interval and try again. I would very much like to keep the ID of this job as it is and not report this as an error.

Support other binary encoding for storing the Args

Currently the library forces a Job's Args to be encoded as valid JSON by storing it in a JSON field in the database table and casting it to json.RawMessage in

gue/client.go

Line 100 in 667e4cb

`, j.Queue, j.Priority, j.RunAt, j.Type, json.RawMessage(j.Args), now).Scan(&j.ID)

But in my case I would like to transport the Job args as a Protobuf message (for forward compatibility). The database field is easy to change of course because I run the migration myself. But I currently can't change the json.RawMessage cast.

It would make this project more flexible if developers could bring their own encoding format. Maybe just as an option on the Client, with the default set to JSON?
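To make the proposal concrete, here is a rough sketch of what such an option could plug in. `ArgsCodec`, `JSONCodec`, `GobCodec`, and `EmailArgs` are hypothetical names invented for this illustration and are not part of the library; `encoding/gob` merely stands in for a binary format such as Protobuf so that the example needs only the standard library.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// ArgsCodec is a hypothetical extension point: it turns job arguments into
// the bytes stored in the args column and back again.
type ArgsCodec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// JSONCodec mirrors the current behaviour and would be the default.
type JSONCodec struct{}

func (JSONCodec) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (JSONCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }

// GobCodec stands in for any binary encoding (Protobuf in the issue above).
type GobCodec struct{}

func (GobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (GobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// EmailArgs is an example payload.
type EmailArgs struct {
	To      string
	Subject string
}

func main() {
	in := EmailArgs{To: "user@example.com", Subject: "welcome"}
	codecs := []struct {
		name  string
		codec ArgsCodec
	}{{"json", JSONCodec{}}, {"gob", GobCodec{}}}

	for _, c := range codecs {
		data, err := c.codec.Marshal(in)
		if err != nil {
			panic(err)
		}
		var out EmailArgs
		if err := c.codec.Unmarshal(data, &out); err != nil {
			panic(err)
		}
		fmt.Printf("%s: %d bytes, round trip ok: %v\n", c.name, len(data), out == in)
	}
}
```

With a binary codec the column type would also have to change from a JSON type to a byte type, which the issue already assumes is handled by the user's own migration.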

Let me know if this would be interesting to you and I can probably set up a PR and discuss it a bit further.

Adding a subQueue or workIdentifier field

Would it be possible to add a new string field to the job table that identifies a job as belonging to a particular run or batch? My use case is checking for dependent items in the queue.

  • I cannot use Type as that is used to map to a function
  • I cannot use Queue as that is used to run separate workerPools
  • I could use Args and run a JSON query, but I am not sure how efficient that is

If we had a field job_batch_id (tentative name) that could optionally be set on a job, this check would be very easy. As part of the wrapper I use to decide whether a job should run, I could check whether there are any active jobs with a given subQueue and, if there are, reschedule the job to run later.

I could also then say that if a job enqueues other jobs, they can use the same subQueue as the parent, or at least use it as a prefix.

I know this is pushing the boundaries of the project a little but it is just a single field added from the perspective of this project.

By the way, do you have a Patreon set up? You are doing amazing work and should be rewarded.

Add context to worker

I am researching queue systems and this looks really promising, but I was wondering how I can add context to a given worker. At minimum I need to get its numeric ID, but being able to provide it with additional context would be nice as well.

Can't run `go mod tidy` after installing

It seems like the pgx dependency points to a module that is no longer available.

github.com/vgarvardt/gue/v2 imports
        github.com/jackc/pgtype tested by
        github.com/jackc/pgtype.test imports
        github.com/jackc/pgx/v4 tested by
        github.com/jackc/pgx/v4.test imports
        github.com/jackc/pgtype/ext/satori-uuid: module github.com/jackc/pgtype@latest found (v1.6.1), but does not contain package github.com/jackc/pgtype/ext/satori-uuid

I was able to fix it in my working copy by bumping pgx to the latest version.

-       github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186
+       github.com/jackc/pgx/v4 v4.9.0

Let me know if you would like a PR for that. Tests still pass but I didn't test the functionality (since I'm not using that adapter).

Panic error messages only logged in database

I've noticed that panic error messages are not logged and can only be checked in the database last_error column. I've tried debugging the code when a panic is produced, and the logger is indeed set to NoOpLogger in the recovery code scope (screenshot: 2022-11-13 at 19 23 42).

Passing either StdLogger or zap.Logger when creating a gue client didn't help. Example of how I'm setting the custom logger:

gc, err := gue.NewClient(poolAdapter, gue.WithClientLogger(adapter.NewStdLogger()))

Is this the expected behavior, and have I misunderstood how custom loggers should work, or is it a bug?

I'm using v4 gue version: github.com/vgarvardt/gue/v4 v4.1.0

Database migration with an API and a worker

Let's suppose that I have an API and a worker both using gue, what approach do you recommend for migrating the database?

Some approaches I have thought of:

  1. The API migrates all models.
  2. The API and the worker try to migrate on initialization.

For the 1st, I am worried about the execution order.
For the 2nd, I am thinking about the shared code to enable this.

`HooksJobDone` are not executed within the transaction in case of a `WorkFunc` failure.

In case a worker runs a WorkFunc that returns an error, hooksJobDone will not run within the same transaction because calling op.Error() defers a call to op.Done() which commits the transaction, see line 232 in worker.go.

gue/worker.go

Line 232 in 859d3f2

if jErr := j.Error(ctx, err.Error()); jErr != nil {

One solution would be to remove the defer job.Done() from job.Error(), since worker.go already defers j.Done():

  1. it will ensure the transaction is committed
  2. hooks that use the transaction will properly execute.

A second, backwards-compatible solution would be to add an alwaysDone flag to the Job type, which would conditionally run the defer block in

gue/job.go

Line 133 in 859d3f2

defer func() {
.

The example hook in the readme will break if the WorkFunc returns an error.

Enqueue new jobs from within a job

A pattern I find myself using is to have one job that runs at a given interval and creates lots of other jobs. Right now I am doing this via the gueClient, but I was wondering if there is a way to do this from a job so that it can run in the same transaction.

GRPC interface

Hi, what do you think of a gRPC interface? If I built it, would you accept a pull request?

More production ready example

Is it possible to create an example that is more production ready (the one in the readme has some comments) or point to a repo that has this?

We are strongly considering using this framework as the workhorse of our small workflow engine but would really love some more exhaustive examples.

Question: Semantics on cancel of workerpool

I am trying to implement logic where, when I cancel the context that a workerPool runs in, the active jobs are allowed to finish but no new jobs are accepted. Is this a pattern that gue supports?

I tried to experiment a little but I did not find an obvious way to do this.

stacktrace truncate

gue/worker.go

Lines 284 to 291 in c9efae2

stackBuf := make([]byte, 1024)
n := runtime.Stack(stackBuf, false)
buf := new(bytes.Buffer)
_, printRErr := fmt.Fprintf(buf, "%v\n", r)
_, printStackErr := fmt.Fprintln(buf, string(stackBuf[:n]))
_, printEllipsisErr := fmt.Fprintln(buf, "[...]")
stacktrace := buf.String()

Maybe a silly question - but why truncate the stacktrace?

Aside from writing bug-free code 🤣 is there a better way to extract errors than relying on the built in last_error field? This often removes the useful bit when there's tracing and other things wrapping the job.

panic recovery is broken

defer recoverPanic(ctx, ll, j) in worker.go:206 doesn't log the panic and populates lastError with an empty string, even though it increments the error count.

I'm using the zap and pq adapters.

I spent some time looking into the issue but couldn't find the reason.

Way to distinguish between a 'successful' versus 'failed' job

The scenario I'm running into is this

  • Spawn a job to check if a transaction is valid; it may be successful, failed, or not confirmed yet.
  • If it's not confirmed, return an error so we can retry later.
  • If it's failed or successful, return nil so it is not retried, but then in a JobDone callback I want to distinguish between the successful and the failed transaction.

Is there a way to implement this behaviour in the library, or would I need to build something around the library instead?

Jobs are not being processed immediately

Hi there,
I have this worker pool set up:

	workers, err := gue.NewWorkerPool(
		gc,
		wm,
		10,
		gue.WithPoolQueue(printerQueue),
		gue.WithPoolPollInterval(time.Minute*5),
		gue.WithPoolLogger(azap.New(zap.NewExample())),
		gue.WithPoolPollStrategy(gue.PriorityPollStrategy),
		gue.WithPoolHooksJobDone(finishedJobsLog),
	)

My impression was that gue would process any added messages right away, based on priority, but it looks like it waits for the 5-minute poll interval.

Is this the expected behavior?

My use case is that I want to process newly added jobs right away and make them retry in 5 min if they fail.

Thanks

Real-time Jobs

Sorry to bother you again @vgarvardt

We were using an internal library before moving to gue. (We used both until we recently removed the old one.)
That library had a feature that could be ported to gue and might be interesting.

Basically, gue works by polling jobs from the DB at an interval.
If you have many job types, you have to find a balance when setting that interval: not polling too often, but also not adding too much delay to the processing of important jobs.

The idea would be to:

  1. Mark jobs (when they get enqueued) as real-time.
  2. Have a trigger in the DB that uses the PostgreSQL NOTIFY [channel] functionality when a job gets inserted.
  3. gue has a worker waiting on the LISTEN [channel] functionality.
  4. When a new message is received, it triggers the timer so that the pool associated with the job.Type wakes up (or something like this; we would need to check whether the workers are currently running, as a fail-safe).
  5. Workers run as usual and pick up that task.

PS: I've read your comment about the plugin system. But this would need to be CORE I believe to access the timer, workmap, etc.

Let me know what you think, thank you.

Example outbox is not working

Hello,

I tried the example merged in #98 but I get an error when I run the client task. After starting the client and putting messages in the DB, I run the task client command.

I get the following error:

 task worker
task: [deps-up] docker compose up --detach --wait
[+] Running 2/2
 ⠿ Container outbox-worker-kafka-redpanda-1  Healthy                                                                                                                                      0.5s
 ⠿ Container outbox-worker-kafka-postgres-1  Healthy                                                                                                                                      0.5s
task: [_start_worker] go run ./... worker
Error: could not create test topic: dial tcp [::1]:9092: connect: connection refused

Looking in the docker logs I see:
outbox-worker-kafka-postgres-1 | 2022-10-17 12:17:08.136 UTC [76] FATAL: role "test" does not exist

Specific workers per job type(s)

Hi, thank you for this awesome library.
We have been using it for a while now, but we have a new requirement in our projects that I am not sure we can implement with the current functionality.

Basically we have several services running, each one has its own job types and they all run in parallel in the same DB.
But now we have a new service that has to have several job types, but the same queue type. And the job types are of different nature, some tasks are really fast <20ms others really slow >2 minutes.

Problem A

We need to have many workers for the quick tasks, and only a couple of workers for the slow task.
Right now you can only specify num workers per pool/queue.

Problem B (the biggest one)

The quick tasks must be processed in near real-time fashion. The slow tasks can wait.
If I create a workmap for this queue and we have many slow tasks, the workers get stuck doing slow tasks, and the quick tasks are not resolved until all the slow tasks created before them are processed.
(This happens when tasks fail: they are then mainly prioritized by run_at and the priority is ignored. Unfortunately, we have a lot of failures due to unreliable third-party services.)
If we separate the tasks into different workmaps or worker pools, then because they share the same queue we get a lot of jobs with an unknown type, as each pool tries to pull and lock jobs of the other pool's types. (Even if we handle the unknown type, we do not want those tasks to be locked constantly until the right pool picks them up.)
This is also a problem if you have more job types in your queue than your service understands, as you could have several services listening to the same queue with workers/handlers for different job types. Even if you can ignore them, right now it fetches all jobs of any type, which is not optimal.

Solutions?

So, first of all, is there a way to achieve this with the current v5 version? (sorry if we have missed it)

If it is not possible, would you be ok for us to create a pull request with the following changes:

  • Create a WorkerOption WithWorkerJobTypes(types ...string) and store these in the worker
  • When the query is generated, if there are job types defined, add them to the WHERE clause of the query: job_type = ANY(types)
  • Add the queue_type column to the index in the sample migration. (or give both options)

I believe that with these changes:

  • You can have several services (or pools) listening only to the job types that they understand. In other words: you can share queues among services, without conflicts or query collisions.
  • We can create different pools with different number of workers using the same queue, and only get the job_types intended.
  • Gives a simpler way to manage tasks without having to configure an unknown job type handler.

Let us know how we can proceed.
Thank you!

Prevent enqueuing the same job

Any idea how to prevent enqueuing the same job?

For example, if more than 3 jobs with the same args already exist, could we prevent enqueuing it again?
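One possible building block, sketched here purely as an illustration, is a deterministic fingerprint of the job type and its arguments. `fingerprint` is a hypothetical helper that is not part of the library, and it assumes the arguments are JSON. An application could store the value in its own extra column and count or constrain rows with the same fingerprint before enqueuing.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// fingerprint returns a stable hash of a job type and its JSON arguments.
// The arguments are decoded and re-encoded first, so differences in key order
// or whitespace do not produce different fingerprints.
func fingerprint(jobType string, args []byte) (string, error) {
	dec := json.NewDecoder(bytes.NewReader(args))
	dec.UseNumber() // keep numbers exactly as written
	var v any
	if err := dec.Decode(&v); err != nil {
		return "", fmt.Errorf("decode args: %w", err)
	}
	// json.Marshal emits map keys in sorted order, which makes the output canonical.
	canonical, err := json.Marshal(v)
	if err != nil {
		return "", fmt.Errorf("encode args: %w", err)
	}

	h := sha256.New()
	h.Write([]byte(jobType))
	h.Write([]byte{0}) // separator between type and arguments
	h.Write(canonical)
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	a, _ := fingerprint("send-email", []byte(`{"to":"user@example.com","retries":3}`))
	b, _ := fingerprint("send-email", []byte(`{ "retries": 3, "to": "user@example.com" }`))
	fmt.Println("same job:", a == b)
}
```

A check-then-insert sequence is racy under concurrent enqueuers, so a database constraint on the fingerprint is the safer variant when exactly one copy is allowed.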

Request for new release

I would like to use the changes committed in this PR in my Go application. It would be very nice if a new release could be made. When is the next release scheduled?

Undesirable trace spans from poll function

We're using gue together with Gorm, and specifically the otelgorm library which provides otel spans when Gorm executes a SQL command.

The issue that we see is that because the poll function runs before the first span is emitted from gue, the span emitted by the "poll" gorm call becomes its own root span without any context about gue. For us, it would be desirable if the gue span included the poll query so that the poll is part of the worker span.

Of course, the downside of this switch is that any users of gue would start getting job spans for every poll, regardless of if the poll found a job. I could see how this could be undesirable.

Aside from an IncludePollInTrace option, not sure if there is a "good" solution to this.

Hooks are not executing for jobs that panic

Hi, I want to persist the start and end of a job in the same database row in a history table. For this I am using the hooks and can record the end time of jobs that return a normal error just fine.

Jobs that panic do not work as expected: as best I can tell, the hooks registered with WithPoolHooksJobDone are not called at all in this case.

Version 5.5.0
Hooks registered via WithPoolHooksJobLocked are executed though.

Feature Request: Support for dependent jobs

Have you considered supporting dependent jobs? I have not dug deep into the scheduler code of this project yet, but since there are configuration options to not run until a given time or to run again on error, I believe it would be possible to add an option to

  • check if a list of job_ids are fulfilled
  • check that there are no pending jobs of the given job type
  • check that there are no pending jobs in the given queue

My main use case for this is creating a workflow like this:

Job V1: Read and validate config.
Job P1-P3: Run 3 different tasks in parallel.
Job G1: Wait until P1-P3 are done, then generate a list of hundreds of jobs R.
Job R1-Rx: Run all the jobs.
Job P4: Wait until all R are done, then run another job.
Job G2: Wait until P4 is done, then generate hundreds more jobs of a different type S.
Job S1-Sx: Run all the hundreds of jobs.
Job P5: Wait until all S are done, then run a final job.

In this case the queue for P, R, and S is the same but the job type is different. In fact, the job type for each P is different. (This queue runs transactions and needs an identity on the worker.) The queue for G and V is the same; they only generate items and do not do blockchain work.

All of this can be handled without support from the framework, but it would result in lots of errors: each time a job is scheduled before its dependencies are done, it has to return an error, or, I guess, be adjusted to check again in a couple of minutes. I know approximately how long each task takes here.

Postgres Listen/Notify instead of polling

First of all great work. Works as expected with tonnes of options but I will just point out the obvious that came to my mind.

While the poll works great, Postgres's Listen/Notify interface offers more significant benefits. And in the Elixir world, there's a popular job processing package leveraging this to be much more scalable.

https://github.com/sorentwo/oban

Interestingly that project also offers paid licenses for more plugins and web UI. This package actually falls back to polling if for some reason pub/sub stops working.

Understandably, this is an open-source project, but please consider this as an eventual path forward. I am open to collaborating on this.

https://getoban.pro/articles/one-million-jobs-a-minute-with-oban
