Comments (6)
hi @carlosjourdan - thanks for the issue
I think there are a couple of things here:
- I don't think it's necessary or appropriate to use our low-level callback utility `add_event_loop_shutdown_callback` for this. You can just use `aiohttp.ClientSession` as a context manager, and it will handle cleanup for you:
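```python
import aiohttp
from prefect import task


@task()
async def download_file(url):
    # The context managers close the response and the session on exit.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()
```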
- Second, since tasks currently run in worker threads, you cannot directly share the same `Session` across threads. To avoid this, you can create a new session inside each task as I do above, or you can use subflows, which run in the main thread.
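The reason a session can't simply be handed to another thread is event-loop binding: asyncio objects belong to the loop they were created on. The sketch below is stdlib-only and assumes, as the comment above says, that each worker thread drives its own event loop. A bare `asyncio.Future` stands in for the `ClientSession`, so aiohttp's own error message may differ.

```python
import asyncio
import threading


def await_in_other_thread(fut: "asyncio.Future[None]") -> list[str]:
    """Await `fut` from a worker thread that runs its own event loop.

    `fut` is a stand-in for any loop-bound object, such as an aiohttp
    session. Returns the error messages captured in the worker thread.
    """
    errors: list[str] = []

    def worker() -> None:
        async def consume() -> None:
            await fut  # fut belongs to a different loop than this thread's

        try:
            asyncio.run(consume())  # new loop, owned by this worker thread
        except RuntimeError as exc:
            errors.append(str(exc))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return errors


if __name__ == "__main__":
    main_loop = asyncio.new_event_loop()  # plays the role of the main thread's loop
    shared = main_loop.create_future()    # object bound to main_loop
    print(await_in_other_thread(shared))  # "... attached to a different loop"
    main_loop.close()
```

Creating the object inside the thread that uses it, as `download_file` does with its own `ClientSession`, avoids the mismatch entirely.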
A full working example:

```python
import asyncio

import aiohttp
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@flow(task_runner=ConcurrentTaskRunner)
async def my_flow():
    for url in ["https://www.google.com", "https://www.microsoft.com"]:
        # In an async flow, submitting an async task is awaited and
        # returns a future, not the downloaded bytes.
        data = await download_file.submit(url)
        # Prefect resolves the future before running the downstream task.
        do_something_with_data.submit(data)


@task()
async def download_file(url):
    # A fresh session per task run, so no session crosses threads.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            assert resp.status == 200
            return await resp.read()


@task()
def do_something_with_data(data): ...


if __name__ == "__main__":
    asyncio.run(my_flow())
```
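The claim that `async with` takes care of cleanup also holds when the `assert resp.status == 200` check fails: `__aexit__` runs however the block is left. The sketch below uses a hypothetical `FakeSession` class instead of `aiohttp.ClientSession` so it runs offline; it only records whether `close()` was awaited.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession that records cleanup."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs whether the block exits normally, returns, or raises.
        await self.close()


async def download(session: FakeSession, status: int) -> bytes:
    async with session:
        assert status == 200  # mirrors the check in download_file
        return b"payload"


async def main() -> tuple[bool, bool]:
    ok_session, bad_session = FakeSession(), FakeSession()
    await download(ok_session, 200)
    try:
        await download(bad_session, 500)
    except AssertionError:
        pass
    return ok_session.closed, bad_session.closed


if __name__ == "__main__":
    print(asyncio.run(main()))  # → (True, True)
```

This is why no explicit shutdown callback is needed: the session is closed on the task's own loop before the task returns or raises.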
If you have any further questions, let me know. Otherwise, I will close this issue.
Hi @zzstoatzz, thanks for the fast reply. I still have two questions, if you don't mind:
- Based on #11930, shouldn't the task run on the main thread? My use case is explicitly mentioned as one of the motivations behind the PR ("For example, an HTTP client or database connection can be shared between a flow and its tasks now").
- Although the flows in the example you listed complete successfully, an exception is printed to the console with the message `RuntimeError: Event loop is closed`. This is actually what threw me off course toward the `add_event_loop_shutdown_callback` call, because I thought it had something to do with the `session.close()` call being made after the event loop was closed. But it seems to happen even in the absence of a shared session. Is this exception "harmless"? If so, shouldn't it be suppressed from the output?
Just saw that #11930 was reverted by #12054. So this leaves only the question about the `RuntimeError: Event loop is closed` message.
hi @carlosjourdan - let me look into that `Event loop is closed` error. I don't remember seeing it, but I will see if I can reproduce it.
Thanks @zzstoatzz. FYI, I'm able to reproduce the error using your "full working example" code on a Windows machine with a fresh venv on Python 3.9.13, but the same code works fine on Python 3.11.7.
Here is the output of pip freeze for reference:
aiohttp==3.9.5
aiosignal==1.3.1
aiosqlite==0.20.0
alembic==1.13.1
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
asgi-lifespan==2.1.0
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
dnspython==2.6.1
docker==6.1.3
email_validator==2.1.1
exceptiongroup==1.2.1
frozenlist==1.4.1
fsspec==2024.3.1
google-auth==2.29.0
graphviz==0.20.3
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
importlib_metadata==7.1.0
importlib_resources==6.1.3
itsdangerous==2.2.0
Jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kubernetes==29.0.0
Mako==1.3.3
Markdown==3.6
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
multidict==6.0.5
oauthlib==3.2.2
orjson==3.10.1
packaging==24.0
pathspec==0.12.1
pendulum==2.1.2
prefect==2.18.1
pyasn1==0.6.0
pyasn1_modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dateutil==2.9.0.post0
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
PyYAML==6.0.1
readchar==4.0.6
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rich==13.7.1
rpds-py==0.18.0
rsa==4.9
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
SQLAlchemy==2.0.29
text-unidecode==1.3
toml==0.10.2
typer==0.12.3
typing_extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
urllib3==2.2.1
uvicorn==0.28.1
websocket-client==1.8.0
websockets==12.0
yarl==1.9.4
zipp==3.18.1
hi @carlosjourdan - thanks for the context! I have reproduced this specifically on older Python versions on Windows.
I will add this issue to the backlog. We are doing work to simplify our use of threads in the prefect engine, so hopefully that should help fix / diagnose this.
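Until then, one commonly cited workaround for `RuntimeError: Event loop is closed` on Windows with older Python versions is to switch from the default Proactor event loop to the selector loop before anything starts. This is an untested assumption, not a fix confirmed in this thread: it has not been tried against Prefect, and whether Prefect's internal loop threads pick up the policy is unverified. The function name is hypothetical, and the version cutoff rests only on the two versions reported above.

```python
import asyncio
import sys


def use_selector_loop_on_old_windows() -> bool:
    """Switch to the selector event loop on Windows with Python < 3.11.

    The cutoff is an assumption based only on the versions reported in
    this thread (3.9.13 fails, 3.11.7 works). Returns True if the
    policy was changed.
    """
    if sys.platform == "win32" and sys.version_info < (3, 11):
        # WindowsSelectorEventLoopPolicy only exists on Windows builds.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        return True
    return False


if __name__ == "__main__":
    use_selector_loop_on_old_windows()  # call before asyncio.run(...)
    # asyncio.run(my_flow())  # my_flow from the full working example
```

Note that the selector loop on Windows does not support subprocesses, so this trade-off only makes sense if the flow doesn't need them.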