Comments (5)
I loosely recollect having to do this in the past... Wouldn't defining a `job_id` for the `cron(..)` enforce uniqueness at run time? And to re-enqueue it immediately after it finishes, I believe the `repeated_async_job(..)` wrapper could enqueue it with the same `job_id` before it returns?
from arq.
I'd prefer not to mix `cron` with `func`, although crons are converted to funcs. The example below is `func`-only.
Regarding your `job_id` suggestion: the first job can/should have a `job_id`, but subsequent jobs cannot reuse it, because a job cannot re-enqueue itself under the same id while it is still running (i.e. not yet finished). I tried to work around this by querying the number of queued jobs for this task; if it is exactly 1 (the current job), the next one is re-enqueued:
```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # re-enqueue if there is exactly one queued job of this function (this job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # only the current job, so we can enqueue the next, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")


async def main():
    redis = await create_pool(RedisSettings())
    # startup job with unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```
It seems to work, but I'm not sure whether there is a better/native way to do this.
I was also wondering what `_job_try` really does. The `enqueue_job` docs say:

> `_job_try` – useful when re-enqueueing jobs within a job

but how is it useful?
Ah, now I remember that's where I got stuck: how do you re-enqueue a `job_id` when a job with the same id is already running (or when its result is saved but not explicitly retrieved/deleted)?
Regarding `_job_try`: my understanding is that `_job_try` explicitly sets which 'retry attempt' number the enqueued job should be treated as. It is accessible as `ctx['job_try']` inside the job and is used by arq wherever `job_try` is referenced: https://github.com/search?q=repo%3Asamuelcolvin%2Farq+job_try&type=code. To the best of my knowledge it does not help in any way with the `_job_id` situation here.
If you find a better solution, keen to learn too!
Since a job can have more states than just queued, I'm using this check now before enqueueing:
```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import JobStatus


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # check if any job with the same function is deferred, queued, or in progress
    pool = ctx['redis']
    all_jobs = await pool.all_job_results()
    in_progress_jobs = [
        job for job in all_jobs
        if job.status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
        and job.function == 'repeated_async_job'
    ]
    if in_progress_jobs:
        return 'done'
    await pool.enqueue_job('repeated_async_job')
    return 'done'


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```
It does not account for params (i.e. the same job with different parameters), but for this job I don't need that.
@davidhuser are you by any chance facing the issue I've filed in #459, or do you know how to solve it? An in-progress key is created for 60 seconds, even for a cron that I want to run every 5 or 10 seconds.