
arq's Introduction

arq

Job queues in Python with asyncio and Redis.

See documentation for more details.

arq's People

Contributors

angellusmortis, ccharlesgb, cmangla, dependabot[bot], dmvass, dnikolayev, elatt, erakli, facundojmaero, iamlikeme, isobelhooper, ivlevdenis, jerzy-kurowski, johtso, jonasks, kdorsel, kludex, laztoum, mariatta, phy1729, ponytailer, pratheekrebala, pyup-bot, rubik, samuelcolvin, sgranade, stegben, tinche, tobymao, vanyakosmos


arq's Issues

`arq --check` command result

Version: v0.16b3

I think running `arq --check demo.WorkerSettings` gives the wrong result.
For example:

Results of the health check: [image]

Result after forcing termination of execution: [image]

The number of jobs reported in each state is inconsistent.
What do you think?

A few questions from a beginner

Hello.
First, thanks for the great work.
I couldn't find answers to my questions in the docs,
so maybe someone can help me.

  1. If I create a Redis pool for my own data access in the startup handler, like
async def startup(ctx):
    redis_cache = await arq_create_pool(settings=RedisSettings(host='localhost', port=6379, database=1))
    ctx['redis_cache'] = redis_cache

and use it in my function later:

async def get_messages(ctx):
    redis_cache = ctx['redis_cache']
    print(f"LAST_MAIL_ID: {await redis_cache.get('last_id')}")

class WorkerSettings:
    functions = [get_messages]
    on_startup = startup
    on_shutdown = shutdown

Do I need to close this pool in the on_shutdown handler? (A sketch addressing this is below.)

  2. How can I run workers from a Python file instead of from the terminal with something like `arq my_file.WorkerName`? (Also covered in the sketch below.)

  3. How can I control the output log? Can I disable it or redirect it to a file?

Thanks
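
A minimal sketch covering questions 1 and 2, assuming arq_create_pool above is an alias for arq.connections.create_pool and that run_worker is importable from arq.worker in the version in use (both worth verifying):

from arq import create_pool
from arq.connections import RedisSettings
from arq.worker import run_worker  # assumed import path; check your arq version


async def startup(ctx):
    # create the pool once and share it via the context
    ctx['redis_cache'] = await create_pool(RedisSettings(host='localhost', port=6379, database=1))


async def shutdown(ctx):
    # close the pool explicitly so connections are not left open
    ctx['redis_cache'].close()
    await ctx['redis_cache'].wait_closed()


async def get_messages(ctx):
    redis_cache = ctx['redis_cache']
    print(f"LAST_MAIL_ID: {await redis_cache.get('last_id')}")


class WorkerSettings:
    functions = [get_messages]
    on_startup = startup
    on_shutdown = shutdown


if __name__ == '__main__':
    # run the worker programmatically instead of `arq my_file.WorkerSettings`
    run_worker(WorkerSettings)

For the log output (question 3), arq uses standard Python logging, so handlers and levels can be adjusted through the usual logging configuration.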

SIGRTMIN is not supported in Mac OS X and FreeBSD

y@wuza $ ./venv/bin/arq
Traceback (most recent call last):
  File "./venv/bin/arq", line 7, in <module>
    from arq.cli import cli
  File "/…/venv/lib/python3.5/site-packages/arq/__init__.py", line 6, in <module>
    from .worker import *  # type: ignore
  File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 39, in <module>
    SIG_SUPERVISOR = signal.SIGRTMIN + 7
AttributeError: module 'signal' has no attribute 'SIGRTMIN'
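
A possible workaround, sketched here rather than taken from arq itself: guard the attribute lookup, since SIGRTMIN only exists on Linux.

import signal

# SIGRTMIN only exists on Linux; fall back to another signal elsewhere.
# SIGUSR1 is an arbitrary illustrative choice, not arq's actual fix.
if hasattr(signal, 'SIGRTMIN'):
    SIG_SUPERVISOR = signal.SIGRTMIN + 7
else:
    SIG_SUPERVISOR = signal.SIGUSR1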

worker.check() method

Checks for any failed jobs and raises an exception from one of those failed jobs, otherwise returns the number of successful jobs.

Useful when testing; could also call async_run.
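
A hypothetical usage sketch for tests; check() does not exist yet, so the method name and behaviour here are just the proposal above, and worker and arq_redis stand in for test fixtures.

# Hypothetical: check() is the proposed method, not current arq API.
async def test_jobs_succeed(worker, arq_redis):
    await arq_redis.enqueue_job('my_task')
    await worker.async_run()              # process the queued jobs
    n_successful = await worker.check()   # raises the failed job's exception, if any
    assert n_successful == 1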

give jobs an ID

Useful for:

  • results (if they're ever required)
  • logging, to work out when a job finishes
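
A minimal sketch of how an explicit ID can be passed via the `_job_id` keyword to enqueue_job, as used in the resubmission example further down this page:

from arq import create_pool
from arq.connections import RedisSettings


async def main():
    redis = await create_pool(RedisSettings())
    # give the job an explicit ID so its result can be looked up later
    job = await redis.enqueue_job('download_content', 'https://github.com', _job_id='download-github')
    print(job.job_id)          # 'download-github'
    print(await job.status())  # e.g. JobStatus.queued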

allow redis timeout

When a Redis server is first initialising, connections sometimes hang indefinitely.

The solution is to connect to Redis on startup and print the version etc., then time out and retry until a good connection is established.
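
A rough retry sketch under the assumption that the aioredis 1.x API (create_redis_pool, info()) is in use; exact calls may differ by version.

import asyncio

import aioredis  # assuming the aioredis 1.x API arq used at the time


async def connect_with_retry(address, attempts=5, timeout=1.0):
    # Illustrative only: retry until Redis answers, with a per-attempt timeout.
    for attempt in range(1, attempts + 1):
        try:
            redis = await asyncio.wait_for(aioredis.create_redis_pool(address), timeout)
            info = await redis.info('server')
            print('connected, redis server info:', info)  # e.g. print the version here
            return redis
        except (OSError, asyncio.TimeoutError) as exc:
            print(f'connection attempt {attempt} failed: {exc!r}, retrying...')
            await asyncio.sleep(1)
    raise ConnectionError('could not connect to redis')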

Examples how to integrate arq

Hi @samuelcolvin, thanks for arq!

I'd like to integrate arq with an aiohttp web application to run some long jobs in the background.

Do you have some examples for this?

If I understood correctly, I can create a remote worker, specifying the Redis server, and run it.
Then inside the app I can create an Actor using the same redis server that sends jobs to the worker.

Is it correct?
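
A rough integration sketch (not an official example): the aiohttp app only enqueues jobs, and a separate arq worker process started with the CLI executes them. long_job, the route, and the pool key names here are illustrative.

from aiohttp import web
from arq import create_pool
from arq.connections import RedisSettings


async def create_arq_pool(app):
    app['arq_pool'] = await create_pool(RedisSettings())


async def close_arq_pool(app):
    app['arq_pool'].close()
    await app['arq_pool'].wait_closed()


async def start_long_job(request):
    # enqueue and return immediately; the worker runs the job in the background
    job = await request.app['arq_pool'].enqueue_job('long_job', some_arg=42)
    return web.json_response({'job_id': job.job_id})


def create_app():
    app = web.Application()
    app.on_startup.append(create_arq_pool)
    app.on_cleanup.append(close_arq_pool)
    app.router.add_post('/jobs', start_long_job)
    return app

The long_job coroutine and a WorkerSettings class would live in another module, run separately with `arq that_module.WorkerSettings`.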

Starting multiple workers

I am reading your docs but I can't find the CLI options. I would like to start multiple workers like I can in RQ. Is this possible? If so, how?

Thanks!

StopJob exception

Would be useful.

Just ceases the current job without the error logger or printed traceback.

Status of job execution

Hi, in arq is it possible to get the status of a job that has been submitted? Currently, in version 0.14, awaiting a job returns None. Is there a provision to get the job ID and query the status?
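
A minimal sketch of querying status via the Job object returned by enqueue_job, based on the API used elsewhere on this page (the Job interface changed around v0.16, so this may not apply to v0.14):

from arq import create_pool
from arq.connections import RedisSettings


async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('download_content', 'https://github.com')
    print(job.job_id)           # the generated job ID
    print(await job.status())   # e.g. JobStatus.queued / in_progress / complete
    print(await job.result())   # waits for and returns the job's result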

Some tests fail with RuntimeError: There is no current event loop in thread 'MainThread'

test_no_jobs and test_health_check_direct fail with this same error:

 pytest -k test_health_check_direct
Test session starts (platform: linux, Python 3.7.4, pytest 4.4.1, pytest-sugar 0.9.2)
rootdir: /home/miki/exp/arq, inifile: setup.cfg, testpaths: tests
plugins: mock-1.10.4, sugar-0.9.2, timeout-1.3.3, cov-2.6.1, aiohttp-0.3.0, toolbox-0.4
timeout: 5.0s
timeout method: signal
timeout func_only: False
collecting ... 

―――――――――――――――――――――――――――――――――――― test_health_check_direct ――――――――――――――――――――――――――――――――――――

loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

    def test_health_check_direct(loop):
        class Settings:
            pass
    
>       assert check_health(Settings) == 1

tests/test_worker.py:41: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
arq/worker.py:610: in check_health
    loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x7f080bc70210>

    def get_event_loop(self):
        """Get the event loop.
    
        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
    
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
>                              % threading.current_thread().name)
E           RuntimeError: There is no current event loop in thread 'MainThread'.

/usr/lib64/python3.7/asyncio/events.py:644: RuntimeError

 tests/test_worker.py ⨯                                                            100% ██████████

Results (0.31s):
       1 failed
         - tests/test_worker.py:37 test_health_check_direct
      97 deselected

Termination with uvloop

Log from compose down with arq:

 morpheus_worker_1|INFO arq.work got signal: SIGTERM, waiting for worker pid=24 to finish...
 morpheus_worker_1|INFO arq.work pid=24, got signal proxied from main process, stopping...
 morpheus_worker_1|Exception in callback Loop._read_from_self
 morpheus_worker_1|handle: <Handle Loop._read_from_self>
 morpheus_worker_1|Traceback (most recent call last):
 morpheus_worker_1|  File "uvloop/cbhandles.pyx", line 52, in uvloop.loop.Handle._run (uvloop/loop.c:48414)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 242, in uvloop.loop.Loop._read_from_self (uvloop/loop.c:9481)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 253, in uvloop.loop.Loop._process_self_data (uvloop/loop.c:9726)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 267, in uvloop.loop.Loop._handle_signal (uvloop/loop.c:9912)
 morpheus_worker_1|  File "/usr/local/lib/python3.6/site-packages/arq/worker.py", line 356, in handle_proxy_signal
 morpheus_worker_1|    raise HandledExit()
 morpheus_worker_1|arq.worker.HandledExit
 morpheus_worker_1|WARNING arq.work pid=24, got signal: SIGALRM again, forcing exit
 morpheus_worker_1|Exception in callback Loop._read_from_self
 morpheus_worker_1|handle: <Handle Loop._read_from_self>
 morpheus_worker_1|Traceback (most recent call last):
 morpheus_worker_1|  File "uvloop/cbhandles.pyx", line 52, in uvloop.loop.Handle._run (uvloop/loop.c:48414)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 242, in uvloop.loop.Loop._read_from_self (uvloop/loop.c:9481)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 253, in uvloop.loop.Loop._process_self_data (uvloop/loop.c:9726)
 morpheus_worker_1|  File "uvloop/loop.pyx", line 267, in uvloop.loop.Loop._handle_signal (uvloop/loop.c:9912)
 morpheus_worker_1|  File "/usr/local/lib/python3.6/site-packages/arq/worker.py", line 370, in handle_sig_force
 morpheus_worker_1|    raise ImmediateExit('force exit')
 morpheus_worker_1|arq.worker.ImmediateExit: force exit

Retrieving new jobs from the queue with a limit

By checking the source code, it seems to me that a worker always takes as many jobs as possible from the queue, regardless of how many free actors there are:

arq/arq/worker.py

Lines 260 to 264 in b2d397a

async for _ in poll(self.poll_delay_s):  # noqa F841
    async with self.sem:  # don't bother with zrangebyscore until we have "space" to run the jobs
        now = timestamp_ms()
        job_ids = await self.pool.zrangebyscore(self.queue_name, max=now)
    await self.run_jobs(job_ids)

I think this has great potential for improvement. If the queue is large, a worker will keep all the job IDs in memory without being able to execute all of them in a meaningful time. Conversely, if the semaphore indicates that it's full, why take jobs at all? (Or maybe we should take just a few.) I was also planning to run multiple instances of my worker, and in that case the parallelism would benefit from each worker taking a subset of jobs that it can manage.

With that in mind I propose that the above call to Redis is changed to:

job_ids = await self.pool.zrangebyscore(self.queue_name, limit=limit)

where limit is the number of free spots in the semaphore or some multiple of that (ideally configurable, I think the optimal value would mostly depend on the use case but it's probably slightly above 1 in most of them).

I also don't quite understand the usefulness of now. I think that in the great majority of the cases, all the enqueued jobs have a timestamp below the current timestamp. But even if they had a slightly higher timestamp, why would a free worker not run it?
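
A sketch of how the proposal might look, keeping the max=now guard (which prevents jobs deferred to a future time from being picked up early) while capping the number of IDs fetched. queue_read_limit here is an assumed new setting, and offset/count are the keywords of the aioredis zrangebyscore call; the exact keyword for limiting may differ between client versions.

# Illustrative sketch only.
async for _ in poll(self.poll_delay_s):  # noqa F841
    async with self.sem:  # wait until there is "space" to run jobs
        now = timestamp_ms()
        job_ids = await self.pool.zrangebyscore(
            self.queue_name, max=now, offset=0, count=self.queue_read_limit
        )
    await self.run_jobs(job_ids)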

Configuration error

Hello.
I am trying to select another Redis DB for create_pool.
This is the simple code:

redis = await arq_create_pool(settings=RedisSettings(host='localhost', port=6379, database=3))
job = await redis.enqueue_job('get_messages')
print(job.job_id)
print(await job.info())
print(await job.status())
print(await job.result())

When I use the default settings, like
redis = await arq_create_pool(RedisSettings())
everything works fine.
For example

bf04177b3c7443778a6b0786b8b435ac
{'enqueue_time': datetime.datetime(2019, 4, 1, 12, 34, 15, 939000), 'job_try': None, 'function': 'get_messages', 'args': (), 'kwargs': {}, 'score': 1554122055939}
JobStatus.queued
[My data is here]

But when I try to use another database
redis = await arq_create_pool(settings=RedisSettings(host='localhost', port=6379, database=3))
the script gets stuck on print(await job.result()).
In the arq log I don't see any activity.
In Redis database 3 I see only two keys:
arq:queue = score: 1554122478867, member: ef48cff3cd0e46ec86b9c2ae8642491a
arq:job:ef48cff3cd0e46ec86b9c2ae8642491a = {}

Where is my mistake?
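
One possible explanation (an assumption, not confirmed in this thread): the worker process is still connecting to the default database 0, so it never sees jobs enqueued into database 3, and job.result() blocks waiting for a result that never arrives. A sketch of keeping both sides on the same settings:

from arq.connections import RedisSettings

# One shared settings object so the enqueuing code and the worker definitely
# point at the same Redis database.
REDIS = RedisSettings(host='localhost', port=6379, database=3)


async def get_messages(ctx):
    ...


class WorkerSettings:
    functions = [get_messages]
    redis_settings = REDIS  # the CLI worker will then also use database 3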

Can't pickle asyncio.Future objects

After some playing around, I’ve finally figured things out and am able to run some workers but now I am getting this error:

00:01:18: error serializing result of 353fa5bd0bcf4556982727732b82a34f:add_async_task even after replacing result
Traceback (most recent call last):
  File "/Users/sml/.virtualenvs/cr-api-web/lib/python3.7/site-packages/arq/jobs.py", line 184, in serialize_result
    return serializer(data)
TypeError: can't pickle _asyncio.Future objects
CRITICAL:arq.jobs:error serializing result of 353fa5bd0bcf4556982727732b82a34f:add_async_task even after replacing result
Traceback (most recent call last):
  File "/Users/sml/.virtualenvs/cr-api-web/lib/python3.7/site-packages/arq/jobs.py", line 184, in serialize_result
    return serializer(data)
TypeError: can't pickle _asyncio.Future objects

Here’s my source containing the worker:

import importlib

from arq import create_pool
from arq.connections import RedisSettings

from cr.config import Config


async def add_async_task(ctx, method_name=None, args=None, kwargs=None, use_es=False):
    module_name, func_name = method_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    method = getattr(module, func_name)
    if use_es:
        kwargs['es'] = ctx['es']

    await method(*args, **kwargs)


async def startup(ctx):
    ctx['es'] = Config().es


async def shutdown(ctx):
    await ctx['es'].close()


async def setup(sanic_app):
    sanic_app.ARQ_REDIS = await create_pool(
        RedisSettings(
            host=Config().config.redis.host,
            port=Config().config.redis.port,
            password=Config().config.redis.password,
        )
    )


async def get_arq_redis(request):
    return request.app.ARQ_REDIS


class WorkerSettings:
    functions = [add_async_task]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = RedisSettings(
        host=Config().config.redis.host,
        port=Config().config.redis.port,
        password=Config().config.redis.password,
    )

Basically, I am trying to dynamically run any coroutine that it can see within the module by passing a fully qualified module and function name together with its arguments. Is this at all possible?

auto restarting jobs

drains are a start, but they're confusing to use.

Better to redesign to automatically restart jobs.

Support for redis sentinel

Is supporting something like Redis Sentinel within the scope of this project? I think it could be added with a few changes to connections.py.

If it is within scope, I can send in a PR.

Resubmission of job with same id causes conflict

Suppose I add a job to a queue and later add another job with the same job_id; then, even before the job has completed, it returns JobStatus.complete.

example:

import asyncio
from aiohttp import ClientSession
from arq import create_pool
from arq.connections import RedisSettings

async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)

async def startup(ctx):
    ctx['session'] = ClientSession()

async def shutdown(ctx):
    await ctx['session'].close()


async def main():
    redis = await create_pool(RedisSettings())
    job_list = []
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        job = await redis.enqueue_job('download_content', url, _job_id=url)
        job_list.append(job)

    for j_ in job_list:
        print( j_.job_id)
        print(await j_.status())
# WorkerSettings defines the settings to use when creating the worker,
# it's used by the arq cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Running this twice:

(sanic) ╭─adam@adams-MacBook-Air ~/sidekik 
╰─$ python arqq_.py
https://facebook.com
JobStatus.queued
https://microsoft.com
JobStatus.queued
https://github.com
JobStatus.queued
(sanic) ╭─adam@adams-MacBook-Air ~/sidekik 
╰─$ python arqq_.py
https://facebook.com
JobStatus.complete
https://microsoft.com
JobStatus.complete
https://github.com
JobStatus.complete

Multiple Queues (v.0.16)

How am I supposed to create multiple queues with arq v0.16?

Looking at the old docs I see that you could specify a queue name, but now you can't.
What was the purpose of removing queues from the API?

I tried to start workers with different functions, but if a worker doesn't see a registered function it just deletes the job.

So, if I want to use multiple queues on the same Redis instance, do I have to use multiple Redis databases (which only have numerical names)?

CancelledError allowing drain to drain

Tasks should not take 20 seconds to finish; I suspect something has gone wrong.

Also, loads of these errors appear for the 20 pending tasks waiting to finish; there should be a way to suppress them.

17:35:12 WorkProcess: drain waiting 20.0s for 19 tasks to finish
17:35:32 WorkProcess: dft  ran in 43.929s ← Sender.send ● 
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
    task_exception = task.exception()
concurrent.futures._base.CancelledError
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
    task_exception = task.exception()
concurrent.futures._base.CancelledError
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
    task_exception = task.exception()
concurrent.futures._base.CancelledError

Guidance for getting eoranged/rq-dashboard type functionality

I've built a fairly substantial flask application which uses arq for all of its background processing. I'm pretty happy with the solution but see one deficiency that I'd like to try to address. I'd love to have some clean way to integrate visibility/monitoring of the queues that my application is leveraging.

Since rq-dashboard was designed to monitor http://python-rq.org/ queues, and since you have been a significant contributor to RQ, I hoped you might offer some pointers on how I could integrate such a web front end to monitor arq queues, jobs, and workers. I'm looking to empower my application's users with greater insight into what is happening behind the scenes.

I'm looking to know whether forking rq-dashboard and reworking it might be feasible for me to pursue, and I'm hoping for a little guidance.

Separately, I suppose I'm also pushing for arq to offer console-based "rq info" type capabilities similar to what is described at http://python-rq.org/docs/monitoring/

class based functions

__init__ takes the context, then the class has a run or __call__ method.

Also allows custom logging and name.
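
A hypothetical sketch of the proposed interface (not existing arq API; the class and method names are illustrative):

import logging


# Hypothetical interface for the proposal above - not current arq API.
class DownloadContent:
    name = 'download_content'  # custom job name

    def __init__(self, ctx):
        self.ctx = ctx
        self.logger = logging.getLogger(f'arq.jobs.{self.name}')

    async def run(self, url):
        self.logger.info('downloading %s', url)
        ...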

question

I have a question, but I don't know what has happened.
I ran a scheduled arq task in production every two hours at the first minute; then I changed it to run every night at 12:05, but it still runs every two hours.
May I ask if there is a command to check the execution schedule of timed tasks?
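
For reference, a minimal cron sketch assuming arq >= 0.16; note that the worker process has to be restarted or redeployed for a changed schedule to take effect, which could explain an old two-hourly schedule still running.

from arq import cron


async def nightly_task(ctx):
    ...


class WorkerSettings:
    # run once per day at 00:05
    cron_jobs = [cron(nightly_task, hour=0, minute=5)]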

local data

Shadows should have some way to access "local" data, e.g. for objects shared between many jobs where you don't want extra Redis calls in every job.

WatchVariableError

I'm getting an aioredis.errors.WatchVariableError: ('WatchVariableError errors:', 'WATCH variable has changed') when I'm running my queue.

I noticed that you had already raised a similar issue at: aio-libs-abandoned/aioredis-py#558

Are you still seeing the same issue using arq? Do you by chance have a workaround for this?

I have a simple queue setup and the only thing I'm doing differently is using aioredis.create_sentinel_pool() to generate the redis connection pool before feeding it to the redis_pool parameter on the worker.

r = RedisConfig
sentinel_client_pool = aioredis.sentinel.create_sentinel_pool(
    [(r.host, r.port)],
    db=r.database,
    password=r.password,
    timeout=r.timeout,
    encoding='utf8'
)

sentinel_client_pool = await sentinel_client_pool

redis_master = sentinel_client_pool.master_for('mymaster')
redis_pool = ArqRedis(redis_master)

sync tasks in arq?

Hi,
thanks for your work.
I am currently evaluating switching to an async web framework, and one big piece of my existing code is Celery tasks. The problem is that not all of them can be converted to async because of the absence of underlying transport libraries.
So the question is: is it possible to run sync tasks in a thread with arq?
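
A general asyncio pattern that should cover this, sketched with an assumed requests-based sync function rather than an arq-specific feature: wrap the blocking call with run_in_executor inside an async arq task.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import requests  # stands in for a sync-only transport library


def fetch_sync(url):
    # existing synchronous code that cannot easily be made async
    return requests.get(url).status_code


async def fetch(ctx, url):
    # arq task that pushes the blocking call onto a thread pool
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(ctx['pool'], partial(fetch_sync, url))


async def startup(ctx):
    ctx['pool'] = ThreadPoolExecutor(max_workers=4)


async def shutdown(ctx):
    ctx['pool'].shutdown()


class WorkerSettings:
    functions = [fetch]
    on_startup = startup
    on_shutdown = shutdown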

Is it possible to be alive after redis restart?

20:16:07 WorkProcess: dft  ran in  1.169s ← Downloader.download_content ● 88346
20:16:34 WorkProcess: shutting down worker, waiting for 6 jobs to finish
20:16:34 WorkProcess: shutting down worker after 460.835s ◆ 6 jobs done ◆ 0 failed ◆ 0 timed out
20:16:34 WorkProcess: Worker exiting after an unhandled error: CancelledError
Traceback (most recent call last):
  File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 353, in start_worker
    worker.run_until_complete()
  File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 130, in run_until_complete
    self.loop.run_until_complete(self.run())
  File "/…/.pyenv/versions/3.5.2/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/…/.pyenv/versions/3.5.2/lib/python3.5/asyncio/futures.py", line 266, in result
    raise CancelledError
concurrent.futures._base.CancelledError
20:16:34 MainProcess: worker process 17776 exited badly with exit code 1

scheduled jobs

Run in a similar way to the health check.

Separate class or associated with an actor?
