Comments (12)

shrekris-anyscale commented on August 16, 2024

It looks like this is an issue with async actors in Ray Core. See the following repros:

With a synchronous inner actor
import time

import ray

ray.init()

@ray.remote
class Inner:

    def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")

@ray.remote
class Outer:

    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")

outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)

Results:

% python check_core.py
2024-03-29 09:58:02,692	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
(Inner pid=95616) Inner call finished: 1.01s
(Outer pid=95620) Outer call finished: 1.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 2.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 3.09s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 4.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 5.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 6.10s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 7.11s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 8.11s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 9.12s
(Inner pid=95616) Inner call finished: 1.00s
(Outer pid=95620) Outer call finished: 10.12s
With an async inner actor
import time

import ray

ray.init()

@ray.remote
class Inner:

    async def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")

@ray.remote
class Outer:

    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")

outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)

Results:

% python check_core.py
2024-03-29 10:00:30,493	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
(Inner pid=96401) Inner call finished: 1.01s
(Outer pid=96404) Outer call finished: 1.09s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Inner pid=96401) Inner call finished: 1.00s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s
(Outer pid=96404) Outer call finished: 10.05s

from ray.

alexeykudinkin commented on August 16, 2024

To circle back on what we believe the real issue to be here:

Non-issue

Invoking long-running, uninterrupted code inside async methods is problematic whether you're using Ray or not (you will observe similar side effects in other frameworks, such as FastAPI).

Issue

However, if you declare a method as sync but your Ray actor is actually async (which is the case for Serve actors), all of its methods are converted to async using the sync_to_async utility (REF).

The problem with this utility is that it naively converts opaque sync code into async in an unsafe manner: when the sync code block is long-running (time.sleep in your example), the event loop running it is blocked for arbitrarily long periods, potentially compromising other operations through so-called head-of-line blocking of the event loop's queue.

This can lead to arbitrary side effects, such as what you're observing: responses seemingly being delayed until the last one in the batch completes.

This is the issue we're going to address, to avoid this kind of unexpected behavior when combining async and sync methods inside Ray actors and Serve deployments.
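
The head-of-line blocking described above can be reproduced without Ray. The following is a minimal sketch using only asyncio; the handler names and the 0.2s durations are made up for illustration and are not taken from Ray's code. It compares a coroutine that calls time.sleep (never yielding to the loop) with one that awaits asyncio.sleep.

```python
import asyncio
import time


async def blocking_handler(started: float, finished: list) -> None:
    # time.sleep never yields to the event loop, so no other task can run
    # until it returns.
    time.sleep(0.2)
    finished.append(time.monotonic() - started)


async def cooperative_handler(started: float, finished: list) -> None:
    # asyncio.sleep yields to the loop, so the handlers overlap.
    await asyncio.sleep(0.2)
    finished.append(time.monotonic() - started)


async def run_batch(handler, n: int = 3) -> list:
    # Start n handlers concurrently and record when each one finishes,
    # measured from the start of the batch.
    started = time.monotonic()
    finished: list = []
    await asyncio.gather(*(handler(started, finished) for _ in range(n)))
    return finished


blocking_times = asyncio.run(run_batch(blocking_handler))
cooperative_times = asyncio.run(run_batch(cooperative_handler))
print("blocking:   ", [f"{t:.2f}s" for t in blocking_times])
print("cooperative:", [f"{t:.2f}s" for t in cooperative_times])
```

With the blocking handler the completion times should grow with each call, because every call has to wait for the ones queued ahead of it; with the cooperative handler they should all land close to the single sleep duration.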

wuxibin89 commented on August 16, 2024

After some digging, I found that requests 1-9 to the Adder model have already completed in the _user_code_event_loop thread but are still blocked at the await in the main thread.
https://github.com/ray-project/ray/blob/master/python/ray/serve/_private/replica.py#L533-L535

shrekris-anyscale commented on August 16, 2024

Thanks for submitting this @wuxibin89. I was able to validate your repro. I'm taking a look.

alexeykudinkin commented on August 16, 2024

@wuxibin89 I ran some experiments over the weekend, and here are the conclusions:

  1. This is not a Ray Core issue: the event loop should NOT be blocked, even if you offload your blocking activities into a separate actor (blocking it just leads to unexpected side effects like the one you observe in this issue).

  2. The right approach is to offload any activity that takes a considerable amount of uninterrupted wall-clock time to an executor (using loop.run_in_executor), as you'd normally do in any async web app.

So in this case, if you run the experiments @shrekris-anyscale provided above as follows, you'd a) get the same behavior as the sync implementation and b) see the pipelining you're expecting:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import ray

ray.init()

@ray.remote
class Inner:

    def __init__(self):
        # Single worker thread, so offloaded calls run one at a time
        self.executor = ThreadPoolExecutor(1)

    async def run(self):
        start_time = time.time()

        # Blocking code should be offloaded to a ThreadPoolExecutor
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self.executor, time.sleep, 1)

        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")

@ray.remote
class Outer:

    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")

outer = Outer.remote(Inner.remote())
calls = [outer.run.remote() for _ in range(10)]
ray.get(calls)

alexeykudinkin commented on August 16, 2024

And here's the correct output that the code above produces:

2024-04-01 10:40:39,594	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
[2024-04-01 10:40:39,596 I 76194 57421105] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,040 I 76218 57421351] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,346 I 76220 57421856] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(raylet) [2024-04-01 10:40:40,395 I 76221 57421870] logging.cc:234: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
(Inner pid=76220) Inner call finished: 1.01s
(Outer pid=76221) Outer call finished: 1.08s
(Inner pid=76220) Inner call finished: 2.01s
(Outer pid=76221) Outer call finished: 2.01s
(Inner pid=76220) Inner call finished: 3.01s
(Outer pid=76221) Outer call finished: 3.02s
(Inner pid=76220) Inner call finished: 4.01s
(Outer pid=76221) Outer call finished: 4.02s
(Inner pid=76220) Inner call finished: 5.02s
(Outer pid=76221) Outer call finished: 5.02s

wuxibin89 commented on August 16, 2024

@alexeykudinkin Thanks for your reply. I agree that any blocking code in an async actor should be executed in a separate thread; that's how the Serve replica does it. But the blocking still exists, so should we use run_in_executor instead of run_coroutine_threadsafe?
https://github.com/ray-project/ray/blob/master/python/ray/serve/_private/replica.py#L766-L785

alexeykudinkin commented on August 16, 2024

@wuxibin89 we can't replace run_coroutine_threadsafe with just run_in_executor because this code path executes both sync and async methods.
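
To illustrate why the two calls are not interchangeable, here is a small standalone sketch using only the standard library. The functions sync_method and async_method and the user_loop thread are hypothetical stand-ins, not Serve's actual replica code. run_in_executor accepts a plain callable, while run_coroutine_threadsafe accepts a coroutine object and schedules it on a loop running in another thread.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def sync_method(x: int) -> int:
    return x + 1


async def async_method(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1


# A second event loop running on its own thread (a hypothetical stand-in for a
# dedicated user-code loop).
user_loop = asyncio.new_event_loop()
user_thread = threading.Thread(target=user_loop.run_forever, daemon=True)
user_thread.start()


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # run_in_executor takes a plain callable and runs it on a pool thread.
        sync_result = await loop.run_in_executor(pool, sync_method, 1)

        # Given a coroutine function, the pool thread only *creates* the
        # coroutine object; nothing ever runs its body.
        never_run = await loop.run_in_executor(pool, async_method, 1)
        got_coroutine = asyncio.iscoroutine(never_run)
        never_run.close()  # avoid a "never awaited" warning

    # run_coroutine_threadsafe takes a coroutine object and schedules it on a
    # loop that is running in a different thread.
    future = asyncio.run_coroutine_threadsafe(async_method(1), user_loop)
    async_result = await asyncio.wrap_future(future)
    return sync_result, got_coroutine, async_result


results = asyncio.run(main())

# Shut the helper loop down cleanly.
user_loop.call_soon_threadsafe(user_loop.stop)
user_thread.join()
user_loop.close()
print(results)
```

A code path that has to handle both kinds of methods therefore needs either a coroutine wrapper around sync calls or a branch on the method type.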

alexeykudinkin commented on August 16, 2024

This is a repro script for the issue called out above, using Ray (async) actors:

import time
import ray

ray.init()


@ray.remote(num_cpus=1)
class Inner:

    # NOTE: It's crucial for the actor to be considered *async*, hence we add a dummy async
    #       method, which is not used in this test
    async def foo(self):
        pass

    # This particular method is *sync*, so doing a long-running blocking operation (emulated
    # by time.sleep) is considered OK in this case
    def run(self):
        start_time = time.time()
        time.sleep(1)
        end_time = time.time()
        print(f"Inner call finished: {(end_time - start_time):.2f}s")


@ray.remote(num_cpus=1)
class Outer:

    def __init__(self, inner_handle):
        self.inner_handle = inner_handle

    async def run(self):
        start_time = time.time()
        await self.inner_handle.run.remote()
        end_time = time.time()
        print(f"Outer call finished: {(end_time - start_time):.2f}s")


inner = Inner.remote()
outer = Outer.remote(inner)

calls = [outer.run.remote() for _ in range(5)]
ray.get(calls)

alexeykudinkin commented on August 16, 2024

📣 Alright, I wanted to give a quick update on the state of the discussion concerning this issue.

  • We will NOT be changing the current out-of-the-box behavior in Ray Core. Offloading sync methods in async actors to a standalone executor would make code that has previously been thread-safe (since sync methods were executed on the same event-loop thread as async methods) non-thread-safe. That is a breaking change affecting a lot of existing code, and we don't believe it is worth risking the stability of existing workloads.

Instead, we propose the following changes:

  • Update the Ray Core docs with concrete examples of the side effects that can be observed when combining async and sync methods.
  • Suggest that users in this case either a) separate async and sync methods into different actors (so that sync methods aren't executed on the same event loop) or b) offload sync execution to a standalone ThreadPoolExecutor.

Additionally:

  • We'll be making changes in Ray Serve to align it with FastAPI semantics, where sync methods are automatically offloaded to a standalone ThreadPoolExecutor (which is a safe operation, unlike in Ray Core).
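
The thread-safety concern in the first bullet can be made concrete with a plain-asyncio sketch. The Counter class below is hypothetical and does not use Ray; it only records which threads touch its state. When the sync method runs on the event loop, a single thread is involved; once it is offloaded to a ThreadPoolExecutor, a second thread touches the same state, so shared attributes need a lock.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class Counter:
    """Hypothetical actor-like class mixing sync and async methods."""

    def __init__(self) -> None:
        self.value = 0
        self.threads = set()
        # Unnecessary while every method runs on the event-loop thread;
        # required once sync methods are offloaded to another thread.
        self._lock = threading.Lock()

    def _bump(self) -> None:
        with self._lock:
            self.threads.add(threading.get_ident())
            self.value += 1

    def incr_sync(self) -> None:
        self._bump()

    async def incr_async(self) -> None:
        self._bump()


async def on_loop(counter: Counter) -> None:
    # Sync and async methods both run on the event-loop thread.
    counter.incr_sync()
    await counter.incr_async()


async def offloaded(counter: Counter, pool: ThreadPoolExecutor) -> None:
    # The sync method runs on a pool thread, the async one on the loop thread.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(pool, counter.incr_sync)
    await counter.incr_async()


same_thread = Counter()
asyncio.run(on_loop(same_thread))

two_threads = Counter()
with ThreadPoolExecutor(max_workers=1) as pool:
    asyncio.run(offloaded(two_threads, pool))

print("threads used on loop:  ", len(same_thread.threads))
print("threads used offloaded:", len(two_threads.threads))
```

Existing actor code written without such a lock would keep working under the current behavior and could start racing if offloading became the default, which is the breaking change the bullet refers to.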

cc @wuxibin89

wuxibin89 commented on August 16, 2024

Great! We should be careful about making any breaking change to Ray Core. For Ray Serve, what if a user defines sync and async methods at the same time (or should we ban this pattern)? Are we going to offload sync methods to a ThreadPoolExecutor and async methods to a separate event loop thread, OR offload all sync and async methods to a ThreadPoolExecutor?

import time

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self._increment = increment

    def compute(self, val: int) -> int:
        # simulate compute intensive ops
        time.sleep(1)
        return val + self._increment

    async def incr(self):
        self._increment += 1
        return self._increment


adder = Adder.bind(increment=1)
handle: DeploymentHandle = serve.run(adder)

result = handle.compute.remote(10)
print(f"result: {result.result()}")

result = handle.incr.remote()
print(f"result: {result.result()}")

edoakes commented on August 16, 2024

Great! We should be careful about making any breaking change to Ray Core. For Ray Serve, what if a user defines sync and async methods at the same time (or should we ban this pattern)? Are we going to offload sync methods to a ThreadPoolExecutor and async methods to a separate event loop thread, OR offload all sync and async methods to a ThreadPoolExecutor?

@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self._increment = increment

    def compute(self, val: int) -> int:
        # simulate compute intensive ops
        time.sleep(1)
        return val + self._increment

    async def incr(self):
        self._increment += 1
        return self._increment


adder = Adder.bind(increment=1)
handle: DeploymentHandle = serve.run(adder)

result = handle.compute.remote(10)
print(f"result: {result.result()}")

result = handle.incr.remote()
print(f"result: {result.result()}")

There will continue to be a single asyncio event loop / thread per replica. The sync methods will be run in an executor and the corresponding future will be awaited on the event loop.
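
As a rough sketch of that model, using plain asyncio rather than Serve's implementation: the call_method dispatcher below is hypothetical, and Adder here is an undecorated copy of the class above with a shorter sleep. Async methods are awaited directly on the single event loop, while sync methods run in an executor and their future is awaited on the loop.

```python
import asyncio
import inspect
import time
from concurrent.futures import ThreadPoolExecutor


class Adder:
    # Plain class mirroring the deployment above, without the Serve decorator.
    def __init__(self, increment: int):
        self._increment = increment

    def compute(self, val: int) -> int:
        time.sleep(0.2)  # simulate compute-intensive work
        return val + self._increment

    async def incr(self) -> int:
        self._increment += 1
        return self._increment


async def call_method(executor: ThreadPoolExecutor, method, *args):
    """Hypothetical dispatcher: await async methods, offload sync ones."""
    if inspect.iscoroutinefunction(method):
        return await method(*args)
    loop = asyncio.get_running_loop()
    # The sync method runs on a pool thread; only its future is awaited here,
    # so the event loop stays free for other requests.
    return await loop.run_in_executor(executor, method, *args)


async def main() -> tuple:
    adder = Adder(increment=1)
    with ThreadPoolExecutor(max_workers=4) as executor:
        started = time.monotonic()
        compute_results = await asyncio.gather(
            *(call_method(executor, adder.compute, v) for v in range(4))
        )
        elapsed = time.monotonic() - started
        incr_result = await call_method(executor, adder.incr)
    return compute_results, incr_result, elapsed


compute_results, incr_result, elapsed = asyncio.run(main())
print(compute_results, incr_result, f"{elapsed:.2f}s")
```

With four pool threads the four compute calls should overlap instead of running back to back. Note that compute and incr then touch self._increment from different threads, so any shared state would need its own synchronization.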
