
tunny's Introduction

Tunny

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchronous API.

A fixed goroutine pool is helpful when you have work coming from an arbitrary number of asynchronous sources, but a limited capacity for parallel processing. For example, when processing jobs from HTTP requests that are CPU heavy you can create a pool with a size that matches your CPU count.

Install

go get github.com/Jeffail/tunny

Or, using dep:

dep ensure -add github.com/Jeffail/tunny

Use

For most cases your heavy work can be expressed in a simple func(), in which case you can use NewFunc. Let's see how this looks using our HTTP requests to CPU count example:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: Something CPU heavy with payload

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
			return
		}
		defer r.Body.Close()

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny also supports timeouts. You can replace the Process call above with the following:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

You can also use the context from the request (or any other context) to handle timeouts and deadlines. Simply replace the Process call with the following:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

Changing Pool Size

The size of a Tunny pool can be changed at any time with SetSize(int):

pool.SetSize(10) // 10 goroutines
pool.SetSize(100) // 100 goroutines

This is safe to perform from any goroutine even if others are still processing.

Goroutines With State

Sometimes each goroutine within a Tunny pool will require its own managed state. In this case you should implement tunny.Worker, which includes calls for terminating, interrupting (in case a job times out and is no longer needed) and blocking the next job allocation until a condition is met.

When creating a pool using Worker types you will need to provide a constructor function for spawning your custom implementation:

pool := tunny.New(poolSize, func() tunny.Worker {
	// TODO: Any per-goroutine state allocation here.
	return newCustomWorker()
})

This allows Tunny to create and destroy Worker types cleanly when the pool size is changed.
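
To make the interface concrete, below is a minimal sketch of a custom worker, assuming the four-method tunny.Worker interface (Process, BlockUntilReady, Interrupt, Terminate). The scratch buffer and the newCustomWorker name are purely illustrative:

package main

import "github.com/Jeffail/tunny"

// customWorker is a hypothetical Worker holding per-goroutine state.
type customWorker struct {
	scratch []byte // a reusable buffer owned by this goroutine
}

// Process synchronously performs a job and returns the result.
func (w *customWorker) Process(payload interface{}) interface{} {
	// TODO: do the heavy work with payload, reusing w.scratch.
	return nil
}

// BlockUntilReady holds the next job allocation until the worker is ready.
func (w *customWorker) BlockUntilReady() {}

// Interrupt is called when a job times out and its result is abandoned.
func (w *customWorker) Interrupt() {}

// Terminate is called when the worker is removed from the pool, e.g. on
// Close() or when SetSize shrinks the pool.
func (w *customWorker) Terminate() {}

func newCustomWorker() tunny.Worker {
	return &customWorker{scratch: make([]byte, 4096)}
}

func main() {
	pool := tunny.New(4, newCustomWorker)
	defer pool.Close()
	pool.Process("payload")
}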

Ordering

Backlogged jobs are not guaranteed to be processed in order. Due to the current implementation of channels and select blocks, a stack of backlogged jobs will be processed as a FIFO queue. However, this behaviour is not part of the spec and should not be relied upon.

tunny's People

Contributors

darjun, ericyt, jarri-abidi, jeffail, mihaitodor, pkaeding


tunny's Issues

PSA: API Changes - If your builds suddenly broke read this

The tunny API has been changed recently. If you have found that builds are suddenly broken and this repo has shown up in the error logs then it is likely due to this. You have two options:

Update to the latest API

The new API is simpler, comes with performance improvements and adds more capabilities, so it might be worth simply updating. For the majority of cases that will mean switching from:

pool, _ := tunny.CreatePool(n, func(object interface{}) interface{} {
    // Stuff
}).Open()
defer pool.Close()

foo, _ := pool.SendWork(bar)

To:

pool := tunny.NewFunc(n, func(object interface{}) interface{} {
    // Stuff
})
defer pool.Close()

foo := pool.Process(bar)

Use vendoring

The old API is tagged as version 0.0.1, so you can use vendoring to continue building with the old API by targeting that version.

If you are using dep then add this to your Gopkg.toml file:

[[constraint]]
name = "github.com/Jeffail/tunny"
version = "0.0.1"

When I was using this code, some panics happened!

package goTest

import (
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/Jeffail/tunny"
)

func Test_pools(t *testing.T) {
	pool := tunny.NewFunc(11, func(payload interface{}) interface{} {
		fmt.Println(payload.(int))
		test1(payload.(int))
		time.Sleep(time.Second)
		return nil
	})
	defer pool.Close()

	for i := 0; i < 10; i++ {
		go pool.Process(i)
	}
	pool.Close()
}

func test1(i int) {
	fmt.Println("a" + strconv.Itoa(i))
}

The problem:

GOROOT=/usr/local/Cellar/go/1.14.6/libexec #gosetup
GOPATH=/Users/gudaixin/go #gosetup
/usr/local/Cellar/go/1.14.6/libexec/bin/go test -c -o /private/var/folders/ty/2yjqwcmn1xv6d9v7blg39x0h0000gn/T/___Test_pools_in_ProjectGoModule_src_goTest ProjectGoModule/src/goTest #gosetup
/usr/local/Cellar/go/1.14.6/libexec/bin/go tool test2json -t /private/var/folders/ty/2yjqwcmn1xv6d9v7blg39x0h0000gn/T/___Test_pools_in_ProjectGoModule_src_goTest -test.v -test.run ^Test_pools$ #gosetup
=== RUN Test_pools
6
a6
7
a7
8
a8
9
a9
1
a1
panic: the pool is not running

goroutine 34 [running]:
github.com/Jeffail/tunny.(*Pool).Process(0xc0000a4640, 0x1114780, 0xc0000a61c0, 0xa9978700000000f, 0x61da5e0f)
/Users/gudaixin/go/pkg/mod/github.com/!jeffail/[email protected]/tunny.go:158 +0x13d
created by ProjectGoModule/src/goTest.Test_pools
/Users/gudaixin/Public/PrivatePersonGoProject/ProjectGoModule/src/goTest/pool_test.go:21 +0xb1

Process finished with exit code 1
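
For reference, the panic is consistent with racing Process against Close: the test spawns ten goroutines and then closes the pool immediately, so some Process calls arrive after the pool has stopped. A hedged sketch of one fix, waiting for the in-flight jobs before closing (requires the sync import):

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(n int) {
		defer wg.Done()
		pool.Process(n)
	}(i)
}
wg.Wait() // every job has finished; closing is now safe
pool.Close()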

Use tunny for IO bound concurrency?

Hi,
I should mention that I'm new to Golang.

I'm trying to write a solution that behaves as follows:
– I have a web service built with go-json-rest framework;
– handling endpoint, let's say test, I'm going to make outbound HTTP requests (from 1 to N) and get back the results of each;
– to make those requests concurrently, I'm just using one goroutine per HTTP request (a similar basic approach here: http://blog.narenarya.in/concurrent-http-in-go.html)

This approach is good enough for basic usage, but when we talk about resources (goroutines) we should manage them somehow. So, what do you think: is it OK to use Tunny for this kind of resource management, or can we just use a semaphore like this:

semaphore := make(chan int, concurrency)
for url := range urls {
  semaphore <- 1 // blocks while all slots are taken
  go func(url string) {
    // do http get here
    <-semaphore // release the slot
  }(url)
}

?

Additionally, I'd like to notify the client with some HTTP status if there are no resources left to handle the request.

Thanks in advance
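
For comparison, a hedged sketch of the same bounded fan-out using tunny; fetchURL and urls are hypothetical stand-ins for the real request logic:

pool := tunny.NewFunc(concurrency, func(payload interface{}) interface{} {
	// do http get here; fetchURL is a hypothetical helper
	return fetchURL(payload.(string))
})
defer pool.Close()

var wg sync.WaitGroup
for _, url := range urls {
	wg.Add(1)
	go func(u string) {
		defer wg.Done()
		_ = pool.Process(u) // blocks until one of the `concurrency` workers is free
	}(url)
}
wg.Wait()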

Getting the worker id in the job function

Hello! It would be really helpful to get the worker ID in the executed job, because I want to parallelize work using a pool of stateful singletons: each one should never process more than one task at a time, but I can create X of them.

I was thinking of something like this:

tunny.NewFunc(3, func(payload interface{}, workerId int) interface{} {
    // workerId can be 0, 1, or 2
})

Could this be an interesting feature? I can make a PR if you think it has a chance of being included in the project.

====

Edit: Another way would be the ability to provide tunny with our own workers, implementing the tunny worker interface (this way we can expose the required element in the function). I think it may be a cleaner way to implement it (@naggingant). A sketch of this approach follows below.

What do you suggest?
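
On the edit: that second approach already works with the current API, since tunny.New takes a constructor. A hedged sketch where each worker is built with its own ID (idWorker and nextID are illustrative names; the counter uses sync/atomic):

type idWorker struct{ id int }

func (w *idWorker) Process(payload interface{}) interface{} {
	// w.id (0, 1 or 2) selects the stateful singleton this worker wraps.
	return nil
}
func (w *idWorker) BlockUntilReady() {}
func (w *idWorker) Interrupt()       {}
func (w *idWorker) Terminate()       {}

var nextID int32 // incremented atomically in the constructor

pool := tunny.New(3, func() tunny.Worker {
	// Each constructed worker permanently owns one ID.
	return &idWorker{id: int(atomic.AddInt32(&nextID, 1)) - 1}
})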

Panic behaviour notes

May I ask how tunny behaves if a panic happens inside the NewFunc closure? Does tunny re-spawn the worker, and what is the ideal way to handle a panic in a NewFunc worker?

Add support for context

Like ProcessTimed(i interface{}, t time.Duration), it would be nice to have a ProcessCtx(ctx context.Context, i interface{}) func so that jobs could be cancelled in case the ctx gets cancelled.

Goroutines and SendWork

Is it advisable to SendWork to a pool from within a goroutine? For example, I have a Go program that creates image thumbnails. Since image resizing in pure Go is not the fastest, I am using this library to resize multiple images at a time.

Here's an oversimplified version of how I'm using this library:

scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
    // Copy the line: the buffer returned by scanner.Bytes() is
    // overwritten by the next call to Scan.
    pathToImage := scanner.Text()

    go func() {
        pool.SendWork(pathToImage)
    }()
}

Is this the wrong approach?

how to stop worker when timeout detected?

Is there a sample implementation of TunnyInterrupt()? It seems that on timeout TunnyInterrupt() is called, but the worker keeps running. How can the work itself be stopped?
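
There is no built-in way to kill the goroutine; Interrupt (TunnyInterrupt in the old API) only signals that the result is no longer wanted. A hedged sketch, using the current Worker interface, of a worker whose job polls a stop signal raised by Interrupt:

type stoppableWorker struct {
	stop chan struct{}
}

func newStoppableWorker() tunny.Worker {
	return &stoppableWorker{stop: make(chan struct{}, 1)}
}

func (w *stoppableWorker) Process(payload interface{}) interface{} {
	for i := 0; i < 1000; i++ { // stand-in for long-running, sliceable work
		select {
		case <-w.stop:
			return nil // the job timed out upstream: stop working
		default:
			// do one slice of work
		}
	}
	return nil
}

// Interrupt is invoked by tunny when the job's result is abandoned.
func (w *stoppableWorker) Interrupt() {
	select {
	case w.stop <- struct{}{}:
	default: // a signal is already pending
	}
}

func (w *stoppableWorker) BlockUntilReady() {}
func (w *stoppableWorker) Terminate()       {}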

Unallocated

What does this mean exactly?

Jobs are unallocated to a worker until one becomes available.

Should it instead say: jobs are allocated to a worker when one becomes available?

Recommended Settings for IO bound jobs?

Hi there all! I was wondering if anybody has recommended settings (GOMAXPROCS and number of workers) for IO-bound jobs that I can start with? In case you're wondering, my IO-bound jobs access a remote MySQL database. I'd appreciate any suggestions. Thanks!

Change example to set GOMAXPROCS to numCPUs+1

You should really have an OS thread spare to handle non-busy work, like handling the HTTP requests.

Otherwise you are relying on Go's 'preemption' to give the HTTP handler a chance, which may or may not happen in a timely manner depending on the nature of your "heavy work".
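
In code, the suggestion would look something like this (a sketch of the proposed change; workFunc stands in for the heavy job function):

numCPUs := runtime.NumCPU()
runtime.GOMAXPROCS(numCPUs + 1) // one spare OS thread for the HTTP handlers
pool := tunny.NewFunc(numCPUs, workFunc)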

Get errors after running for a period of time: goroutine 115 [select, 3 minutes], goroutine 116 [IO wait], etc.

my code:

pool := tunny.NewFunc(runtime.NumCPU(), func(i interface{}) interface{} {
	updateDB(i.(string))
	return nil
})
for scanner.Scan() {
	line := scanner.Text()
	go pool.Process(line)
}

func updateDB(line string) {
        // updateDB logic
}

The errors after it runs for a period of time:

goroutine 115 [select, 3 minutes]:
github.com/ethereum/go-ethereum/rpc.(*Client).dispatch(0xc0005a0180, 0xe1d4b8, 0xc000506270)
        /home/amber/go/pkg/mod/github.com/ethereum/[email protected]/rpc/client.go:561 +0x27d
created by github.com/ethereum/go-ethereum/rpc.initClient
        /home/amber/go/pkg/mod/github.com/ethereum/[email protected]/rpc/client.go:223 +0x2c5

goroutine 116 [IO wait]:
internal/poll.runtime_pollWait(0x7fbf244dc6b0, 0x72, 0xffffffffffffffff)
        /usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc0005a0118, 0x72, 0x400, 0x400, 0xffffffffffffffff)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc0005a0100, 0xc000346000, 0x400, 0x400, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:166 +0x1d5
net.(*netFD).Read(0xc0005a0100, 0xc000346000, 0x400, 0x400, 0x8b, 0x100010000, 0xc000264230)
        /usr/local/go/src/net/fd_posix.go:55 +0x4f
net.(*conn).Read(0xc000266010, 0xc000346000, 0x400, 0x400, 0x0, 0x0, 0x0)
        /usr/local/go/src/net/net.go:183 +0x91
bufio.(*Reader).fill(0xc0001c0b40)
        /usr/local/go/src/bufio/bufio.go:101 +0x108
bufio.(*Reader).Peek(0xc0001c0b40, 0x2, 0x0, 0x0, 0x20, 0x0, 0x0)
        /usr/local/go/src/bufio/bufio.go:139 +0x4f
github.com/gorilla/websocket.(*Conn).read(0xc000194160, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0)
        /home/amber/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:370 +0x46
github.com/gorilla/websocket.(*Conn).advanceFrame(0xc000194160, 0x9, 0x0, 0x0)
        /home/amber/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:798 +0x5c
github.com/gorilla/websocket.(*Conn).NextReader(0xc000194160, 0xc000102480, 0x7fbef01c8da8, 0x8, 0x18, 0x7fbf4d7cc108)
        /home/amber/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:980 +0x8f
github.com/gorilla/websocket.(*Conn).ReadJSON(0xc000194160, 0xc0bb20, 0xc003f37728, 0x1, 0xc003f37728)
        /home/amber/go/pkg/mod/github.com/gorilla/[email protected]/json.go:50 +0x2f
github.com/ethereum/go-ethereum/rpc.(*jsonCodec).readBatch(0xc0000c80a0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
        /home/amber/go/pkg/mod/github.com/ethereum/[email protected]/rpc/json.go:209 +0x57
github.com/ethereum/go-ethereum/rpc.(*Client).read(0xc0005a0180, 0xe1d4b8, 0xc000506270)
        /home/amber/go/pkg/mod/github.com/ethereum/[email protected]/rpc/client.go:634 +0xca
created by github.com/ethereum/go-ethereum/rpc.(*Client).dispatch
        /home/amber/go/pkg/mod/github.com/ethereum/[email protected]/rpc/client.go:558 +0x10f
goroutine 163 [select, 3 minutes]:
github.com/Jeffail/tunny.(*workerWrapper).run(0xc0000d5d40)
        /home/amber/go/pkg/mod/github.com/!jeffail/[email protected]/worker.go:93 +0x1d4
created by github.com/Jeffail/tunny.newWorkerWrapper
        /home/amber/go/pkg/mod/github.com/!jeffail/[email protected]/worker.go:70 +0x130
goroutine 146519 [select, 3 minutes]:
github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1(0xc012709e60, 0xc00036a5a0, 0xc0001d2300)
        /home/amber/go/pkg/mod/github.com/go-sql-driver/[email protected]/connection.go:621 +0xa5
created by github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher
        /home/amber/go/pkg/mod/github.com/go-sql-driver/[email protected]/connection.go:618 +0xbe
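
A possible cause worth noting: `go pool.Process(line)` spawns an unbounded goroutine per scanned line, so the pool no longer bounds anything, and every blocked goroutine pins memory. A hedged sketch that bounds in-flight submissions instead (sem and the sizes are illustrative; needs the runtime and sync imports):

sem := make(chan struct{}, runtime.NumCPU())
var wg sync.WaitGroup
for scanner.Scan() {
	line := scanner.Text()
	sem <- struct{}{} // blocks once NumCPU submissions are in flight
	wg.Add(1)
	go func(l string) {
		defer wg.Done()
		pool.Process(l)
		<-sem // release the slot
	}(line)
}
wg.Wait() // drain all in-flight jobs before exiting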

How to use `NewCallback`

Can you please explain how to use NewCallback?

I don't quite understand the documentation:

NewCallback creates a new Pool of workers where workers cast the job payload into a func() and runs it, or returns ErrNotFunc if the cast failed.
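
As far as I can tell, it means the payload itself is the function to run: the pool is created without a job function, and each call to Process hands it a func(). A hedged sketch (the log call is illustrative):

pool := tunny.NewCallback(4)
defer pool.Close()

res := pool.Process(func() {
	// the work itself travels as the payload
})
// If the payload had not been a func(), res would be the documented
// "not a func" error value instead of the job's (nil) result.
if err, ok := res.(error); ok {
	log.Println("payload was not a func():", err)
}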

Process a pool asynchronously

After the big API changes, how can we process a pool asynchronously?

Also, what about callback functions?

TIP for the author: when you change a library's API you have to respect your users:

  1. don't make breaking changes if they are not necessary;
  2. keep the same features as before, unless something was terribly wrong;
  3. if you do 1 and 2 anyway, the first thing you have to do is write a migration guide!

Please answer the two questions above!
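
On the first question: Process is synchronous by design under the new API, so asynchronous use means wrapping it yourself. A hedged sketch, with a result channel standing in for the old callback mechanism (pool and payload assumed from context):

results := make(chan interface{}, 1)

go func() {
	// Runs the job on the pool without blocking the caller.
	results <- pool.Process(payload)
}()

// ...do other work...

result := <-results // the "callback" value, received when the job is done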

doesn't work with 32 bit Windows

Everything is fine for 64-bit, but if I set GOARCH=386 there is a panic. My test code:

package main

import (
	"github.com/Jeffail/tunny"
)

var pool *tunny.Pool

func main() {

	pool = tunny.NewFunc(10, func(data interface{}) interface{} {
		str := data.(string)
		println("str:", str)

		return nil
	})
	defer pool.Close()

	pool.Process("11111111")

	for {
	}

}

And the panic output:

$ go run .
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x401bdc]

goroutine 1 [running]:
runtime/internal/atomic.Xadd64(0x1305801c, 0x1, 0x0, 0x13058014, 0x44c7d4)
D:/Go/src/runtime/internal/atomic/asm_386.s:102 +0xc
github.com/Jeffail/tunny.(*Pool).Process(0x13058000, 0x459ae0, 0x473168, 0x13046090, 0x13044030)
d:/gopath/src/github.com/Jeffail/tunny/tunny.go:152 +0x3e
main.main()
e:/go/3/3.go:19 +0x7f
exit status 2

Panic on pool.SetSize(5) via http request

var pool tunny.Pool

...

	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		time.Sleep(time.Second)
		log.Println(fmt.Sprintf("superprocess %v", payload))

		return nil
	})
	pool.SetSize(1)
	defer pool.Close()

...

	http.HandleFunc("/tune", endpointTune)

...

	go func() {
		for {
			for Deque.Len() != 0 {
				go pool.Process(Deque.PopFront())
			}
			time.Sleep(time.Second)
		}
	}()

And in a separate file:

package main

import (
	"net/http"
)

type TuneRequest struct {
	PoolSize int
}

func endpointTune(res http.ResponseWriter, req *http.Request) {
	r := TuneRequest{}

	err := decoder.Decode(&r, req.URL.Query())
	if err != nil {
		return
	}

	pool.SetSize(r.PoolSize)
}

The panic:
2020/04/07 11:45:23 http: panic serving [::1]:57105: runtime error: invalid memory address or nil pointer dereference
goroutine 27 [running]:
net/http.(*conn).serve.func1(0xc0000a2b40)
	/Go/src/net/http/server.go:1772 +0x150
panic(0x840e40, 0xb3e540)
	/Go/src/runtime/panic.go:973 +0x429
github.com/Jeffail/tunny.(*Pool).SetSize(0xb4b460, 0x5)
	/Go/bin/src/github.com/Jeffail/tunny/tunny.go:236 +0x108
main.endpointTune(0x90ef20, 0xc00010c000, 0xc000132100)
	/src/endpointTune.go:24 +0xf4
net/http.HandlerFunc.ServeHTTP(0x8bc580, 0x90ef20, 0xc00010c000, 0xc000132100)
	/Go/src/net/http/server.go:2012 +0x4b
net/http.(*ServeMux).ServeHTTP(0xb4b4e0, 0x90ef20, 0xc00010c000, 0xc000132100)
	/Go/src/net/http/server.go:2387 +0x1ad
net/http.serverHandler.ServeHTTP(0xc000180000, 0x90ef20, 0xc00010c000, 0xc000132100)
	/Go/src/net/http/server.go:2807 +0x216
net/http.(*conn).serve(0xc0000a2b40, 0x90f520, 0xc0000e1780)
	/Go/src/net/http/server.go:1895 +0x171d
created by net/http.(*Server).Serve
	/Go/src/net/http/server.go:2933 +0x938
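
For anyone hitting the same trace: the nil dereference is consistent with variable shadowing; `pool := tunny.NewFunc(...)` declares a new local, so the package-level `var pool tunny.Pool` stays zero-valued (and it should be a pointer anyway) when endpointTune later calls SetSize on it. A hedged sketch of the fix (workFunc is a stand-in for the job function):

var pool *tunny.Pool // package level: a pointer, assigned (not shadowed) below

func main() {
	numCPUs := runtime.NumCPU()
	// '=' assigns the package-level variable; ':=' would declare a new local.
	pool = tunny.NewFunc(numCPUs, workFunc)
	defer pool.Close()
	// ...
}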

Handling job cancelation / pool.close

Hi,

I'm still not sure what will happen to all the jobs when workpool.Close() is invoked, and how the jobs will react to it. Is there a way for a job to handle cancellation individually when workpool.Close() is called?

Or is it possible to do context-like cancellation handling inside the job?

ps : tunny is cool

Doubt

Hi, I'm a newbie in Go, so I don't know: what is the difference between using Tunny in a synchronous way versus an asynchronous way?

I don't understand one thing in the new API

Old example:

for {
  _, err_time := pool.SendWorkTimed(time.Millisecond, func() {
    slog.Log(fmt.Sprintf("%#v", "wait connection"), nil)   
    ln.Accept()
  })

  if err_time != nil {
    slog.Log(fmt.Sprintf("%#v", "timeout"), nil)
    time.Sleep(time.Millisecond)
  }
}

New example:

for {
  _, err_time := pool.ProcessTimed(func() {
    slog.Log(fmt.Sprintf("%#v", "wait connection"), nil)   
    ln.Accept()
  }, time.Millisecond)

  if err_time == tunny.ErrJobTimedOut {
    slog.Log(fmt.Sprintf("%#v", "timeout"), nil)
    time.Sleep(time.Millisecond)
  }
}

In the old API, SendWorkTimed would wait to accept a connection.
In the new API, ProcessTimed always returns a timeout error.

Which behaviour is right? I used to think WorkTimed's timeout was about waiting for a free goroutine from the pool, but the new API suggests ProcessTimed's timeout also covers the function's execution time?

Sorry for my bad English. Thanks
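
A hedged reading of the difference: the old SendWorkTimed timeout covered only the wait for a free worker, while the new ProcessTimed timeout covers the whole call, including the job's own execution, so a job that blocks indefinitely in ln.Accept() will always exceed a millisecond. One sketch that keeps the blocking call outside the pool (assuming a NewFunc pool whose job function handles a net.Conn):

for {
	conn, err := ln.Accept() // block here, outside the pool
	if err != nil {
		continue
	}
	// The timeout now only limits queueing plus the handler's work.
	if _, err := pool.ProcessTimed(conn, time.Second); err == tunny.ErrJobTimedOut {
		conn.Close()
	}
}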

How to safely submit work when the pool might already be closed

Firstly, thanks for the great work to provide easy code to manage the pooling work.

Background
I am using tunny with the gin framework to do heavy work triggered by user requests. Since too many heavy jobs could cause a CPU or memory spike in a short time, I limit the tunny worker count.

Concern
I hooked my tunny cleanup into the gin server shutdown, so when the server is signalled to close I get a chance to close the pool. But I am afraid that if a new job is submitted after the pool shuts down, a panic will happen. How can I manage this safely?

Thanks.

Expose len(pool.reqChan)

It would be useful to be able to see the number of jobs waiting to be processed. Would you have anything against a PR exposing it as QueueLength()?
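
For later readers: recent versions of tunny expose exactly this as QueueLength(). A minimal usage sketch, assuming a current version:

waiting := pool.QueueLength() // jobs submitted but not yet allocated (int64)
log.Printf("backlog: %d jobs waiting", waiting)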

tunny performance

I want to use tunny as my process pool and I tested it with this code:

package main

import (
    "github.com/Jeffail/tunny"
    "github.com/pkg/profile"
)

type Worker struct {
}

func (this *Worker) TunnyReady() bool {
    return true
}

func (this *Worker) TunnyJob(interface{}) interface{} {
    return nil 
}

func main() {
    defer profile.Start(profile.BlockProfile).Stop()

    workers := make([]tunny.TunnyWorker, 100, 100)
    for i := range workers {
        workers[i] = &Worker{}
    }   

    pool, _ := tunny.CreateCustomPool(workers).Open()
    defer pool.Close()

    //wg := new(sync.WaitGroup)
    //wg.add(10)

    //for i := 0; i < 10; i++ {
    //  go func() {
    //      for i:=0 i
    //      defer wg.Done()
    //  }()
    //} 
    //wg.Wait()

    for i := 0; i < 10000000; i++ {
        pool.SendWork(i)
    }   
}

I find that all workers are blocked here. I thought readyChan could hold at most one element, so why is it blocked here?

ROUTINE ======================== github.com/Jeffail/tunny.(*workerWrapper).Loop in /home/eric/Code/Go/GOPATH/src/github.com/Jeffail/tunny/worker.go
         0    7.67hrs (flat, cum) 99.92% of Total
         .          .     46:           break
         .          .     47:       }
         .          .     48:       time.Sleep(tout * time.Millisecond)
         .          .     49:   }
         .          .     50:
         .   283.72ms     51:   wrapper.readyChan <- 1
         .          .     52:
         .   161.92ms     53:   for data := range wrapper.jobChan {
         .     23.28s     54:       wrapper.outputChan <- wrapper.worker.TunnyJob(data)
         .          .     55:       for !wrapper.worker.TunnyReady() {
         .          .     56:           if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
         .          .     57:               break
         .          .     58:           }
         .          .     59:           time.Sleep(tout * time.Millisecond)
         .          .     60:       }
         .    7.67hrs     61:       wrapper.readyChan <- 1
         .          .     62:   }
         .          .     63:
         .          .     64:   close(wrapper.readyChan)
         .          .     65:   close(wrapper.outputChan)

How to close the goroutines in the pool

numCPUs := runtime.NumCPU()

pool := tunny.NewFunc(numCPUs, func(in interface{}) interface{} {
	intVal := in.(int)
	fmt.Println("first", in)
	time.Sleep(3 * time.Second)
	fmt.Println("second", in)
	return intVal * 2
})
defer pool.Close()

ctx, cancel := context.WithCancel(context.Background())

pool.ProcessCtx(ctx, 10)
time.Sleep(2 * time.Second)
cancel()

The output:

first 10
second 10
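
The output above looks like expected behaviour: cancelling the context makes ProcessCtx return early, but the worker goroutine still runs the job to completion, which is why "second 10" prints. To make the job itself stop, one hedged option is to carry the context inside the payload and check it in the work function (the job type here is illustrative):

type job struct {
	ctx context.Context
	val int
}

pool := tunny.NewFunc(numCPUs, func(in interface{}) interface{} {
	j := in.(*job)
	select {
	case <-j.ctx.Done():
		return nil // the caller gave up: skip the expensive part
	case <-time.After(3 * time.Second): // stand-in for interruptible work
		return j.val * 2
	}
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool.ProcessCtx(ctx, &job{ctx: ctx, val: 10})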
