oze4 / workerpoolxt Goto Github PK
View Code? Open in Web Editor NEWConcurrency limiting goroutine pool without upper limit on queue length. Extends github.com/gammazero/workerpool
License: MIT License
Concurrency limiting goroutine pool without upper limit on queue length. Extends github.com/gammazero/workerpool
License: MIT License
Hi,
Im searching for something similar to this lib, and I saw that it fairly new.
I'll plan to test it in the coming days, however I notice that there is missing Unit test of some tests. do you plan to add?
Thanks,
David
Hi,
As discuses I open a new question with example of my code.
I need to extend the current implementation with my clients, Im doing that like following:
this works, but am I doing it right ? is there a better/cleaner approach to add those clients / interface in go... ?
to gain some confidence, i'll appreciate your example also.
Thanks in advance!
// Inject worker client
type Client struct {
HTTP http.Client
Kubernetes kubernetes.Clientset
}
type WorkerClients struct {
client *Client
wp *workerpoolxt.WorkerPoolXT
}
func main() {
wp := workerpoolxt.New(10, time.Duration(time.Second*10))
aa := WorkerClients{
client: &Client{
HTTP: HTTPClient(),
Kubernetes: AnotherClient(),
},
wp: wp,
}
aa.wp.SubmitXT(
workerpoolxt.Job{ // For demo purposes, this job will timeout
Name: "Job1",
Timeout: time.Second * 5,
Task: func() workerpoolxt.Response {
res, e := aa.client.HTTP.Get("http://www.google.com")
if e != nil {
return workerpoolxt.Response{Error: e}
}
return workerpoolxt.Response{Data: res.StatusCode}
},
})
wp.SubmitXT(
workerpoolxt.Job{ // For demo purposes, this job will timeout
Name: "Job2",
Timeout: time.Second * 1,
Task: func() workerpoolxt.Response {
time.Sleep(time.Second * 2)
return workerpoolxt.Response{Data: "Hello"}
},
})
// Submit as many jobs as you would like
results := wp.StopWaitXT()
HI,
It looks for me very nice and will try to use it, great work!
One feature which could be a very neat is to run the workers with retry mechanism ,
e.g. as I define a timeout for a job, in case the job run and get an error run some retry (until the timeout reached)
WDYT?
Regards,
David
Add test that checks timeout occurs before retries are up (if a job times out I'm one second and is set to retry 5 times, but take .5 second each try, we don't want a response from the last retry).
HI,
I've some time to run some extended test (on the master branch code ) and I found one issue.:)
in case you provide a timeout
and retry
to specific task you are getting the error that the time passed which is ok. However, the task is proceed and continue to run which is problematic as the timeout should win.
User should define a timeout as limit to the task and after the job exceed the timeout it should be killed to free resources as if it pass the results after the timeout the results is not valid anymore...
as the jobResults := wp.StopWaitXT()
should handle it.
WDYT?
I know there are default options and per job options, but I want per worker options, is that possible?
The reason is that I want to have an open connection to my RabbitMQ on a per workers basis, with the max set on workerpoolxt.New()
, keep reusing that connection and not open a new one on every job.
Hi @oze4 , Hope you doing good!
I've tried the latest version and I saw that you made some changes to the timeout and add the contexts,
defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Duration(time.Millisecond)
timeout2 := time.Duration(time.Seconds)
myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()
wp.SubmitXT(wpxt.Job{
Name: "Job1",
Context: myCtx,
Task: func(o wpxt.Options) wpxt.Response {
time.Sleep(time.Second*10)
return wpxt.Response{Data: "I could be anything"}
},
})
myCtx2, done2:= context.WithTimeout(context.Background(), timeout2)
defer done2()
wp.SubmitXT(wpxt.Job{
Name: "Job2",
Context: myCtx2,
Task: func(o wpxt.Options) wpxt.Response {
return wpxt.Response{Data: "I could be anything"}
},
})
3.Btw, when you use the defer done()
the job doesnt wait to the timeout, it finish immeditally, should I use it like this? or I can remove the defer
and use it like myCtx, _ := context.WithTimeout(context.Background(), timeout)
?
Thanks a lot!
Hey,
We want to use the lib in production, and I've several of questions .
I use this code from your test with a bit modification to fit our use-case.
when running the code we got the following output:
Job 2 has failed with error : context deadline exceeded
Job 1 has failed with error : context deadline exceeded
Job 3 will encounter an error has failed with error : context deadline exceeded
Questions:
I dont understand why job2 is getting "context deadline exceeded" and the context time defined 5 sec and the timeout inside the job is only 2
why job1
is failing with the same error? I didnt provide it timeout ...
btw, I saw now that you have retry option, if I put some number like 5 , how it works and if I can control it(the delta retry time)
version: 1.1.1
simply copy the code run it and use localhost:3000/test
to get the results
package main
import (
`context`
`fmt`
`net/http`
`time`
wpxt "github.com/oze4/workerpoolxt"
)
func main() {
ctx, c := context.WithTimeout(context.Background(), time.Second *5)
defer c()
http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
wp := wpxt.New(ctx, 10)
// Uses default timeout
wp.SubmitXT(wpxt.Job{
Name: "Job 1",
Task: func(o wpxt.Options) wpxt.Response {
return wpxt.Response{Data: "yay"}
},
})
wp.SubmitXT(wpxt.Job{
Name: "Job 2",
Task: func(o wpxt.Options) wpxt.Response {
time.Sleep(time.Second * 1)
return wpxt.Response{Data: "timedout"}
},
Context: ctx,
})
// Or if you encounter an error within the code in your job
wp.SubmitXT(wpxt.Job{
Name: "Job 3 will encounter an error",
Task: func(o wpxt.Options) wpxt.Response {
err := fmt.Errorf("ErrorPretendException : something failed")
if err != nil {
return wpxt.Response{Error: err}
}
return wpxt.Response{Data: "error"}
},
})
results := wp.StopWaitXT()
for _, r := range results {
if r.Error != nil {
fmt.Println(r.Name(), "has failed with error :", r.Error.Error())
} else {
fmt.Println(r.Name(), "has passed successfully")
}
}
})
fmt.Println("running")
e := http.ListenAndServe(":3000", nil)
if e != nil {
fmt.Println(e)
}
}
I saw that this repo is new, which is very supersizing that no one did it before(the worker-pool package is very abstract) ...nice catch ๐
Hi,
Great lib & work!
myreactor.Add(reactor.Job{
Name: "job1",
Runner: func(c *reactor.Client) reactor.React {
// do something with client `c`
res, _ := c.HTTP.Get("xyz.com")
return reactor.React{Info: res}
},
})
myreactor.Add(reactor.Job{
Name: "job2",
Runner: func(c *reactor.Client) reactor.React {
// do something with client `c`
res, _ := c.HTTP.Get("xyz.com")
return reactor.React{Info: res}
},
})
Thanks!
When mixing Submit()
& SubmitXT()
with either StopWaitXT()
or StopWait()
there is a race condition.
How to reproduce:
go test -race -run ^TestSubmitWithSubmitXT_UsingStopWait$ -count=2000 github.com/oze4/workerpoolxt
# and/or
go test -race -run ^TestSubmitWithSubmitXT_UsingStopWaitXT$ -count=2000 github.com/oze4/workerpoolxt
When creating a new WorkerPoolXT
let the caller pass in a Context
instead of specifying a timeout. The direct caller may need to use a context because:
Consider the case where a server is processing a client's request, and receives a context with the request. The server wants to use WorkerPoolXT to process the request, and needs to use the request's context to know if processing should be abandoned.
Also, this makes the user choose a timeout, or non at all - whichever is most appropriate. This means that the default timeout should also probably go away. A default timeout is just a guess at how long something should take, and WPXT has know way to know this. Often having no timeout is the correct thing, and is typically the expected default behavior.
Hey,
Can we use it on production?
if so did you run some go-profiling on bulk of workers?
Thanks
Hello!
We want to use the worker pool library and we found that this repository have some nice abstraction over it
and also the timeout feature for each task which is definitely required.
some questions.
2 . one thing that we are afraid is race condition for concurrent task , I know that the worker pool handle it and run tests to check it. does this library doing that also?
Best Regards,
Bred
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.