
Comments (5)

epicwhale commented on June 2, 2024

I loosely recall having to do this in the past... Wouldn't defining a job_id for the cron(...) enforce uniqueness at run time? And to re-enqueue it immediately after it finishes, couldn't the repeated_async_job(...) wrapper enqueue it with the same job_id before it returns?


davidhuser commented on June 2, 2024

I'd prefer not to mix cron jobs with functions, even though cron jobs are converted to functions. The example below uses functions only.

Regarding your job_id suggestion: the first job can (and should) have a job_id, but subsequent jobs cannot reuse it, because a job cannot re-enqueue itself under the same job_id while it is still running.

I tried to work around this by querying the number of queued jobs for this task; if it's exactly 1 (the current job), the job enqueues the next one:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # Re-enqueue only if there is exactly one job of this function in the queue (this job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # Only the current job is queued, so enqueue the next one, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")


async def main():
    redis = await create_pool(RedisSettings())
    # Startup job with a unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```

It seems to work, but I'm not sure whether there is a better or native way to do this.

I was also wondering what _job_try really does. The docs say:

enqueue_job
_job_try – useful when re-enqueueing jobs within a job

But how is it useful?


epicwhale commented on June 2, 2024

Ah, now I remember that's where I got stuck: how do you re-enqueue a job_id while a job with the same ID is still running (or while its result is still stored because it was never explicitly retrieved or deleted)?
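To make the two blocking cases concrete, here is a toy in-memory model of the rule as I understand it from arq's enqueue_job: a given _job_id is refused while its job key exists (queued or running) or while a stored result for that ID has not yet expired. DedupModel is hypothetical, not arq code, and the timestamps and the 3600-second keep_result are made-up illustration values.

```python
from typing import Dict, Set


class DedupModel:
    """Hypothetical toy model of arq's job_id uniqueness rule (not arq code)."""

    def __init__(self) -> None:
        self.job_keys: Set[str] = set()            # IDs that are queued or running
        self.result_expiry: Dict[str, float] = {}  # job_id -> time the stored result expires

    def enqueue(self, job_id: str, now: float) -> bool:
        expiry = self.result_expiry.get(job_id)
        if job_id in self.job_keys or (expiry is not None and expiry > now):
            return False  # in arq, enqueue_job returns None in this situation
        self.job_keys.add(job_id)
        return True

    def finish(self, job_id: str, now: float, keep_result: float) -> None:
        # On completion the job key goes away; the result is kept for keep_result seconds
        self.job_keys.discard(job_id)
        if keep_result > 0:
            self.result_expiry[job_id] = now + keep_result


q = DedupModel()
print(q.enqueue("startup", now=0))     # True: first enqueue
print(q.enqueue("startup", now=1))     # False: the job is still queued/running
q.finish("startup", now=2, keep_result=3600)
print(q.enqueue("startup", now=3))     # False: its result is still stored
print(q.enqueue("startup", now=3700))  # True: the stored result has expired
```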

Regarding _job_try: my understanding is that it explicitly sets which retry-attempt number the enqueued job should be treated as. That value should be accessible as ctx['job_try'] inside the job and is used by arq wherever job_try is referenced: https://github.com/search?q=repo%3Asamuelcolvin%2Farq+job_try&type=code. To the best of my knowledge, it does not help with the _job_id situation here.
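One way _job_try could be "useful when re-enqueueing jobs within a job", assuming the reading above: a job that re-enqueues itself can pass its attempt number forward, so the next run sees a higher ctx['job_try'] instead of starting again at 1. This is a sketch under that assumption; poll_until_ready and its readiness check are hypothetical, and FakePool is a stand-in that only records enqueue_job calls so the logic runs without Redis.

```python
import asyncio
from typing import Any, Dict, List, Tuple


async def poll_until_ready(ctx: Dict[str, Any]) -> str:
    """Hypothetical job that re-enqueues itself, carrying its attempt count forward."""
    attempt = ctx['job_try']
    if attempt < 3:  # made-up stand-in for "what we are waiting for is not ready yet"
        await ctx['redis'].enqueue_job('poll_until_ready', _job_try=attempt + 1)
        return f'not ready on attempt {attempt}, re-enqueued'
    return f'ready on attempt {attempt}'


class FakePool:
    """Minimal stand-in for ArqRedis that only records enqueue_job calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    async def enqueue_job(self, function: str, **kwargs: Any) -> None:
        self.calls.append((function, kwargs))


pool = FakePool()
print(asyncio.run(poll_until_ready({'job_try': 1, 'redis': pool})))
print(pool.calls)  # [('poll_until_ready', {'_job_try': 2})]
```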

If you find a better solution, I'm keen to learn too!


davidhuser commented on June 2, 2024

Since a job can be in more states than just queued, I'm now using this check before enqueuing:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # Check whether another job with the same function is deferred, queued, or in progress.
    # all_job_results() only returns finished jobs (JobResult has no .status), so read the
    # queue instead: arq keeps deferred, queued and in-progress jobs in it until they finish.
    pool = ctx['redis']
    queued_jobs = await pool.queued_jobs()
    pending_jobs = [job for job in queued_jobs if job.function == 'repeated_async_job']

    # The current job is itself still in the queue, so more than one match means another copy exists
    if len(pending_jobs) > 1:
        return 'done'

    await pool.enqueue_job('repeated_async_job')
    return 'done'


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```
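The job states this check cares about can be told apart with three lookups. The function below is a sketch of my reading of how arq's Job.status() does it (result key first, then the in-progress key, then the job's score in the queue sorted set); Status is a stand-in enum mirroring the values of arq.jobs.JobStatus, and the exact ordering is an assumption rather than a quote of the library.

```python
from enum import Enum
from typing import Optional


class Status(str, Enum):
    # Stand-in mirroring the values of arq.jobs.JobStatus
    deferred = 'deferred'
    queued = 'queued'
    in_progress = 'in_progress'
    complete = 'complete'
    not_found = 'not_found'


def classify(has_result: bool, has_in_progress_key: bool,
             queue_score_ms: Optional[int], now_ms: int) -> Status:
    """Derive a job's state from the result key, the in-progress key and its queue score."""
    if has_result:
        return Status.complete
    if has_in_progress_key:
        return Status.in_progress
    if queue_score_ms is not None:
        # A score in the future means the job was deferred (e.g. via _defer_by / _defer_until)
        return Status.deferred if queue_score_ms > now_ms else Status.queued
    return Status.not_found


now = 1_000_000
print(classify(False, False, now + 5_000, now).value)  # deferred
print(classify(False, False, now - 1, now).value)      # queued
print(classify(False, True, now - 1, now).value)       # in_progress
print(classify(True, False, None, now).value)          # complete
```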

This check does not account for parameters (i.e. the same function enqueued with different arguments), but I don't need that for this job.
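If parameters ever do matter, the same filter could compare arguments as well, since the JobDef objects returned by queued_jobs() carry function, args and kwargs. A sketch, using a hypothetical QueuedJob dataclass as a stand-in for JobDef so it runs without Redis:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Optional, Tuple


@dataclass
class QueuedJob:
    """Hypothetical stand-in for arq's JobDef (only the fields used here)."""
    function: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)


def count_matching(jobs: Iterable[QueuedJob], function: str,
                   args: Tuple[Any, ...] = (),
                   kwargs: Optional[Dict[str, Any]] = None) -> int:
    """Count jobs with the same function name AND the same arguments."""
    kwargs = kwargs or {}
    return sum(
        1 for job in jobs
        if job.function == function and job.args == tuple(args) and job.kwargs == kwargs
    )


jobs = [
    QueuedJob('repeated_async_job', ('eu',)),
    QueuedJob('repeated_async_job', ('us',)),
    QueuedJob('repeated_async_job', ('eu',)),
]
print(count_matching(jobs, 'repeated_async_job', ('eu',)))  # 2
print(count_matching(jobs, 'repeated_async_job', ('us',)))  # 1
```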

epicwhale commented on June 2, 2024

@davidhuser are you by any chance facing the issue I've filed in #459, or do you know how to solve it? An in-progress key is created for 60 seconds, even for a cron job that I want to run every 5 or 10 seconds.
