Comments (14)
Just in case if someone is looking for an implementation:
# frozen_string_literal: true
def wait_first(*tasks, task: Async::Task.current)
c = Async::Notification.new
await = lambda do |task|
task.async do
task.wait
c.signal(task)
rescue StandardError
c.signal(task)
end
end
tasks.each(&await)
first = c.wait
[first, tasks - [first]]
end
Usage:
done, pending = wait_first(task1, task2, task3)
pending.each(&:stop) # cancel others
done.wait # return or re-raise first result
The API is influenced by python's asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)
Personally I'd be happy to have a similar method defined on Async
. Something like
Async.wait(*tasks, mode: :all)
- awaits all tasks, a shortcut for Barrier
Async.wait(*tasks, mode: :first)
- awaits first, either my implementation or a shortcut for some primitive with similar logic
I'd like to volunteer myself for this task. WDYT?
from async.
Sure sounds great, I don't have any strong opinions on the interface except that if it's fundamentally different from wait
we should make a separate method.
from async.
What about Async.wait_for
?
from async.
Waiting for all tasks vs waiting for the first task are quite different things IMHO and the use case is different. Waiting for all tasks can be achieved by tasks.each(&:wait)
. Waiting for the first N tasks to complete is more complex since you need to track all the tasks.
Needing to spawn N tasks to wait for the first task to finish out of N tasks seems like an anti-pattern.
We might need to consider how to do this more efficiently.
from async.
A Barrier-like primitive?
w = Async::Waiter.new # naming is currently out of scope
w.async{}
w.async{}
w.async{}
done, pending = w.wait_first(2) # or maybe w.wait(first: 2). Wait without arguments will wait for all just like Barrier, but with different return value
Even though waiting for all and waiting for N
have different use cases, waiting for all is just a special case of waiting for N. The proposed primitive does not care about task results letting the user decide how to deal with them and handle possible exceptions by awaiting the done
tasks again. In case of waiting for all, the done
list will contain all tasks and pending
will be empty
from async.
The most efficient implementation is something like this:
#!/usr/bin/env ruby
require 'async'
class Waiter
def initialize
@finished = Async::Condition.new
end
def async(&block)
Async do
result = begin
yield
rescue => error
error
end
@finished.signal(result)
end
end
def wait(n = 1)
n.times.collect do
@finished.wait
end
end
end
Async do
waiter = Waiter.new
waiter.async do
sleep 1
puts "1"; "1"
end
waiter.async do
sleep 2
puts "2"; "2"
end
waiter.async do
sleep 3
puts "3"; "3"
end
pp waiter.wait(2)
end
But that assumes we can force the user via this funnel.
from async.
Async::Barrier
already makes the assumption that users are going to call barrier.async
so we could add the interface there for wait(n = nil)
(nil for all, Integer for some subset)? or wait_for(n = 1)
/wait_some
?
from async.
In case if a task spawned with Waiter raises an exception your implementation will return it as a value, do I understend it correctly?
What if the task is supposed to return an instance of an exception(can't imagine a use case but it's possible in theory)? How to distinguish it from an actual exception raised from the task?
It also gives no information about pending tasks, the user would have to find what is still in progress manually after getting requested first results
And the last one: exceptions raised from the pending tasks after the user got their first results would be silently swallowed even if the user finds and awaits pending tasks manually.
Thats why I believe returning two lists of tasks from wait
is better than returning their results. It forces the user to do something with them. Collect the results, handle or reraise exceptions and either stop, or let pending tasks work further if they need so
The other option would be tracking of spawned tasks inside the waiter:
w.wait_for(2) # no need to return anything
# after waiting the done and pending lists do not change anymore, but tasks from pending still running and may finish any time
w.done # get finished tasks
w.pending # get pending
from async.
In case if a task spawned with Waiter raises an exception your implementation will return it as a value, do I understend it correctly?
It was just a MVP, you'd need to handle this correctly.
What if the task is supposed to return an instance of an exception(can't imagine a use case but it's possible in theory)? How to distinguish it from an actual exception raised from the task?
This is handled by Fiber#raise
but not in this code. Again, we might need to consider how to deal with it.
It also gives no information about pending tasks, the user would have to find what is still in progress manually after getting requested first results
The idea of a barrier is that it's stateful, you can call wait(2)
several times until it's exhausted.
from async.
We have introduced Async::LimitedBarrier
for this purpose.
https://github.com/socketry/async/blob/main/lib/async/limited_barrier.rb
from async.
Aaaaaand it's gone?
from async.
I think it was renamed #196
from async.
I'm still thinking about this.
Oddly enough, this morning when I was half asleep I started going through the entire design in my head and my sub-concious figured out we don't need Async::Waiter
either, since we can do the same implementation on Async::Group
just as efficiently. I'll make a PR.
from async.
I was wondering about Group
as well since it could be pretty flexible in use.
from async.
Related Issues (20)
- Unhandled Exception does not stop program running HOT 10
- [Enhanchment] Make alternate exception reporting (the colorful one) opt in. HOT 3
- [Question] How to stop a fiber? HOT 4
- Strange timeout bug. HOT 10
- Dynamic concurrency limiter / adaptive semaphore HOT 3
- Configure log level specificly for Async HOT 4
- Segmentation fault HOT 5
- ActiveRecord best practices support or documentation HOT 1
- [Question] What difference between `Async` and `Sync` HOT 1
- bundle error HOT 1
- Catch all Async errors and report to Sentry? (or other error reporting)
- How to wait for `.schedule`'d fibers to finish? HOT 7
- Properly managing interrupts (works for async v 1.31, "breaks" for async v 2.5.6) HOT 3
- Tasks signaling Conditions leave suspended Fibers behind HOT 8
- Error reporting difference between Sync{} and Async{}.wait HOT 2
- Understanding the difference of Sync usage within Falcon HOT 3
- macOS: `Errno::EINVAL: Invalid argument - IO_Event_Selector_KQueue_io_wait:IO_Event_Selector_KQueue_Waiting_register` HOT 18
- Stopping remaining tasks upon completion of one task HOT 2
- Blocking subprocess (popen3) HOT 10
- Sleep Hook HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from async.