Comments (12)
It looks like this is an issue with async actors in Ray Core. See the following repros:
With a synchronous inner actor
import time

import ray

ray.init()


@ray.remote
class Inner:
    def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")


@ray.remote
class Outer:
    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")


outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)
Results:
% python check_core.py
2024-03-29 09:58:02,692 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(Inner pid=95616) Inner call finished: 1.01s
(Outer pid=95620) Outer call finished: 1.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 2.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 3.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 4.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 5.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 6.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 7.11s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 8.11s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 9.12s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 10.12s
With an async inner actor
import time

import ray

ray.init()


@ray.remote
class Inner:
    async def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")


@ray.remote
class Outer:
    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")


outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)
Results:
% python check_core.py
2024-03-29 10:00:30,493 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(Inner pid=96401) Inner call finished: 1.01s
(Outer pid=96404) Outer call finished: 1.09s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
from ray.
To circle back on what we believe the real issue to be here:
Non-issue
Invoking long-running, uninterrupted code inside async methods is problematic whether or not you're using Ray (you will observe similar side effects in other frameworks, such as FastAPI).
Issue
However, if you declare a method as sync but your Ray actor is actually async (which is the case for Serve actors), all of its methods are converted to async using the sync_to_async utility (REF).
The problem with this utility is that it naively converts opaque sync code into async in an unsafe manner: when the sync code block is long-running (time.sleep in your example), the event loop running it is blocked for arbitrarily long periods of time, potentially compromising other operations through so-called head-of-line blocking of the event loop's queue.
This can lead to arbitrary side effects, such as what you're observing: responses seemingly being delayed until the last one in the batch completes.
This is the issue we're going to address, to avoid this kind of unexpected behavior when combining async and sync methods inside Ray actors and Serve deployments.
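A minimal sketch of the head-of-line blocking described above, using plain asyncio rather than Ray. The handler names and the heartbeat measurement are hypothetical and exist only for illustration. A 10 ms timer is scheduled on the loop, and the sketch measures how late it fires while a handler runs.

```python
import asyncio
import time


async def blocking_handler(delay):
    # time.sleep never yields, so the event loop is stuck for `delay` seconds.
    time.sleep(delay)


async def offloaded_handler(delay):
    # The same blocking call, run on a worker thread; the loop stays free.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, delay)


async def measure_heartbeat_delay(handler, delay=0.3):
    """Return how long a 10 ms timer actually took while `handler` ran."""

    async def heartbeat():
        start = time.perf_counter()
        await asyncio.sleep(0.01)
        return time.perf_counter() - start

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat register its timer first
    await handler(delay)
    return await task


blocked = asyncio.run(measure_heartbeat_delay(blocking_handler))
offloaded = asyncio.run(measure_heartbeat_delay(offloaded_handler))
print(f"heartbeat with blocking handler:  {blocked:.2f}s")
print(f"heartbeat with offloaded handler: {offloaded:.2f}s")
```

With the blocking handler, the timer cannot fire until the handler returns, which is the same effect as unrelated requests waiting behind a long sync call on a shared event loop.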
from ray.
After some digging, I found that requests 1-9 to the Adder model have already completed in the _user_code_event_loop thread but are still blocked at await in the main thread.
https://github.com/ray-project/ray/blob/master/python/ray/serve/_private/replica.py#L533-L535
from ray.
Thanks for submitting this @wuxibin89. I was able to validate your repro. I'm taking a look.
from ray.
@wuxibin89 I ran some experiments over the weekend and here are the conclusions:
- This is not a Ray Core issue: the event loop must NOT be blocked, even if you offload your blocking activities into a separate actor (blocking it just leads to unexpected side effects like the one you observe in this issue).
- The right approach is to offload any activity that takes a considerable amount of wall-clock time to complete (uninterrupted) and execute it in an executor (using loop.run_in_executor), like you'd normally do in any async web app.
So in this case, if you run the experiments @shrekris-anyscale provided above as follows, you'd a) get the same behavior as the sync implementation and b) see the pipelining you're expecting:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import ray

ray.init()


@ray.remote
class Inner:
    def __init__(self):
        self.executor = ThreadPoolExecutor(1)

    async def run(self):
        start_time = time.time()
        # Blocking code should be offloaded to a ThreadPoolExecutor
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self.executor, time.sleep, 1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")


@ray.remote
class Outer:
    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")


outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)
from ray.
And here's the correct output that the code above produces:
2024-04-01 10:40:39,594 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
[2024-04-01 10:40:39,596 I 76194 57421105] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,040 I 76218 57421351] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,346 I 76220 57421856] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,395 I 76221 57421870] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(Inner pid=76220) Inner call finished: 1.01s
(Outer pid=76221) Outer call finished: 1.08s
(Inner pid=76220) Inner call finished: 2.01s
(Outer pid=76221) Outer call finished: 2.01s
(Inner pid=76220) Inner call finished: 3.01s
(Outer pid=76221) Outer call finished: 3.02s
(Inner pid=76220) Inner call finished: 4.01s
(Outer pid=76221) Outer call finished: 4.02s
(Inner pid=76220) Inner call finished: 5.02s
(Outer pid=76221) Outer call finished: 5.02s
from ray.
@alexeykudinkin Thanks for your reply. I agree that any blocking code in an async actor should be executed in a separate thread; that's what the Serve replica does. But the blocking still exists, so should we use run_in_executor instead of run_coroutine_threadsafe?
https://github.com/ray-project/ray/blob/master/python/ray/serve/_private/replica.py#L766-L785
from ray.
@wuxibin89 we can't replace run_coroutine_threadsafe with just run_in_executor because this code path executes both sync and async methods.
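To illustrate why the two calls are not interchangeable, here is a small standalone sketch using only the standard library (no Ray or Serve code; `sync_add` and `async_add` are hypothetical stand-ins for user methods). `asyncio.run_coroutine_threadsafe` accepts only a coroutine object and schedules it on a loop running in another thread, while `loop.run_in_executor` accepts a plain callable.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def sync_add(a, b):
    return a + b


async def async_add(a, b):
    await asyncio.sleep(0)
    return a + b


# An event loop running in its own thread, similar in spirit to a
# dedicated "user code" loop (an assumption made for illustration).
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# A coroutine object can be handed to the other thread's loop.
future = asyncio.run_coroutine_threadsafe(async_add(1, 2), loop)
async_result = future.result(timeout=5)

# The result of a plain function call is not a coroutine and is rejected.
try:
    asyncio.run_coroutine_threadsafe(sync_add(1, 2), loop)
    rejected = False
except TypeError:
    rejected = True


# run_in_executor, by contrast, takes a plain callable plus its arguments.
async def call_sync_in_executor():
    running_loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await running_loop.run_in_executor(pool, sync_add, 1, 2)


sync_result = asyncio.run(call_sync_in_executor())

# Stop and clean up the background loop.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

print(async_result, rejected, sync_result)
```

A code path that must handle both kinds of methods therefore needs either a wrapper that turns sync calls into coroutines or a branch that picks the right mechanism for each method.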
from ray.
This is a repro script for the issue called out above, using Ray (async) actors:
import time

import ray

ray.init()


@ray.remote(num_cpus=1)
class Inner:
    # NOTE: It's crucial for Actor to be considered *async*, hence we add phony async method
    #       which is not used in this test
    async def foo(self):
        pass

    # This particular method is *sync* hence doing long-running blocking operation (emulated
    # by time.sleep) is considered ok in this case
    def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")


@ray.remote(num_cpus=1)
class Outer:
    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")


inner = Inner.remote()
outer = Outer.remote(inner)
calls = [outer.run.remote() for _ in range(5)]
ray.get(calls)
from ray.
📣 Alright, wanted to give a quick update on the state of the discussion concerning this issue.
- We will NOT be changing the current out-of-the-box behavior in Ray Core. The reason is that offloading sync methods in async actors to a standalone executor would make code that was previously thread-safe (since sync methods were executed on the same event-loop thread as async methods) non-thread-safe. That is a breaking change that compromises a lot of existing code, and we don't believe it is worth risking the stability of existing workloads.
Instead, we propose the following changes:
- Update the Ray Core docs to list concrete examples of the side effects that can be observed when combining async and sync methods.
- The suggestion to users in this case is to either a) separate async and sync methods into different actors (so that sync methods aren't executed on the same event loop) or b) offload sync execution to a standalone ThreadPoolExecutor.
Additionally:
- We'll be making changes in Ray Serve to align it with FastAPI semantics, where sync methods are automatically offloaded to a standalone ThreadPoolExecutor (which is a safe operation, unlike in Ray Core).
cc @wuxibin89
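The thread-safety concern above can be seen in a small plain-asyncio sketch (not Ray code; the `Worker` class and its method names are hypothetical). A sync method called directly from an async method runs on the event-loop thread, whereas the same method offloaded to a ThreadPoolExecutor runs on a different thread and can therefore race with code on the loop.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class Worker:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=1)

    def sync_thread_id(self):
        # Report which thread is executing this sync method.
        return threading.get_ident()

    async def inline(self):
        # Called directly: executes on the event-loop thread.
        return self.sync_thread_id()

    async def offloaded(self):
        # Offloaded: executes on the executor's worker thread.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, self.sync_thread_id)


async def main():
    worker = Worker()
    loop_thread = threading.get_ident()
    inline_thread = await worker.inline()
    offloaded_thread = await worker.offloaded()
    worker.executor.shutdown()
    return loop_thread, inline_thread, offloaded_thread


loop_thread, inline_thread, offloaded_thread = asyncio.run(main())
print("inline runs on loop thread:   ", inline_thread == loop_thread)
print("offloaded runs on loop thread:", offloaded_thread == loop_thread)
```

Any state the sync method shares with async methods is single-threaded in the first case and needs locking or similar care in the second.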
from ray.
Great! We should be careful about making any breaking change to Ray Core. For Ray Serve, what if a user defines sync and async methods at the same time (or should we ban this pattern)? Are we going to offload sync methods to a ThreadPoolExecutor and async methods to a separate event-loop thread, OR offload all sync and async methods to a ThreadPoolExecutor?
import time

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self._increment = increment

    def compute(self, val: int) -> int:
        # simulate compute intensive ops
        time.sleep(1)
        return val + self._increment

    async def incr(self):
        self._increment += 1
        return self._increment


adder = Adder.bind(increment=1)
handle: DeploymentHandle = serve.run(adder)

result = handle.compute.remote(10)
print(f"result: {result.result()}")

result = handle.incr.remote()
print(f"result: {result.result()}")
from ray.
There will continue to be a single asyncio event loop / thread per replica. The sync methods will be run in an executor and the corresponding future will be awaited on the event loop.
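A rough sketch of that dispatch pattern in plain asyncio, assuming nothing about Serve's actual implementation: `call_user_method` is a hypothetical helper that awaits async methods directly on the loop and sends sync methods to a ThreadPoolExecutor, awaiting the resulting future on the same loop. The `Adder` class mirrors the example from the question, with a shorter sleep.

```python
import asyncio
import inspect
import time
from concurrent.futures import ThreadPoolExecutor


class Adder:
    def __init__(self, increment):
        self._increment = increment

    def compute(self, val):
        time.sleep(0.2)  # stand-in for compute-intensive work
        return val + self._increment

    async def incr(self):
        self._increment += 1
        return self._increment


async def call_user_method(executor, method, *args):
    # Hypothetical dispatcher: async methods run on the event loop,
    # sync methods run on a worker thread and are awaited from the loop.
    if inspect.iscoroutinefunction(method):
        return await method(*args)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, method, *args)


async def main():
    adder = Adder(increment=1)
    with ThreadPoolExecutor(max_workers=4) as executor:
        return await asyncio.gather(
            call_user_method(executor, adder.compute, 10),
            call_user_method(executor, adder.compute, 20),
            call_user_method(executor, adder.incr),
        )


results = asyncio.run(main())
print(results)
```

Because `incr` can run on the loop while `compute` is still sleeping on a worker thread, `compute` may observe the updated increment. That interleaving is the kind of behavior users need to keep in mind once sync methods no longer share the event-loop thread.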
from ray.