Comments (5)
The 0.25s delay is only applied when the worker is either disconnecting or at the concurrency limit.
If the worker has capacity to be running jobs then the delay is not used and new jobs will be processed near instantly.
from faktory_worker_python.
I'm running the example producer and worker code with some small tweaks to queue the jobs faster and add additional logging:
Producer
Worker
The Producer code queues ~6 jobs a second. However the Worker cannot keep up with that with a Concurrency of one.
Here are the log outputs of the worker:
DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:10.826007 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:11.082362 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:11.338471 multiply: 8 * 8 = 64
DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:11.595590 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:11.852505 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:12.109596 multiply: 8 * 8 = 64
DEBUG:faktory.worker:Running task: add(1, 2)
2024-05-21 15:16:12.361325 - add: 1 + 2 = 3
DEBUG:faktory.worker:Running task: subtract(10, 5)
2024-05-21 15:16:12.618927 subtract: 10 - 5 = 5
DEBUG:faktory.worker:Running task: multiply(8, 8)
2024-05-21 15:16:12.871962 multiply: 8 * 8 = 64
Each job is roughly taking ~0.25s to process, even though there enqueued jobs in Faktory is increasing in size.
from faktory_worker_python.
Increase your concurrency and you will get a better throughput.
The 0.25s delay is there to stop the worker process from spinning the CPU, allowing the in flight jobs to complete, when you are at the concurrency limit. When your workers are scaled appropriately you won't hit that delay at all and everything will be near instant.
from faktory_worker_python.
I'm looking to run this for jobs that are heavy CPU intensive with very minimal IO (machine learning), so I wasn't planning on increasing concurrency for my use case.
I just tested increasing the concurrency to 2, and this made the processing of the jobs nearly instantaneous (which is what I originally expected).
from faktory_worker_python.
To answer your original question:
Would you be open to a pull request to refactor this code to match the Ruby implementation where the Executor Threads are the ones doing the fetching?
I would, but I think this would not be trivial. Jobs can be processed in other processes, so the socket would need to be shared. I think it would be difficult to get this right.
The easy path would be to make the 250ms
timeout configurable, so you can tune to something more suitable for your use case.
A better alternative would be to change the logic where the time.sleep
is to be cancellable. If Worker._process
could "cancel" the sleep(0.25)
when a job is complete, then the worker would instantly fetch the next job. That would remove the delay when the worker is at the concurrency limit without spinning the CPU.
I think this could be implemented with threading.Event
fairly easily.
from faktory_worker_python.
Related Issues (20)
- worker thread got OSError: [WinError 6] The handle is invalid when stop worker from Web UI
- Release 1.0
- Add support for sending exceptions to Sentry HOT 5
- Worker crashes if there's an error when fetching new jobs HOT 1
- Getting INFO times out after starting a lot of workers HOT 2
- Example worker restarting itself on 3.9 HOT 6
- Cannot install version 0.5.0 via pip HOT 3
- ImportError: cannot import name 'BrokenThreadPool' for python version less than 3.7 HOT 1
- Flag version >0.5 to only be python 3.7+ compatible HOT 2
- Batch Support HOT 3
- How to use the `at` argument in queue? HOT 1
- Worker heartbeat crashes with "type 'NoneType' is not iterable" HOT 1
- Remove references to `priority` as a job argument
- FaktoryConnectionResetError in Connection.reply
- AttributeError: Can't pickle local object 'ETL_task.<locals>.task' HOT 6
- Explaination on the thread execution model HOT 4
- Make a new release (v1.1) HOT 1
- Access to Job status in Python client for faktory
- BrokenProcessPool error HOT 4
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from faktory_worker_python.