Git Product home page Git Product logo

Comments (6)

zzstoatzz avatar zzstoatzz commented on September 24, 2024

hi @carlosjourdan - thanks for the issue

I think there are a couple things here

  • I dont think its necessary / appropriate to use our low level callback util add_event_loop_shutdown_callback for this, you can just use aiohttp.Sesson as a context manager, and it will handle cleanup for you
@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()
  • secondly, since tasks currently run in worker threads, you cannot directly share the same Session across threads. You can create a new session like I am above to avoid this, or you can use subflows which run in the main thread
A full working example
import asyncio

import aiohttp

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    for url in ["https://www.google.com", "https://www.microsoft.com"]:
        data = await download_file.submit(url)
        do_something_with_data.submit(data)


@task()
async def download_file(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()


@task()
def do_something_with_data(data): ...


if __name__ == "__main__":
    asyncio.run(my_flow())

If you have any further questions, let me know. otherwise I will close this issue

from prefect.

carlosjourdan avatar carlosjourdan commented on September 24, 2024

Hi @zzstoatzz , thanks for the fast reply. I still have two questions, if you don't mind:

  • Based on #11930 , shouldn't the task run on the main thread? My use case is explicitly mentioned as one of the motivations arround the PR ("For example, an HTTP client or database connection can be shared between a flow and its tasks now")
  • Although the flows on the example you listed complete successfully, an exception is printed to the console with the message RuntimeError: Event loop is closed. This is actually what threw me off course towards the add_event_loop_shutdown_callback call, because I thought this had something to do with the session.close() call being made after the event loop was closed. But it seems to happen even in the absence of a shared session. Is this exception "harmless"? If so, shouldn't it be suppressed from output?

from prefect.

carlosjourdan avatar carlosjourdan commented on September 24, 2024

Just saw that #11930 was reverted by #12054. So this leaves only the question about the RuntimeError: Event loop is closed message.

from prefect.

zzstoatzz avatar zzstoatzz commented on September 24, 2024

hi @carlosjourdan - let me look into that Event loop is closed. I don't remember seeing that but I will see if I can reproduce

from prefect.

carlosjourdan avatar carlosjourdan commented on September 24, 2024

Thanks @zzstoatzz . FYI, I'm able to reproduce the error using your "full working example" code on a windows machine with fresh venv in Python 3.9.13. But the code works fine on Python 3.11.7.

Here is the output of pip freeze for reference:

aiohttp==3.9.5
aiosignal==1.3.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.1
frozenlist==1.4.1
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.1.3
itsdangerous==2.2.0
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
multidict==6.0.5
oauthlib==3.2.2
orjson==3.10.1
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.18.1
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
PyYAML==6.0.1
readchar==4.0.6
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
text-unidecode==1.3
toml==0.10.2
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.8.0
websockets==12.0
yarl==1.9.4
zipp==3.18.1

from prefect.

zzstoatzz avatar zzstoatzz commented on September 24, 2024

hi @carlosjourdan - thanks for the context! I have reproduced this specifically on older python in windows

I will add this issue to the backlog. We are doing work to simplify our use of threads in the prefect engine, so hopefully that should help fix / diagnose this.

from prefect.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.