workerpoolxt's Introduction

workerpoolxt


Worker pool library that extends https://github.com/gammazero/workerpool

Note

If you are using context, please use it with caution in production! See the notes below for more info:

  • Go Playground which shows the area of concern

  • Wonderful explanation from the author of workerpool:

Once a goroutine is started, there is no way to kill it unless there is something inside that goroutine that is looking for a signal, waiting on a context or channel, etc. In the case of workerpool, and workerpoolxt, the pool is providing the goroutine and calling someone else's function in that goroutine. There is no telling if that function will ever return and allow your goroutine to finish or run another task. The only thing that workerpoolxt can do is to run the task function in another goroutine and wait for the task to finish or for a timer to expire or some other signal to give up waiting (such as context being canceled). When this signal to give up happens, the worker goroutine can return and report an error, but the task goroutine is still running somewhere in the background and may or may not ever finish. All you have done is given up waiting for it to do so.

- @gammazero github.com/gammazero

Synopsis


Hello World

  • Obligatory "as simple as it gets" example
package main

import (
    "context"
    "fmt"
    wpxt "github.com/oze4/workerpoolxt"
)

func main() {
    ctx := context.Background()
    numWorkers := 10

    wp := wpxt.New(ctx, numWorkers)

    wp.SubmitXT(wpxt.Job{
        Name: "My first job",
        Task: func(o wpxt.Options) wpxt.Result {
            return wpxt.Result{Data: "Hello, world!"}
        },
    })

    jobResults := wp.StopWaitXT()

    for _, jobresult := range jobResults {
        fmt.Println(jobresult)
    }
}

How we extend workerpool

Results

// ...
// ... pretend we submitted jobs here
// ...

results := wp.StopWaitXT() // -> []wpxt.Result

for _, result := range results {
    if result.Error != nil {
        // The job failed (it returned an error or timed out)
    }
}

Error Handling

  • What if I encounter an error in one of my jobs?
  • How can I handle or check for errors or timeouts?

Return Error From Job

// Just set the `Error` field on the `wpxt.Result` you return
wp.SubmitXT(wpxt.Job{
    Name: "How to handle errors",
    Task: func(o wpxt.Options) wpxt.Result {
        // Pretend we got an error doing something
        theError := errors.New("something went wrong")
        if theError != nil {
            return wpxt.Result{Error: theError}
        }
        // Every code path must return a Result
        return wpxt.Result{Data: "success"}
    },
})

Check For Errors In Result

// ... pretend we submitted a bunch of jobs
//
// StopWaitXT() returns []wpxt.Result.
// Each result has an `Error` field, which is set
// whether the job timed out or returned an error you set.
// Check for it like this:
if someResultFromSomeJob.Error != nil {
    // ....
}

Context

  • A default context is required when creating a new workerpoolxt
  • You can override the default context per job

Default Context

myctx := context.Background() // Any `context.Context`
numWorkers := 10
wp := wpxt.New(myctx, numWorkers)

Per Job Context

Timeouts

defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Millisecond // already a time.Duration

myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()

wp.SubmitXT(wpxt.Job{
    Name: "my ctx job",
    Context: myCtx,
    Task: func(o wpxt.Options) wpxt.Result {
        // Simulate long running task
        time.Sleep(time.Second*10) 
        return wpxt.Result{Data: "I could be anything"}
    },
})
// > `Result.Error` will be `context.DeadlineExceeded`

Retry

  • Optional
  • Seamlessly retry failed jobs
// Create the job's timeout context outside the struct literal.
// Call cancel only after the job has finished (e.g. after StopWaitXT)
timeoutctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

wp.SubmitXT(wpxt.Job{
    // This job is configured to fail immediately,
    // therefore it will retry 5 times
    // (as long as we have not exceeded our job timeout)
    Retry: 5,
    // ^^^^^^
    Name: "I will retry 5 times",
    // Set the job's timeout via its context
    Context: timeoutctx,
    Task: func(o wpxt.Options) wpxt.Result {
        return wpxt.Result{Error: errors.New("some_err")}
    },
})

Options

  • Help make jobs flexible

Default Options

myopts := map[string]interface{}{
    "myclient": &http.Client{},
}

wp := wpxt.New(context.Background(), 10)
wp.WithOptions(myopts)

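wp.SubmitXT(wpxt.Job{
    Name: "myjob",
    Task: func(o wpxt.Options) wpxt.Result {
        // Access options here; values are stored as interface{},
        // so type-assert them back to their concrete type
        client, ok := o["myclient"].(*http.Client)
        if !ok {
            return wpxt.Result{Error: errors.New("myclient option missing")}
        }
        // ... do work with `client`
        _ = client
        return wpxt.Result{Data: "done"}
    },
})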

Per Job Options

myhttpclient := &http.Client{}
myk8sclient := kubernetes.Clientset{}

// This Job Only Needs an HTTP Client
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs an HTTP Client",
    Options: map[string]interface{}{
        "http": myhttpclient,
    },
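    Task: func(o wpxt.Options) wpxt.Result {
        // access options here (values are interface{}, so type-assert)
        httpclient := o["http"].(*http.Client)
        // ... do work with `httpclient`
        _ = httpclient
        return wpxt.Result{Data: "done"}
    },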
})

// This Job Only Needs Kubernetes Clientset
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs Kubernetes Clientset",
    Options: map[string]interface{}{
        "kube": myk8sclient,
    },
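    Task: func(o wpxt.Options) wpxt.Result {
        // access options here (values are interface{}, so type-assert)
        kubernetesclient := o["kube"].(kubernetes.Clientset)
        // ... do work with `kubernetesclient`
        _ = kubernetesclient
        return wpxt.Result{Data: "done"}
    },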
})

workerpoolxt's Issues

Context instead of timeout

When creating a new WorkerPoolXT let the caller pass in a Context instead of specifying a timeout. The direct caller may need to use a context because:

  • They may need it to timeout or cancel it depending on circumstances
  • They may have been given a context that times out or gets canceled, which they must use directly or create a child context from.

Consider the case where a server is processing a client's request, and receives a context with the request. The server wants to use WorkerPoolXT to process the request, and needs to use the request's context to know if processing should be abandoned.

Also, this makes the user choose a timeout, or none at all, whichever is most appropriate. This means that the default timeout should probably go away too. A default timeout is just a guess at how long something should take, and WPXT has no way to know this. Often having no timeout is the correct thing, and it is typically the expected default behavior.

Found race condition

When mixing Submit() & SubmitXT() with either StopWaitXT() or StopWait() there is a race condition.

How to reproduce:

go test -race -run ^TestSubmitWithSubmitXT_UsingStopWait$ -count=2000 github.com/oze4/workerpoolxt
# and/or
go test -race -run ^TestSubmitWithSubmitXT_UsingStopWaitXT$ -count=2000 github.com/oze4/workerpoolxt

Suggestion for retry

Hi,

It looks very nice to me and I will try to use it. Great work!
One feature which could be very neat is to run the workers with a retry mechanism,
e.g. when I define a timeout for a job and the job runs and gets an error, retry it (until the timeout is reached).
WDYT?

Regards,
David

Options per workers

I know there are default options and per-job options, but I want per-worker options. Is that possible?

The reason is that I want to have an open connection to my RabbitMQ on a per-worker basis, with the maximum set on workerpoolxt.New(), and keep reusing that connection instead of opening a new one for every job.

Extend with specific client

Hi,

As discussed, I am opening a new question with an example of my code.

I need to extend the current implementation with my clients. I'm doing that as follows:
This works, but am I doing it right? Is there a better/cleaner approach to adding those clients/interfaces in Go?
To gain some confidence, I'd appreciate your example too.

Thanks in advance!


// Inject worker client
type Client struct {
	HTTP       http.Client
	Kubernetes kubernetes.Clientset
}

type WorkerClients struct {
	client *Client
	wp     *workerpoolxt.WorkerPoolXT
}

func main() {

	wp := workerpoolxt.New(10, time.Duration(time.Second*10))

	aa := WorkerClients{
		client: &Client{
			HTTP:       HTTPClient(),
			Kubernetes: AnotherClient(),
		},
		wp: wp,
	}

	aa.wp.SubmitXT(
		workerpoolxt.Job{ // For demo purposes, this job will timeout
			Name:    "Job1",
			Timeout: time.Second * 5,
			Task: func() workerpoolxt.Response {
				res, e := aa.client.HTTP.Get("http://www.google.com")
				if e != nil {
					return workerpoolxt.Response{Error: e}
				}
				return workerpoolxt.Response{Data: res.StatusCode}
			},
		})

	wp.SubmitXT(
		workerpoolxt.Job{ // For demo purposes, this job will timeout
			Name:    "Job2",
			Timeout: time.Second * 1,
			Task: func() workerpoolxt.Response {
				time.Sleep(time.Second * 2)
				return workerpoolxt.Response{Data: "Hello"}
			},
		})

	// Submit as many jobs as you would like

	results := wp.StopWaitXT()

using context with timeouts

Hey,

We want to use the lib in production, and I have several questions.

I used this code from your test, with a bit of modification to fit our use case.

When running the code, we got the following output:

Job 2 has failed with error : context deadline exceeded
Job 1 has failed with error : context deadline exceeded
Job 3 will encounter an error has failed with error : context deadline exceeded

Questions:

  1. I don't understand why job 2 gets "context deadline exceeded" when the context timeout is defined as 5 sec and the timeout inside the job is only 2.

  2. Why is job 1 failing with the same error? I didn't give it a timeout...

  3. By the way, I now see that you have a retry option. If I set it to some number like 5, how does it work, and can I control it (the delay between retries)?
    version: 1.1.1

Simply copy the code, run it, and open localhost:3000/test to see the results.

package main

import (
	`context`
	`fmt`
	`net/http`
	`time`

	wpxt "github.com/oze4/workerpoolxt"
)

func main() {

	ctx, c := context.WithTimeout(context.Background(), time.Second*5)
	defer c()


	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		wp := wpxt.New(ctx, 10)

		// Uses default timeout
		wp.SubmitXT(wpxt.Job{
			Name: "Job 1",
			Task: func(o wpxt.Options) wpxt.Response {
				return wpxt.Response{Data: "yay"}
			},
		})
		wp.SubmitXT(wpxt.Job{
			Name:    "Job 2",
			Task: func(o wpxt.Options) wpxt.Response {
				time.Sleep(time.Second * 1)
				return wpxt.Response{Data: "timedout"}
			},
			Context: ctx,
		})

		// Or if you encounter an error within the code in your job
		wp.SubmitXT(wpxt.Job{
			Name: "Job 3 will encounter an error",
			Task: func(o wpxt.Options) wpxt.Response {
				err := fmt.Errorf("ErrorPretendException : something failed")
				if err != nil {
					return wpxt.Response{Error: err}
				}
				return wpxt.Response{Data: "error"}
			},
		})

		results := wp.StopWaitXT()

		for _, r := range results {
			if r.Error != nil {
				fmt.Println(r.Name(), "has failed with error :", r.Error.Error())
			} else {
				fmt.Println(r.Name(), "has passed successfully")
			}
		}
	})


	fmt.Println("running")
	e := http.ListenAndServe(":3000", nil)
	if e != nil {
		fmt.Println(e)
	}
}

I saw that this repo is new, which is very surprising, since no one had done it before (the workerpool package is very abstract)... nice catch 👍

Use multiple task & race conditions

Hello!

We want to use the workerpool library, and we found that this repository has some nice abstractions over it,
as well as the per-task timeout feature, which is definitely required.

Some questions:

  1. In case of a timeout, is there any event/way to know that task X exceeded the timeout?
    For example, some message like "task 1 terminated as it exceeded the defined timeout".

  2. One thing we are afraid of is race conditions between concurrent tasks. I know that workerpool handles this and runs tests to check it. Does this library do that too?

Best Regards,
Bred

Add test

Add a test that checks the timeout occurs before the retries are used up (if a job times out in one second and is set to retry 5 times, but each try takes 0.5 seconds, we don't want a response from the last retry).

Jobs still run after context is done

Hi,

I had some time to run some extended tests (on the master branch code) and I found one issue :)

When you provide a timeout and retries for a specific task, you get an error saying the time has passed, which is OK. However, the task proceeds and continues to run, which is problematic, as the timeout should win.
The timeout a user defines is a limit for the task, and once the job exceeds it, the job should be killed to free resources, since results that arrive after the timeout are no longer valid...
jobResults := wp.StopWaitXT() should handle this.
WDYT?

Unit test for the components

Hi,

I'm searching for something similar to this lib, and I saw that it is fairly new.
I plan to test it in the coming days; however, I noticed that unit tests are missing for some of the components. Do you plan to add them?

Thanks,
David

Using timeouts

Hi @oze4, hope you're doing well!

I've tried the latest version, and I saw that you made some changes to the timeout and added contexts.

  1. Could you please explain the use case?
  2. E.g. two jobs with two different timeouts: am I doing it right? If so, don't you think it's kind of boilerplate? Is there a way to make it shorter?
defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Millisecond
timeout2 := time.Second

myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()

wp.SubmitXT(wpxt.Job{
    Name: "Job1",
    Context: myCtx,
    Task: func(o wpxt.Options) wpxt.Response {

        time.Sleep(time.Second*10) 
        return wpxt.Response{Data: "I could be anything"}
    },
})


myCtx2, done2:= context.WithTimeout(context.Background(), timeout2)
defer done2()

wp.SubmitXT(wpxt.Job{
    Name: "Job2",
    Context: myCtx2,
    Task: func(o wpxt.Options) wpxt.Response {
        return wpxt.Response{Data: "I could be anything"}
    },
})

  3. By the way, when I use defer done(), the job doesn't wait for the timeout; it finishes immediately. Should I use it like this, or can I remove the defer and use it like myCtx, _ := context.WithTimeout(context.Background(), timeout)?

Thanks a lot!

A few questions regarding the implementation :)

Hi,

Great lib & work!

  1. To use multiple jobs, should I do it like the following? Will it run concurrently when done like this? Is there a shorter way to write it (like an array)?
	myreactor.Add(reactor.Job{
		Name: "job1",
		Runner: func(c *reactor.Client) reactor.React {
			// do something with client `c`
			res, _ := c.HTTP.Get("xyz.com")
			return reactor.React{Info: res}
		},
	})

	myreactor.Add(reactor.Job{
		Name: "job2",
		Runner: func(c *reactor.Client) reactor.React {
			// do something with client `c`
			res, _ := c.HTTP.Get("xyz.com")
			return reactor.React{Info: res}
		},
	})

Thanks!
