
celery-aio-pool's Introduction

Celery AsyncIO Pool


Free software: GNU Affero General Public License v3+

Getting Started

Installation

Using Poetry (preferred)

poetry add celery-aio-pool

Using pip & PyPI.org

pip install celery-aio-pool

Using pip & GitHub

pip install git+https://github.com/the-wondersmith/celery-aio-pool.git

Using pip & A Local Copy Of The Repo

git clone https://github.com/the-wondersmith/celery-aio-pool.git
cd celery-aio-pool
pip install -e "$(pwd)"

Configure Celery

Using celery-aio-pool's Provided Patcher (non-preferred)

  • Import celery_aio_pool in the same module where your Celery "app" is defined
  • Ensure the patch_celery_tracer utility is called before any other Celery code runs
"""My super awesome Celery app."""

# ...
from celery import Celery

# add the following import
import celery_aio_pool as aio_pool

# ensure the patcher is called *before*
# your Celery app is defined

assert aio_pool.patch_celery_tracer() is True

app = Celery(
    "my-super-awesome-celery-app",
    broker="amqp://guest@localhost//",
    # add the following keyword argument
    worker_pool=aio_pool.pool.AsyncIOPool,
)
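To make the mechanics concrete, here is a minimal sketch of the general idea behind an asyncio-backed worker pool: coroutine task functions are driven to completion on a single shared event loop rather than blocking worker threads. This is an illustration of the concept only, not celery-aio-pool's actual implementation.

```python
import asyncio

# A single shared event loop standing in for the pool's loop.
loop = asyncio.new_event_loop()

async def my_task(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return x * 2

def run_in_pool(coro_fn, *args, **kwargs):
    # Submit the coroutine to the shared loop and block until it finishes,
    # mirroring how a pool would drive an async task function.
    return loop.run_until_complete(coro_fn(*args, **kwargs))

result = run_in_pool(my_task, 21)
loop.close()
```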

Using The Out-Of-Tree Worker Pool (preferred)

Celery did not originally have built-in support for out-of-tree pools like celery-aio-pool; support was added for the first non-beta release of Celery 5.3 (PR #7880, merged 2022-11-15).

The official release of Celery 5.3 enables the configuration of custom worker pool classes as follows:

  • Set the environment variable CELERY_CUSTOM_WORKER_POOL to the name of your desired worker pool implementation.

    • NOTE: The value of the environment variable must be formatted in the standard Python/Celery format of package:class
    • % export CELERY_CUSTOM_WORKER_POOL='celery_aio_pool.pool:AsyncIOPool'
  • Tell Celery to use your desired pool by specifying --pool=custom when running your worker instance(s)

    • % celery worker --pool=custom --loglevel=INFO --logfile="$(pwd)/worker.log"

To verify the pool implementation, examine the output of the celery inspect stats command:

% celery --app=your_celery_project inspect stats
->  celery@freenas: OK
    {
        ...
        "pool": {
           ...
            "implementation": "celery_aio_pool.pool:AsyncIOPool",
    ...

Developing / Testing / Contributing

NOTE: Our preferred packaging and dependency manager is Poetry. Installation instructions can be found here.

Developing

Clone the repo and install the dependencies

$ git clone https://github.com/the-wondersmith/celery-aio-pool.git \
  && cd celery-aio-pool \
  && poetry install --sync

If you don't have Poetry, or prefer not to use it, celery-aio-pool is fully PEP-517 compliant and can be installed directly by any PEP-517-compliant package manager.

$ cd celery-aio-pool \
  && pip install -e "$(pwd)"

TODO: Coming Soon™

Testing

To run the test suite:

$ poetry run pytest tests/

Contributing

TODO: Coming Soon™

celery-aio-pool's People

Contributors

shaheedhaque · spawn-guy · the-wondersmith · thijskramer


celery-aio-pool's Issues

PyPI out of date with repo

The Python version requirement was bumped in the repo to support 3.11, but this was never released to PyPI. This still causes errors when trying to install in a Python 3.11 project.

Auto retry on exception feature of celery not working

  • Celery Executor version: 5.2.7
  • Python version: 3.11.2
  • Operating System: python:3.11-slim (debian)

Description

Trying to use the auto-retry-on-exception feature of Celery tasks. I've tried to get it to work every way listed in the Celery docs, and it always fails. I'm not sure whether this can be fixed within the scope of this package, but it does seem to be caused by the aio pool, as it works when I disable it.
Docs link

What I Did

class RetryTest(celery.Task):
    autoretry_for = ( RuntimeError,)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 60
    retry_jitter = True


@celery_app.task(name="task_retrytest", bind=True, base=RetryTest)
def task_retrytest(self: RetryTest):
    raise RuntimeError("test")
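For reference, the schedule those options request (exponential backoff capped at retry_backoff_max, with optional jitter) can be sketched in plain Python. This illustrates the intended semantics only; it is not Celery's exact implementation.

```python
import random

def next_retry_delay(retries: int, backoff_max: int = 60, jitter: bool = True) -> float:
    # Exponential backoff (1s, 2s, 4s, ...) capped at backoff_max;
    # with jitter, pick a uniform delay in [0, capped value].
    delay = min(2 ** retries, backoff_max)
    return random.uniform(0, delay) if jitter else float(delay)

# Without jitter the first five delays are deterministic:
delays = [next_retry_delay(n, jitter=False) for n in range(5)]
```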

Here is the trace from the worker. Sorry, I don't have a better way to pull this out of the docker logs.

celeryworker  | [2023-04-05 13:17:05,970: INFO/SpawnProcess-148] Task task_retrytest[6a08dbdc-76b2-42d9-8b6f-847183ff0936] received
celeryworker  | [2023-04-05 13:17:05,988: WARNING/ForkPoolWorker-6] --- Logging error ---
celeryworker  | [2023-04-05 13:17:05,990: WARNING/ForkPoolWorker-6] Traceback (most recent call last):
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 237, in trace_task
celeryworker  |     R = retval = AsyncIOPool.run_in_pool(fun, *args, **kwargs)
celeryworker  |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/pool.py", line 222, in run_in_pool
celeryworker  |     return worker_pool.run(
celeryworker  |            ^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/pool.py", line 203, in run
celeryworker  |     raise error
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
celeryworker  |     return await loop.run_in_executor(None, func_call)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,992: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
celeryworker  |     result = self.fn(*self.args, **self.kwargs)
celeryworker  |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,992: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/task.py", line 392, in __call__
celeryworker  |     return self.run(*args, **kwargs)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/autoretry.py", line 54, in run
celeryworker  |     ret = task.retry(exc=exc, **retry_kwargs)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/task.py", line 701, in retry
celeryworker  |     raise_with_context(exc or Retry('Task can be retried', None))
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/autoretry.py", line 34, in run
celeryworker  |     return task._orig_run(*args, **kwargs)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/app/hawkeye/tasks/amazon/ad/report/advertisedproduct.py", line 60, in task_retrytest
celeryworker  |     raise ADAPIRetriableException("test")
celeryworker  | [2023-04-05 13:17:05,994: WARNING/ForkPoolWorker-6] adapi_pydantic.base.exceptions.ADAPIRetriableException: test
celeryworker  | [2023-04-05 13:17:05,996: WARNING/ForkPoolWorker-6] 
celeryworker  | During handling of the above exception, another exception occurred:
celeryworker  | [2023-04-05 13:17:05,996: WARNING/ForkPoolWorker-6] Traceback (most recent call last):
celeryworker  | [2023-04-05 13:17:05,999: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 1110, in emit
celeryworker  |     msg = self.format(record)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,999: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 953, in format
celeryworker  |     return fmt.format(record)
celeryworker  |            ^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,000: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/utils/log.py", line 146, in format
celeryworker  |     msg = super().format(record)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,000: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 695, in format
celeryworker  |     record.exc_text = self.formatException(record.exc_info)
celeryworker  |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/utils/log.py", line 142, in formatException
celeryworker  |     r = super().formatException(ei)
celeryworker  |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 645, in formatException
celeryworker  |     traceback.print_exception(ei[0], ei[1], tb, None, sio)
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 124, in print_exception
celeryworker  |     te = TracebackException(type(value), value, tb, limit=limit, compact=True)
celeryworker  |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 690, in __init__
celeryworker  |     self.stack = StackSummary._extract_from_extended_frame_gen(
celeryworker  |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 416, in _extract_from_extended_frame_gen
celeryworker  |     for f, (lineno, end_lineno, colno, end_colno) in frame_gen:
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 353, in _walk_tb_with_full_positions
celeryworker  |     positions = _get_code_position(tb.tb_frame.f_code, tb.tb_lasti)
celeryworker  |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 366, in _get_code_position
celeryworker  |     positions_gen = code.co_positions()
celeryworker  |                     ^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6] AttributeError: '_Code' object has no attribute 'co_positions'
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6] Call stack:
celeryworker  | [2023-04-05 13:17:06,009: WARNING/ForkPoolWorker-6]   File "<string>", line 1, in <module>
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/spawn.py", line 120, in spawn_main
celeryworker  |     exitcode = _main(fd, parent_sentinel)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/spawn.py", line 133, in _main
celeryworker  |     return self._bootstrap(parent_sentinel)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
celeryworker  |     self.run()
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/process.py", line 108, in run
celeryworker  |     self._target(*self._args, **self._kwargs)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/watchgod/cli.py", line 51, in run_function
celeryworker  |     func()
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/__main__.py", line 15, in main
celeryworker  |     sys.exit(_main())
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/celery.py", line 217, in main
celeryworker  |     return celery(auto_envvar_prefix="CELERY")
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1130, in __call__
celeryworker  |     return self.main(*args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1055, in main
celeryworker  |     rv = self.invoke(ctx)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1657, in invoke
celeryworker  |     return _process_result(sub_ctx.command.invoke(sub_ctx))
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1404, in invoke
celeryworker  |     return ctx.invoke(self.callback, **ctx.params)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 760, in invoke
celeryworker  |     return __callback(*args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/decorators.py", line 26, in new_func
celeryworker  |     return f(get_current_context(), *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/base.py", line 134, in caller
celeryworker  |     return f(ctx, *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/worker.py", line 351, in worker
celeryworker  |     worker.start()
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/worker/worker.py", line 203, in start
celeryworker  |     self.blueprint.start(self)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
celeryworker  |     step.start(parent)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
celeryworker  |     return self.obj.start()
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/base.py", line 129, in start
celeryworker  |     self.on_start()
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/prefork.py", line 109, in on_start
celeryworker  |     P = self._pool = Pool(processes=self.limit,
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 463, in __init__
celeryworker  |     super().__init__(processes, *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1046, in __init__
celeryworker  |     self._create_worker_process(i)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 480, in _create_worker_process
celeryworker  |     return super()._create_worker_process(i)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
celeryworker  |     w.start()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 124, in start
celeryworker  |     self._popen = self._Popen(self)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/context.py", line 333, in _Popen
celeryworker  |     return Popen(process_obj)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 24, in __init__
celeryworker  |     self._launch(process_obj)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 79, in _launch
celeryworker  |     code = process_obj._bootstrap()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 327, in _bootstrap
celeryworker  |     self.run()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 114, in run
celeryworker  |     self._target(*self._args, **self._kwargs)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 292, in __call__
celeryworker  |     sys.exit(self.workloop(pid=pid))
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 362, in workloop
celeryworker  |     result = (True, prepare_result(fun(*args, **kwargs)))
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 649, in fast_trace_task
celeryworker  |     R, I, T, Rstr = tasks[task].__trace__(
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 254, in trace_task
celeryworker  |     I, R, state, retval = on_error(task_request, exc)
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 154, in on_error
celeryworker  |     R = I.handle_error_state(
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 178, in handle_error_state
celeryworker  |     return {
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 237, in handle_failure
celeryworker  |     self._log_error(task, req, einfo)
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 265, in _log_error
celeryworker  |     logger.log(policy.severity, policy.format.strip(), context,

Asyncio backends

Hi, first of all, thanks a ton for this lib, the-wondersmith and ShaheedHaque! I'm interested in putting this through my prod env using a Redis backend. I noticed the README uses the AMQP protocol, but I don't see an rpc/rabbitmq backend. Is an asyncio backend implementation needed? Happy to contribute a redis.asyncio backend.

The task.request is still sometimes "empty"

Despite 3f7b833, I am still getting this "sometimes":

billiard.einfo.ExceptionWithTraceback:
"""
Traceback (most recent call last):
  File "/main/srhaque/bootstrap/source/paiyroll/tasks.py", line 109, in viewflow_dequeue
    return CeleryResultCodec.encode_success(logger, celery_task, body(call_, viewflow_process, viewflow_task, celery_task))
  File "/main/srhaque/bootstrap/source/paiyroll/tasks.py", line 96, in body
    assert cl_task.request.id == vf_task.external_task_id, \
AssertionError: None != external task 09db61b7-f45b-4f1d-8752-6acadad2076e

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/celery_aio_pool/tracer.py", line 283, in trace_task
    R = retval = AsyncIOPool.run_in_pool(fun, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/celery_aio_pool/pool.py", line 222, in run_in_pool
    return worker_pool.run(
  File "/usr/local/lib/python3.10/dist-packages/celery_aio_pool/pool.py", line 203, in run
    raise error
  File "/usr/lib/python3.10/asyncio/threads.py", line 25, in to_thread
    return await loop.run_in_executor(None, func_call)
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/dist-packages/celery/app/trace.py", line 732, in __protected_call__
    return orig(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/celery/app/task.py", line 405, in __call__
    return self.run(*args, **kwargs)
  File "/main/srhaque/bootstrap/source/paiyroll/tasks.py", line 112, in viewflow_dequeue
    return CeleryResultCodec.encode_exception(logger, celery_task, e)
  File "/main/srhaque/bootstrap/source/paiyroll/xprocess.py", line 336, in encode_exception
    return cls._encode(logger, celery_states.FAILURE, celery_task, exception)
  File "/main/srhaque/bootstrap/source/paiyroll/xprocess.py", line 345, in _encode
    hostname = request.hostname.split('@', 1)[-1]
AttributeError: 'NoneType' object has no attribute 'split'
"""

As we discussed at one point, it seems there are multiple paths into the user's logic and at least one of these paths is clearing or recreating the task.request.

How to send tasks from async code

Hello!

The pool works perfectly.

But I'm a bit stuck on how to send tasks.

Right now I just send them synchronously:

celery_app.send_task("sso.tasks.parse_whitelist", kwargs={"company_id": company.id})

None of the regular sync/async approaches work:

# doesn't work
parse_whitelist.apply_async(company_id=company.id)
# I understand that it shouldn't work, but I still tried )))
await parse_whitelist.apply_async(company_id=company.id)

Could you advise, please?
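As a generic point of reference, one common pattern for calling a blocking, synchronous API from async code is to hand the call off to a worker thread with asyncio.to_thread. The sketch below uses a stand-in function in place of the real celery_app.send_task; whether this pattern is appropriate for a given broker client is an assumption, not something this package documents.

```python
import asyncio

def send_task(name: str, kwargs: dict) -> str:
    # Stand-in for a blocking, synchronous dispatch call such as
    # celery_app.send_task(name, kwargs=kwargs).
    return f"queued {name} with {kwargs}"

async def main() -> str:
    # asyncio.to_thread runs the blocking call in a worker thread, so the
    # event loop is not blocked while the message is published.
    return await asyncio.to_thread(send_task, "sso.tasks.parse_whitelist", {"company_id": 1})

result = asyncio.run(main())
```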

PS. I'm using [email protected]

Support for 3.11

In pyproject.toml:

[tool.poetry.dependencies]
# Supported Python versions
python = ">=3.8,<3.11"

Signals support

Hello, is there a way to make Celery signal handlers async? Do you have an idea for a workaround?
What I want to achieve is async initialization of some resources after the worker pool is created, like this:

from celery import signals

@signals.worker_init.connect
async def worker_init_dispatch(*args, **kwargs):
    await print('Do some worker initialization')
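Celery invokes signal handlers synchronously, so one generic bridge is to drive the coroutine to completion from a plain function. This is a sketch of the pattern only (with a hypothetical init_resources coroutine); whether it interacts correctly with this pool's event loop is an open question.

```python
import asyncio

async def init_resources() -> str:
    await asyncio.sleep(0)  # stand-in for real async setup
    return "resources ready"

def worker_init_dispatch(*args, **kwargs) -> str:
    # A synchronous handler (e.g. one connected to signals.worker_init)
    # can run an async initializer to completion with asyncio.run().
    return asyncio.run(init_resources())

status = worker_init_dispatch()
```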

Long running tasks blocking Kombu heartbeats

Hello, @the-wondersmith !

I found an issue in your pool (celery-pool-asyncio has the same problem):

Long-running tasks block the pool (?), and Kombu doesn't send AMQP heartbeats until the task completes.

Regular heartbeats look like this:

[2023-04-20 00:23:00,575: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:00,576: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/58, monotonic - 1202601.45058425, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:20,578: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:20,579: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/58, now - 28/89, monotonic - 1202621.45346225, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:40,581: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:40,584: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/89, now - 28/119, monotonic - 1202641.459304791, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51

and they are sent every 20-30s with default settings (60s timeout in RabbitMQ).

So after a long task (>1 min), Celery tries to ACK the task, fails with a broken pipe on the now-closed connection, re-establishes the connection, retrieves the same task again, and loops forever )))

[2023-04-20 00:29:05,760: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...

If you want to reproduce it, just run:

@app.task()
async def long_task() -> None:
    import asyncio
    await asyncio.sleep(300)

PS. It blocks Basic.Ack as well, so Celery's default of acks_late=False effectively becomes True )))
The worker sends the Ack for a task's message only after the task has executed.
This is especially annoying with long tasks, because the un-acked task is returned to the queue, and after the connection is lost the worker starts it again and again.
