alitto / pond
🔘 Minimalistic and High-performance goroutine worker pool written in Go
License: MIT License
Dumb question: can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?
I'm adding tasks concurrently. I'm also debugging by printing out the total tasks as well as the waiting tasks (which I call load) and the numbers are all over the place.
2021/07/06 15:34:46 Events.go:632: Event processing total: 119 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
Analyzing the implementation, I found that adding and removing tasks both involve multiple goroutines accessing the same channel.
Some questions:
1. Will there be an upper limit on the number of goroutines (the number of workers)?
2. Will performance degrade significantly as the number of goroutines grows?
Looking forward to your reply.
Hey!
Was looking for a Go library to handle worker pools (after attempting to make my own with limited success), found this one and it looked very promising. It seemed to run fine locally (on macOS), but when I built it for our production servers (Ubuntu 18), I encountered this "unaligned 64-bit atomic operation" error whenever we try to submit a task to the pool.
Have you encountered this before, or am I doing something wrong? Here is the code on my side:
Pool setup
maxWorkers := Conf.MaxFetcherWorkers
workQueueBufferSize := Conf.FetcherWorkQueueBufferSize
panicHandler := func(err interface{}) {
l.Logf("Fetcher worker exits from a panic: %v\nStack trace: %s", err, string(debug.Stack()))
}
workerPool := pond.New(maxWorkers, workQueueBufferSize, pond.PanicHandler(panicHandler))
l.Logf("Creating worker pool with max size=%v and workQueueBufferSize=%v", maxWorkers, workQueueBufferSize)
and then submitting:
func submitTaskToWorkerPool(workerPool *pond.WorkerPool, refreshRequest RefreshDispatch, l Logger) {
workerPool.Submit(func() {
processRefreshDispatch(refreshRequest, l)
})
}
Getting error = FetcherProcessQueueResult recover from panic=unaligned 64-bit atomic operation
Build the server like so:
GOOS=linux GOARCH=386 go build ./...
and Go version:
go version go1.18 darwin/amd64
Here's the stack trace:
Sep 05 18:25:26 : runtime/debug.Stack()
Sep 05 18:25:26 : /usr/local/go/src/runtime/debug/stack.go:24 +0x83
Sep 05 18:25:26 : runtime/debug.PrintStack()
Sep 05 18:25:26 : /usr/local/go/src/runtime/debug/stack.go:16 +0x1a
Sep 05 18:25:26 : github.com/dave-filion/test-app/lib.FetcherProcessQueueResult.func1()
Sep 05 18:25:26 : /Users/dfilion/go/src/github.com/dave-filion/test-app/lib/fetcher.go:130 +0x94
Sep 05 18:25:26 : panic({0x94f2e20, 0x9d0ab08})
Sep 05 18:25:26 : /usr/local/go/src/runtime/panic.go:838 +0x1c3
Sep 05 18:25:26 : runtime/internal/atomic.panicUnaligned()
Sep 05 18:25:26 : /usr/local/go/src/runtime/internal/atomic/unaligned.go:8 +0x2d
Sep 05 18:25:26 : runtime/internal/atomic.Xadd64(0xc70c1ec, 0x1)
Sep 05 18:25:26 : /usr/local/go/src/runtime/internal/atomic/atomic_386.s:125 +0x11
Sep 05 18:25:26 : github.com/alitto/pond.(*WorkerPool).submit(0xc70c1b0, 0xc4c9770, 0x1)
Sep 05 18:25:26 : /Users/dfilion/go/pkg/mod/github.com/alitto/pond@…/pond.go:245 +0x83
Sep 05 18:25:26 : github.com/alitto/pond.(*WorkerPool).Submit(...)
Sep 05 18:25:26 : /Users/dfilion/go/pkg/mod/github.com/alitto/pond@…/pond.go:221
Sep 05 18:25:26 : github.com/dave-filion/test-app/lib.submitTaskToWorkerPool(0xc70c1b0, {{0xc71abc0, 0xc}, {0xc91bd88, 0x11}, 0x63163f16, {0xc71abe0, 0xf}, 0x0}, {{0x0, ..
Sep 05 18:25:26 : /Users/dfilion/go/src/github.com/dave-filion/test-app/lib/fetcher.go:193 +0xc9
Sep 05 18:25:26 : github.com/dave-filion/test-app/lib.FetcherProcessQueueResult({0xc4daf00, 0x7c}, 0xc815490, 0xc70c1b0, {{0x0, 0x0}, {0x0, 0x0}, {0xc8ce580, 0x1b}})
Sep 05 18:25:26 : /Users/dfilion/go/src/github.com/dave-filion/test-app/lib/fetcher.go:163 +0x6f4
Sep 05 18:25:26 : github.com/dave-filion/test-app/lib.MainFetcherGoRoutine(0xc82d5d0, 0xc70c1b0, 0xc815490, {{0x0, 0x0}, {0x0, 0x0}, {0xc8ce580, 0x1b}})
Any idea what could be happening here? Let me know if you need any other details. Thank you!
//test demo
func TestTaskGroup(t *testing.T) {
// Create a pool
pool := pond.New(10, 1000)
defer pool.StopAndWait()
go func() {
for {
println(fmt.Sprintf("mertics: running=%v, ide=%v, waiting=%v", pool.RunningWorkers(), pool.IdleWorkers(), pool.WaitingTasks()))
time.Sleep(1 * time.Second)
}
}()
// Create a task group
group := pool.Group()
// Submit a group of tasks
for i := 0; i < 20; i++ {
//n := i
group.Submit(func() {
//fmt.Printf("Running group task #%d\n", n)
time.Sleep(2 * time.Second)
})
}
// Wait for all tasks in the group to complete
group.Wait()
println("all tasks has complete")
time.Sleep(2 * time.Hour)
}
// actual result log
mertics: running=10, ide=0, waiting=10
mertics: running=10, ide=0, waiting=10
mertics: running=10, ide=0, waiting=0
mertics: running=10, ide=0, waiting=0
mertics: running=10, ide=0, waiting=0
all tasks has complete
mertics: running=9, ide=9, waiting=0
mertics: running=9, ide=9, waiting=0
mertics: running=9, ide=9, waiting=0
mertics: running=9, ide=9, waiting=0
mertics: running=9, ide=9, waiting=0
Puzzled: why is the running worker count still above zero after all tasks have completed?
Hello sir,
This might be a very nooby question, but say I want to do a simple for loop like so:
pool := pond.New(12, 1000)
for i := 0; i < 12; i++ {
pool.Submit(func() {
log.Println(i)
})
}
pool.StopAndWait()
This will output:
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
2021/07/09 11:23:26 12
How do I pass variables to functions running in a pool?
p.s.: for comparison, this does seem to work
for i := 0; i < 12; i++ {
func() {
log.Println(i)
}()
}
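Before Go 1.22, a for loop declares i once and reuses it for every iteration, so closures that run after the loop has finished all see its final value (12). The p.s. version works because each closure is called immediately, inside its own iteration. Since Go 1.22 (for modules whose go.mod declares go 1.22 or later), each iteration gets a fresh variable. On older versions, the usual fix is to copy the variable inside the loop, as in this sketch, where a plain goroutine stands in for pool.Submit and collect is a hypothetical helper:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect starts n goroutines (standing in for pool.Submit) that each
// record the loop index they captured.
func collect(n int) []int {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		got []int
	)
	for i := 0; i < n; i++ {
		i := i // per-iteration copy: required before Go 1.22, harmless afterwards
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			got = append(got, i)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(got) // goroutines finish in arbitrary order
	return got
}

func main() {
	fmt.Println(collect(12)) // [0 1 2 3 4 5 6 7 8 9 10 11]
}
```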
Hello,
I would like to know what happens to the pool when a worker panics. Also, what is the best approach to stopping the pool when a worker panics?
Thanks, and congrats on an awesome lib!
Can pond add a feature to support task priority? Tasks with higher priority are processed first.
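Nothing in this thread shows a priority option in pond. One way to get this behaviour on top of any pool is to keep pending tasks in a priority queue and have a dispatcher pop the highest-priority task whenever a worker becomes free. The sketch below shows only the ordering part, using container/heap; prioTask and drainOrder are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
)

// prioTask is a hypothetical queued task: higher priority runs first, and
// seq keeps FIFO order among equal priorities.
type prioTask struct {
	priority int
	seq      int
	name     string
}

type taskHeap []prioTask

func (h taskHeap) Len() int { return len(h) }
func (h taskHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)   { *h = append(*h, x.(prioTask)) }
func (h *taskHeap) Pop() any {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

// drainOrder pushes tasks in submission order and pops them by priority,
// which is the order a dispatcher would hand them to free workers.
func drainOrder(names []string, prios []int) []string {
	h := &taskHeap{}
	for i := range names {
		heap.Push(h, prioTask{priority: prios[i], seq: i, name: names[i]})
	}
	var order []string
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(prioTask).name)
	}
	return order
}

func main() {
	fmt.Println(drainOrder([]string{"low", "high", "mid", "high2"}, []int{1, 5, 3, 5}))
	// [high high2 mid low]
}
```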
panic: unaligned 64-bit atomic operation
goroutine 226 [running]:
runtime/internal/atomic.panicUnaligned()
/usr/lib/go/src/runtime/internal/atomic/unaligned.go:8 +0x24
runtime/internal/atomic.Xadd64(0x105215c, 0x1)
/usr/lib/go/src/runtime/internal/atomic/atomic_arm.s:258 +0x14
github.com/alitto/pond.(*WorkerPool).submit(0x1052120, 0x1098b70, 0x1)
/home/mcuadros/workspace/go/pkg/mod/github.com/alitto/pond@…/pond.go:245 +0x84
github.com/alitto/pond.(*WorkerPool).Submit(...)
/home/mcuadros/workspace/go/pkg/mod/github.com/alitto/pond@…/pond.go:221
github.com/thegoodlock/control.(*Queue).onRequestMesssage(0x10541c0, 0x10a9720)
/home/mcuadros/Documents/ESP32-MAX485/Server/queue.go:140 +0x88
github.com/at-wat/mqtt-go.HandlerFunc.Serve(0x1096e60, 0x10a9720)
/home/mcuadros/workspace/go/pkg/mod/github.com/at-wat/mqtt-go@…/mqtt.go:68 +0x24
github.com/at-wat/mqtt-go.(*ServeMux).Serve(0x10989f0, 0x10a9700)
/home/mcuadros/workspace/go/pkg/mod/github.com/at-wat/mqtt-go@…/servemux.go:50 +0x21c
github.com/at-wat/mqtt-go.(*BaseClient).serve(0x1010100)
/home/mcuadros/workspace/go/pkg/mod/github.com/at-wat/mqtt-go@…/serve.go:77 +0x6f4
github.com/at-wat/mqtt-go.(*BaseClient).Connect.func1()
/home/mcuadros/workspace/go/pkg/mod/github.com/at-wat/mqtt-go@…/connect.go:121 +0x20
created by github.com/at-wat/mqtt-go.(*BaseClient).Connect
/home/mcuadros/workspace/go/pkg/mod/github.com/at-wat/mqtt-go@…/connect.go:120 +0x178
Hi @alitto, pond is great. Thank you for your effort. I am studying pond's code to understand its design, and the check runningWorkerCount > 0 && p.Idle() > 0 in the following piece of code confuses me.
func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
if task == nil {
return false
}
runningWorkerCount := p.Running()
// Attempt to dispatch to an idle worker without blocking
if runningWorkerCount > 0 && p.Idle() > 0 {
select {
case p.tasks <- task:
return true
default:
// No idle worker available, continue
}
}
maxWorkersReached := runningWorkerCount >= p.maxWorkers
// Exit if we have reached the max. number of workers and can't wait for an idle worker
if maxWorkersReached && !waitForIdle {
return false
}
// Start a worker as long as we haven't reached the limit
if ok := p.maybeStartWorker(task); ok {
return true
}
// Submit the task to the tasks channel and wait for it to be picked up by a worker
p.tasks <- task
return true
}
What's the problem if we replace runningWorkerCount > 0 && p.Idle() > 0 with p.Idle() > 0?
I'm playing around with this library, which looks very promising, and ran into an issue where the panic handler doesn't get called if you close the pool early. My use case is timing out a worker pool. However, when I "time out", it panics outright with: panic: send on closed channel
This is my sample code:
package main
import (
"fmt"
"time"
"github.com/alitto/pond"
)
func h(p interface{}) {
fmt.Printf("Catching panic: %v\n", p)
}
func main() {
p := pond.New(5, 0, pond.PanicHandler(h))
// Simulate a timeout event of some sort.
time.AfterFunc(time.Second, func() {
p.StopAndWait()
})
for i := 0; i < 100; i++ {
i := i
p.Submit(func() {
fmt.Printf("Handling task %d\n", i)
time.Sleep(250 * time.Millisecond)
})
}
p.StopAndWait()
}
I'm using version 1.8.1.
By writing pond.New(30, 100, pond.MinWorkers(30)), I expect to create 30 goroutines at startup and keep a fixed pool of 30 over time. Am I correct?
Thank you
File: pond.go
Functions: decrementWorkerCount() and incrementWorkerCount()
Question:
In Go, the sync/atomic package provides low-level atomic memory primitives that are useful for implementing synchronization algorithms. The atomic.AddInt32() function atomically adds delta to *addr.
Alternatively, a sync.Mutex can be used to ensure mutually exclusive execution of a piece of code.
Aren't these two equivalent?
Assuming the pool is limited in size and busy, a panic occurs when the pool is closed while an external goroutine is still submitting tasks to it.
Test code:
func TestStop(t *testing.T) {
pool := New(1, 4, MinWorkers(1), IdleTimeout(1*time.Second))
// Simulate some users sending tasks to the pool
go func() {
for i := 0; i < 30; i++ {
pool.Submit(func() {
fmt.Println("do task")
time.Sleep(3 * time.Second)
})
}
}()
// Suppose the server is shut down after a period of time
time.Sleep(5 * time.Second)
pool.StopAndWait()
}
panic: send on closed channel
This is a question rather than a bug, and I should preface it by saying I am new to Go. I should also add: this library has been awesome for quickly adding worker pools to my code!
I am iterating over a map, and I think what I need is to pass in the value for the current iteration. As soon as I submit the func to the pool (pool.Submit(func(){})), I run into issues, since a map doesn't guarantee order.
Example code that prints out the same item several times, rather than going through each one:
pool := pond.New(10, 0)
for _, item := range element {
fmt.Printf("%v", item)
pool.Submit(func() {
// Do something with item, but by the time this runs it's no longer the original value,
// so I'd like to pass it in to this anonymous function as part of the loop
fmt.Printf("%v", item)
})
}
pool.StopAndWait()
This work can be done asynchronously (it takes item and uses it as part of a web request); it just needs to use the item from the current iteration, which it currently does not.
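Map iteration order is indeed unspecified, but the repeated value points to loop-variable capture: before Go 1.22, the closure reads item when it runs, not when it is submitted. Besides copying the variable inside the loop, another option is to pass the value as an argument to a function that builds the task, so each task is bound to its own copy. In this sketch, makeTask and processAll are hypothetical helpers and a goroutine stands in for pool.Submit.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// makeTask returns a closure bound to its own copy of item, because item is
// passed as a function argument.
func makeTask(item string, results chan<- string) func() {
	return func() {
		results <- "processed " + item
	}
}

func processAll(elements map[string]string) []string {
	results := make(chan string, len(elements))
	var wg sync.WaitGroup
	for _, item := range elements {
		task := makeTask(item, results) // item is copied here, during this iteration
		wg.Add(1)
		go func() {
			defer wg.Done()
			task()
		}()
	}
	wg.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	sort.Strings(out) // completion order is arbitrary
	return out
}

func main() {
	fmt.Println(processAll(map[string]string{"a": "x", "b": "y", "c": "z"}))
	// [processed x processed y processed z]
}
```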
I'm not sure exactly when this happens, but it panics as below:
panic: send on closed channel
goroutine 712 [running]:
github.com/alitto/pond.(*WorkerPool).submit(0xc091f93a40, 0xc000536720, 0x1)
/home/ubuntu/gomods/pkg/mod/github.com/alitto/pond@…/pond.go:277 +0x265
github.com/alitto/pond.(*WorkerPool).Submit(...)
/home/ubuntu/gomods/pkg/mod/github.com/alitto/pond@…/pond.go:222
demofiles.Download.func6(0x2?, 0xc000452b60?, 0xc0005fe900?, {0xc000597000, 0x5db, 0x700})
/home/ubuntu/project1/files/download.go:837 +0x121d
created by demofiles.Download
/home/ubuntu/project1/files/download.go:820 +0x2125
exit status 2
// Submit the task to the tasks channel and wait for it to be picked up by a worker
p.tasks <- task
submitted = true
return
pool := pond.New(10, 50)
defer pool.StopAndWait()
group := pool.Group()
group.Submit(func() {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
panic(11111)
})
group.Wait()
Hi,
What happens to the pool when a worker panics?
goroutine 331 [chan receive]:
github.com/alitto/pond.worker(0xc00e683548, 0xc00e6ceae0, 0xc00e6bf23c, 0xc00e6c3400, 0xc00e6c3410)
/home/work/go/pkg/mod/github.com/alitto/pond@…/pond.go:430 +0xbf
created by github.com/alitto/pond.(*WorkerPool).maybeStartWorker
/home/work/go/pkg/mod/github.com/alitto/pond@…/pond.go:348 +0xe9
goroutine 329 [running]:
goroutine running on other thread; stack unavailable
created by github.com/alitto/pond.(*WorkerPool).maybeStartWorker
/home/work/go/pkg/mod/github.com/alitto/pond@…/pond.go:348 +0xe9
I'm a bit confused about how to create one pool that I submit 3 different tasks to, while still having the same number of workers for each task.
Example:
pool := pond.New(40, 0, pond.MinWorkers(40)) // 40 workers on each task
pool.Submit(func() {
//task 1 with 40 workers
})
pool.Submit(func() {
//task 2 with 40 workers
})
pool.Submit(func() {
//task 3 with 40 workers
})
pool.StopAndWait()
I need to pause the worker pool without waiting for tasks to finish, and afterwards resume execution from the same place. Is that possible?
I want to submit a list of tasks to the pool and have them complete one after another. Can pond support this scenario?
For example:
taskgroup1 has three tasks:
task1, task2, task3
taskgroup2 has two tasks:
task1, task2
Both taskgroup1 and taskgroup2 are submitted to the pool, and I want the two groups to run concurrently. Within each group, however, the tasks should run one after another: in taskgroup1, task2 starts when task1 finishes and task3 starts when task2 finishes; in taskgroup2, task2 starts when task1 finishes.
I have a system where I will be using pond for network related download tasks.
To avoid hitting rate limiters when issuing multiple network requests, it would be great to be able to set an unlimited task capacity.
Imagine a scenario where some downloads have already finished and the data is easily accessible, so there is no need to issue another download.
In such a case, a general-purpose worker pool can have as many goroutines as there are CPU threads. But you wouldn't want that for a download pool, because you'd potentially have 24 to 56 threads downloading at once.
Is there a way to have an unlimited task queue and just 3-5 workers downloading at the same time? I was not able to find such an example in the documentation or the code.
Would this mean that it should be set to the maximum integer capacity?
const MaxInt = int(MaxUint >> 1)
Additionally, given that these values cannot be lower than zero:
// Make sure options are consistent
if pool.maxWorkers <= 0 {
pool.maxWorkers = 1
}
if pool.minWorkers > pool.maxWorkers {
pool.minWorkers = pool.maxWorkers
}
if pool.maxCapacity < 0 {
pool.maxCapacity = 0
}
if pool.idleTimeout < 0 {
pool.idleTimeout = defaultIdleTimeout
}
Wouldn't it be better to make these fields uint?
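A sketch of the general pattern, independent of pond's capacity options: keep pending tasks in a growable slice and let a fixed number of workers (3 here) pull from it, so submitters never wait for a worker. unbounded and runDownloads are hypothetical names, and the tasks stand in for downloads. This is the well-known "unbounded channel" forwarding pattern, not a pond feature.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// unbounded forwards everything received on in to the returned channel,
// buffering in a slice in between, so senders never wait for workers.
func unbounded(in <-chan func()) <-chan func() {
	out := make(chan func())
	go func() {
		defer close(out)
		var buf []func()
		for in != nil || len(buf) > 0 {
			var send chan func() // nil (never ready) while buf is empty
			var next func()
			if len(buf) > 0 {
				send, next = out, buf[0]
			}
			select {
			case t, ok := <-in:
				if !ok {
					in = nil // input closed: stop receiving, flush buf
					continue
				}
				buf = append(buf, t)
			case send <- next:
				buf = buf[1:]
			}
		}
	}()
	return out
}

// runDownloads queues n tasks and processes them with a fixed number of
// workers, returning how many ran and the peak concurrency observed.
func runDownloads(n, workers int) (int32, int32) {
	in := make(chan func())
	out := unbounded(in)

	var done, active, peak int32
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range out {
				cur := atomic.AddInt32(&active, 1)
				for { // record the highest concurrency seen so far
					p := atomic.LoadInt32(&peak)
					if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
						break
					}
				}
				t()
				atomic.AddInt32(&active, -1)
			}
		}()
	}

	for i := 0; i < n; i++ {
		in <- func() { atomic.AddInt32(&done, 1) } // stand-in for a download
	}
	close(in)
	wg.Wait()
	return atomic.LoadInt32(&done), atomic.LoadInt32(&peak)
}

func main() {
	done, peak := runDownloads(100, 3)
	fmt.Println("done:", done, "peak concurrency <= 3:", peak <= 3)
}
```

The trade-off is memory for back-pressure: if producers outpace the workers for a long time, the slice keeps growing.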
Is there any way to submit tasks so that when one of them fails, the other workers stop? Kind of like errgroup.
I was initially using panics to handle errors because FailedTasks seemed to support panics.
It seems a little weird to see a panic every time an error happens, though; what is the canonical way to handle errors?
I tried to download the module using go get URL but could not. I suspect that Go 1.17.2, the version I am currently using, is imposing a restriction because something is missing from the repository.
Test demo:
package main
import (
"fmt"
"github.com/alitto/pond"
"time"
)
func testPool(i int, p *pond.WorkerPool) {
if i == 8 {
var stop string
// fmt.Printf("stop -- %d : ", i)
// fmt.Scanln(&stop)
stop = "yes"
if stop == "yes" {
fmt.Println("stop ok")
p.Stop()
return
}
}
fmt.Printf("run %d \n", i)
time.Sleep(time.Duration(i) * time.Second)
}
func main() {
// Create a buffered (non-blocking) pool that can scale up to 5 workers
// and has a buffer capacity of 1000 tasks
pool := pond.New(5, 1000, pond.MinWorkers(5))
// Submit 999 tasks
for i := 1; i < 1000; i++ {
n := i
pool.Submit(func() {
testPool(n, pool)
})
}
// pool.RunningWorkers()
// Stop the pool and wait for all submitted tasks to complete
fmt.Println("wait all task done ...")
pool.StopAndWait()
}
Output:
run 1
run 3
run 2
wait all task done ...
run 4
run 5
run 6
run 7
stop ok
run 9
run 10
fatal error: all goroutines are asleep - deadlock!
I experienced a deadlock when using pond in the following way (https://go.dev/play/p/eJLX1vc3C81).
workerpool := pond.New(10, 10, pond.Strategy(pond.Eager()))
batchWg := &sync.WaitGroup{}
batch := []string{"message1", "message2"}
for range batch {
batchWg.Add(1)
workerpool.Submit(func() {
batchWg.Done()
})
}
batchWg.Wait()
Minimum workers defaults to 0, and purge can stop idle workers when IdleWorkers() > 0.
At the same time, workerpool.Submit can add a task without starting a worker, because IdleWorkers() > 0.
If the purger manages to signal the worker to stop before the newly submitted task is consumed by that worker, the worker pool ends up with a non-empty task channel and no workers to process the tasks.
Possible solutions: purge and maybeStartWorker to avoid such cases.
Hey @alitto,
I'm trying to execute a simple test in order to grasp how pond works and I'm experiencing a strange delay when submitting 100 tasks which sleep 1 second each:
package main
import (
"fmt"
"time"
"github.com/alitto/pond"
)
func main() {
pool := pond.New(100, 1000)
defer pool.StopAndWait()
for i := 0; i < 100; i++ {
n := i
pool.Submit(func() {
time.Sleep(1 * time.Second)
fmt.Printf("Task #%d done\n", n)
})
}
}
I was expecting a single 1-second delay and then all the "Task #N done" lines printed at the same time, but instead I'm seeing tasks complete one per second. What am I doing wrong? Thanks!
Ivan
Hello,
I have a question: how do I apply this when iterating over a slice of strings (paths to files), given that the function I submit to the pool also runs subtasks?
It looks something like this:
File 1 -> fileHandler(client, path) -> multipart upload to S3 (by streaming the file into a buffer, which also slices it into smaller pieces, and at the end uploading all the file parts as one unit).
Thanks.
Submitting an asynchronous task from within an asynchronous task:
taskPool.Submit(func() {
taskPool.Submit(func() {
})
})