cdrx / faktory_worker_python
Python worker for the Faktory project
License: BSD 3-Clause "New" or "Revised" License
This causes the worker to continue grabbing messages that then immediately fail because the process pool is broken.
Is it possible to have APIs that give access to Enqueued, Busy and Processed jobs via the Python client for Faktory?
If I go ahead with the enterprise version of Faktory, how will the Python client support access to job status?
As per this link [https://github.com/contribsys/faktory/wiki/Ent-Tracking], these APIs are available only in Ruby and Go.
Changes:
I run the following example:
from faktory import Worker
import logging
import time

def your_function(x, y):
    time.sleep(15)
    return x + y

w = Worker(queues=['default'], concurrency=1)
w.register('test', your_function)
print(w.get_worker_id())
logging.basicConfig(level=logging.DEBUG)
w.run()  # runs until control-c or worker shutdown from Faktory web UI
The worker id prints as 32860cb098ae4e13ab8a7f44af498bf1, but the heartbeat shows:
DEBUG:faktory.worker:Sending heartbeat for worker 9e9775cb1abc47b7b6c6bc1ec6689610
Looking in the web interface, it seems that the debug message contains the correct id.
When args is a tuple:
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.5/concurrent/futures/process.py", line 175, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/src/discovery/tasks/heartbeat.py", line 37, in heartbeat
'mac': mac
File "/src/discovery/marshall.py", line 74, in process
return self._process(data, device, update_last_seen)
File "/src/discovery/marshall.py", line 162, in _process
device.sync()
File "/src/discovery/models/device.py", line 128, in sync
faktory.push('server.sync', 'discovery.sync', args=(str(self.uuid), ), retry=3)
File "/src/server/async/__init__.py", line 12, in push
c.queue(task=task, queue=queue, args=args, retry=retry, priority=priority, reserve_for=reserve_for)
File "/usr/local/lib/python3.5/dist-packages/faktory/client.py", line 57, in queue
if not isinstance(args, (typing.Iterator, typing.Set, typing.List, typing.Tuple)):
File "/usr/lib/python3.5/typing.py", line 725, in __instancecheck__
raise TypeError("Tuples cannot be used with isinstance().")
TypeError: Tuples cannot be used with isinstance().
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/faktory/worker.py", line 225, in send_status_to_faktory
future.result(timeout=1)
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
return self.__get_result()
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
raise self._exception
TypeError: Tuples cannot be used with isinstance().
We are seeing occasional stack traces in our faktory workers like:
'FAIL {"jid": "4e9f7352c8004407bc53287aabbffd3c", "errtype": "FaktoryConnectionResetError", "message": ""}'
These are occurring within faktory._proto.Connection's reply method. Currently line 325:
https://github.com/cdrx/faktory_worker_python/blob/master/faktory/_proto.py#L325
sent = self.socket.send(buffer)
I'm not very familiar with this code but it seems like some kind of retry might be needed here?
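If the root cause is a partial write, the usual remedy is to loop until the whole buffer is on the wire (or call socket.sendall, which performs the same loop internally). A minimal sketch of the pattern, not the library's actual code:

```python
import socket

def send_all(sock: socket.socket, buffer: bytes) -> None:
    # socket.send() may transmit only part of the buffer; its return
    # value is the number of bytes actually sent, so keep looping
    # until everything is written. (socket.sendall() does this too.)
    total = 0
    while total < len(buffer):
        sent = sock.send(buffer[total:])
        if sent == 0:
            raise ConnectionResetError("socket connection broken")
        total += sent
```

A reconnect-and-retry on ConnectionResetError would additionally cover the case where the server has closed the connection outright.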
Hi, there's an at argument in the queue function, and I believe it is used to set the time the job is sent to the server, but how do I use it? What is its value type?
From client.py:
def queue(self, task: str, args: typing.Iterable = None, queue: str = "default", priority: int = 5, jid: str = None, custom=None, reserve_for=None, at=None, retry=5, backtrace=0)
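The Faktory job payload documents at as an RFC3339 timestamp string for scheduled jobs; whether this Python client also converts datetime objects for you is an assumption I have not verified, so a sketch that builds the string explicitly:

```python
from datetime import datetime, timedelta, timezone

def rfc3339_in(minutes: int) -> str:
    # Build an RFC3339 UTC timestamp `minutes` from now, the format
    # the Faktory server expects for the `at` field of a job.
    dt = datetime.now(timezone.utc) + timedelta(minutes=minutes)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

# hypothetical usage: client.queue('adder', args=(1, 2), at=rfc3339_in(10))
```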
Hey guys! I've been using this faktory adaptor for quite some time and have had great success with it!
My application consists of a faktory worker inside a kubernetes pod, which runs an image processing algorithm. The system works really well, but when I checked the logs recently I encountered this error:
Traceback (most recent call last):
  File "/app/venv/lib/python3.8/site-packages/faktory/worker.py", line 248, in send_status_to_faktory
    future.result(timeout=1)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I saw that this error was apparently resolved in #42, but my pods are running faktory 1.0.0, so the fix should be available. This does not kill the pod or make it restart, but I would like to get to the bottom of this. Any tips?
Thanks in advance!
This feature has not been supported by Faktory since version 0.9
WARNING:faktory.worker:Faktory has quieted this worker, will not run any more tasks
WARNING:faktory.worker:Faktory has asked this worker to shutdown, will cancel any pending tasks still running 25s time
INFO:faktory.connection:Disconnected
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 101, in _python_exit
thread_wakeup.wakeup()
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 89, in wakeup
self._writer.send_bytes(b"")
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 183, in send_bytes
self._check_closed()
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
Exception in thread QueueManagerThread:
Traceback (most recent call last):
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 354, in _queue_management_worker
ready = wait(readers + worker_sentinels)
File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 872, in wait
ov.cancel()
OSError: [WinError 6] The handle is invalid。
# worker script
import logging
import time
from faktory import Worker

logging.basicConfig(level=logging.INFO)
time.sleep(1)

def adder(x, y):
    logging.info("%d + %d = %d", x, y, x + y)

if __name__ == "__main__":
    w = Worker(queues=['default'], concurrency=1)
    w.register('adder', adder)
    w.run()

# producer script
import random
import time
import faktory

time.sleep(1)
with faktory.connection() as client:
    while True:
        client.queue('adder', args=(random.randint(0, 1000), random.randint(0, 1000)))
        time.sleep(5)
Several times a day, the worker hangs waiting for INFO to return.
Possibly related: workers sometimes do not receive work from the queue despite jobs being present.
(Just had a case of 20 workers, 5 jobs, no work being done; restart them, they pull the work)
Faktory 1.4.1 - running in Docker on AWS. contribsys/faktory:latest
The worker has the option to choose threads over processes for its concurrent execution model. But how exactly does it work? For n being the number of concurrent jobs, does it mean that n threads are started when the worker starts, and that those same n threads will process multiple jobs without restarting (model 1)? Or does it mean that each new job starts its own thread, which is closed at the end of the job, up to a maximum of n simultaneous threads (model 2)?
I am using thread-local global variables that I expected to be created anew at each job execution (model 2), but from my debugging I am starting to suspect that threads process multiple tasks in a row without restarting (model 1).
Can you confirm or refute my conclusion?
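For what it's worth, concurrent.futures.ThreadPoolExecutor (the executor this worker can be configured with, per the executor=ThreadPoolExecutor option seen elsewhere in these issues) starts up to n long-lived threads and reuses them across submitted jobs, which matches model 1. A quick stdlib demonstration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def job(_):
    # Count how many jobs this particular thread has handled; if each
    # job got a fresh thread (model 2), the count would always be 1.
    local.count = getattr(local, "count", 0) + 1
    return local.count

with ThreadPoolExecutor(max_workers=1) as pool:
    results = list(pool.map(job, range(3)))

# results == [1, 2, 3]: the single pooled thread is reused, so
# thread-local state survives across jobs (model 1).
```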
I have a single threaded worker defined as:
w = Worker(queues=['etherbi_decode'],
concurrency=1,
executor=ThreadPoolExecutor)
which works for 2-3 hours and at some point falls into an infinite loop, no longer fetching new jobs or sending heartbeats to the Faktory server.
Logs from the worker before it falls into the infinite loop:
INFO:faktory.connection:Connecting to faktory-faktory:7419 (with password None)
ERROR:faktory.worker:Task failed: 50c676346b5a4307826de2245caf5593
Traceback (most recent call last):
File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "etherbi/decoder/decoder_worker.py", line 225, in decode_bucket
_enqueue_block_decoded_task(task_number, 'calculate_burn_rate', 'etherbi_burn_rate_calculation')
File "etherbi/decoder/decoder_worker.py", line 187, in _enqueue_block_decoded_task
if faktory_client.queue(task, queue=queue, args=[task_number]):
File "/app/src/faktory/faktory/client.py", line 32, in queue
self.connect()
File "/app/src/faktory/faktory/client.py", line 21, in connect
self.is_connected = self.faktory.connect()
File "/app/src/faktory/faktory/_proto.py", line 64, in connect
self.socket.connect((self.host, self.port))
socket.timeout: timed out
INFO:__main__:Transaction to 0xaf30d2a7e90d7dc361c8c4585e9bb7d2f6f15bc7 is recognized in block 3917149 on position 98.
INFO:__main__:Transaction to 0xaf30d2a7e90d7dc361c8c4585e9bb7d2f6f15bc7 is recognized in block 3917154 on position 2.
<SOME_WORK_RELATED_LOGS_AS_THE_ABOVE>
INFO:faktory.connection:Connecting to faktory-faktory:7419 (with password None)
INFO:faktory.connection:Disconnected
Running sudo strace -p <PID> -e trace=network -f -s 10000 against the process, I get an infinite stream of:
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
We use python and elixir clients against the same faktory server and so far this behavior is observed only with the python workers.
Worker version: c5cb89b
Server version: 0.7.0
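The endless recvfrom(...) = 0 stream suggests the peer has closed the socket: recv() returning an empty byte string means EOF, and a read loop that does not check for it will call recv() forever without ever blocking. A minimal sketch of the check (not the library's actual code):

```python
import socket

def read_chunk(sock: socket.socket, bufsize: int = 4096) -> bytes:
    # recv() returning b"" means the peer closed the connection; a
    # loop that keeps reading without this check spins forever, which
    # matches the endless `recvfrom(...) = 0` strace output above.
    chunk = sock.recv(bufsize)
    if not chunk:
        raise ConnectionResetError("server closed the connection")
    return chunk
```

Raising here lets the caller reconnect instead of looping on a dead socket.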
In 1b9eff3#diff-7cbedbf1bd8665e9c89414493ca105c611bc434c823bb9cce75867e35075c82dR12, the BrokenThreadPool exception was introduced.
This exception was added in Python 3.7: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.thread.BrokenThreadPool
The package should indicate that it requires Python 3.7+ via python_requires: https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires
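A sketch of the suggested python_requires declaration (assuming a conventional setuptools-based setup.py; adapt to the package's real one):

```python
# setup.py sketch: declare the minimum interpreter version so pip
# refuses to install on older Pythons instead of failing at runtime.
from setuptools import setup, find_packages

setup(
    name="faktory",
    packages=find_packages(),
    python_requires=">=3.7",  # BrokenThreadPool exists only on 3.7+
)
```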
I reached out to Mike Perham about batch support in Python and he asked me to open an issue here.
I've noticed in some simple testing that this doesn't process jobs very fast, and it seems to be due to how jobs are fetched: the thread/process that pulls jobs can only pull one once there is an executor thread that is not busy, and if none are ready, we hit a 0.25s sleep before checking again (source).
This means that once a job completes, it can take up to 0.25s before we start working another job.
In the Ruby faktory client, the "Executor" (Processor) threads are the ones doing the fetching, so they immediately pull the next job once they complete their current one.
Would you be open to a pull request to refactor this code to match the Ruby implementation, where the executor threads are the ones doing the fetching?
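The proposal above can be sketched with the stdlib only, using a queue.Queue as a stand-in for the Faktory server: each executor thread runs its own fetch/execute loop, so a finished thread pulls its next job immediately with no polling gap. This is a toy illustration of the model, not the library's code:

```python
import queue
import threading
import time

jobs = queue.Queue()            # stand-in for the Faktory server queue
results = []
handlers = {"adder": lambda x, y: results.append(x + y)}
stop = threading.Event()

def worker_loop():
    # Each executor thread does its own blocking fetch and runs the
    # job right away -- the Ruby model described above.
    while not stop.is_set():
        try:
            name, args = jobs.get(timeout=0.05)
        except queue.Empty:
            continue
        handlers[name](*args)

for pair in [(1, 2), (3, 4)]:
    jobs.put(("adder", pair))
threads = [threading.Thread(target=worker_loop) for _ in range(2)]
for t in threads:
    t.start()
time.sleep(0.5)                 # let the jobs drain
stop.set()
for t in threads:
    t.join()
```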
For example, if there's a timeout, the worker will crash. Relevant traceback:
timeout: The read operation timed out
File "entry.py", line 36, in <module>
w.run()
File "faktory/worker.py", line 149, in run
self.tick()
File "inventory/common/faktory/worker.py", line 84, in tick
job = self.faktory.fetch(self.get_queues())
File "faktory/_proto.py", line 160, in fetch
job = next(self.get_message())
File "faktory/_proto.py", line 115, in get_message
buffer = socket.recv(self.buffer_size)
File "ssl.py", line 1056, in recv
return self.read(buflen)
File "ssl.py", line 931, in read
return self._sslobj.read(len)
We're seeing occasional instances of these crashes in production running the pypi version 0.5.1
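One way to make the worker survive such read timeouts would be to catch them around the fetch, reconnect, and report no job instead of crashing. A sketch where fetch and reconnect are hypothetical callables, not this library's API:

```python
import socket

def safe_fetch(fetch, queues, reconnect):
    # Wrap the blocking fetch so a network timeout is survivable:
    # reconnect and return None (no job) instead of killing the worker.
    # socket.timeout is an OSError subclass, listed here for clarity.
    try:
        return fetch(queues)
    except (socket.timeout, OSError):
        reconnect()
        return None
```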
Looks like a bug on this line:
https://github.com/cdrx/faktory_worker_python/blob/master/faktory/worker.py#L341
def heartbeat(self) -> None:
    ...
    ok = next(self.faktory.get_message())
    if "state" in ok:

ok is being assigned None. The next line then crashes with: type 'NoneType' is not iterable.
We are using faktory in our flask app. Our test cases started failing recently with the following error:
ImportError: cannot import name 'BrokenThreadPool'
I came across https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.thread.BrokenThreadPool, which mentions that BrokenThreadPool is new in Python 3.7.
Can somebody help me with this?
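Until the package declares Python 3.7+ (or drops the dependency), one workaround on older interpreters is a compatibility shim that falls back to a RuntimeError subclass when the import fails. A sketch:

```python
# Compatibility shim: BrokenThreadPool only exists on Python 3.7+.
try:
    from concurrent.futures.thread import BrokenThreadPool
except ImportError:                      # Python 3.6 and earlier
    class BrokenThreadPool(RuntimeError):
        """Stand-in so `except BrokenThreadPool` still works."""
```

On 3.7+ the real class (itself a RuntimeError subclass via BrokenExecutor) is imported; on 3.6 the stand-in simply never fires.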
Today an update of Faktory was released (0.5.0) which included the backtrace parameter, but also the changes from the following commit: 4aaaa37#diff-60f61ab7a8d1910d86d9fda2261620314edcae5894d5aaa236b821c7256badd7
The setup.py now refers to files that do not seem to be packaged. Below is the output when I run pip install faktory==0.5.0 on macOS:
Collecting faktory==0.5.0
Downloading faktory-0.5.0.tar.gz (10 kB)
ERROR: Command errored out with exit status 1:
command: /usr/local/anaconda3/envs/blendle-streaming/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py'"'"'; __file__='"'"'/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-pip-egg-info-3q6va8k9
cwd: /private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/
Complete output (5 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py", line 4, in <module>
dev_requires = open("dev-requirements.txt").read().strip().split("\n")
FileNotFoundError: [Errno 2] No such file or directory: 'dev-requirements.txt'
----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/de/77/4a47ad1a84d33faaf179f0eb7f2e2f9d2d94118f9517d393aaf7d710d05d/faktory-0.5.0.tar.gz#sha256=101630a5788f5d11c17a1d5f0e1e04a8cc9164a886f54846fe29317d91d51327 (from https://pypi.org/simple/faktory/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
ERROR: Could not find a version that satisfies the requirement faktory==0.5.0
ERROR: No matching distribution found for faktory==0.5.0
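Besides adding the missing files to the sdist (e.g. an `include dev-requirements.txt` line in MANIFEST.in), a defensive fix is to make setup.py tolerate absent optional files. A sketch, with file names matching the traceback above:

```python
# setup.py sketch: only read optional requirement files when they are
# actually present, so installing from an sdist that omits them does
# not crash with FileNotFoundError.
import os

def read_lines(path):
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]

dev_requires = read_lines("dev-requirements.txt")
```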
I am working with Go and Python. For a Python call to work with Go, when there is only one argument, you should pass that argument as an array (otherwise the Go side raises an error: string to []interface {}).
I do not understand much Python, so I do not know if it is easier to update the documentation or to test for when the argument is a single arg.
Great job, by the way :-)
with faktory.connection() as client:
    client.queue('some_job', args=([single_argument]), queue='some_job')
Hi guys,
We are trying Faktory. Our client is a Python API written using Falcon, and the workers are also in Python. The problem is that when load testing the API we get this error in Faktory: "i/o timeout Closing connection".
The code snippet below is what we use to push to Faktory.
Client code:
faktory_client = faktory.Client('tcp://ip:7419')
faktory_client.queue('x', args=('1','2','3'))
faktory_client.disconnect()
Worker code:
w = Worker('tcp://ip:7419', queues=['default'], concurrency=100)
w.register('recommendation', recommendation_updater)
We are using a machine with 8 CPUs; utilization reaches 1%. We connect to Faktory over TCP and are trying to push 10K concurrent requests.
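With 10K concurrent requests, opening and closing a TCP connection per push (as in the client code above) can exhaust sockets and trip the server's i/o timeout; the usual fix is to reuse one long-lived client per thread. A sketch where make_client stands in for something like faktory.Client('tcp://ip:7419'):

```python
import threading

class ClientPool:
    # Keep one long-lived client per thread instead of connecting and
    # disconnecting on every request.
    def __init__(self, make_client):
        self._make_client = make_client
        self._local = threading.local()

    def get(self):
        if not hasattr(self._local, "client"):
            self._local.client = self._make_client()
        return self._local.client
```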
I am trying to use the Worker Example, and I am getting the following error.
Traceback (most recent call last):
File "faktory.py", line 1, in <module>
from faktory import Worker
File "/home/ubuntu/scale/python-service/faktory.py", line 1, in <module>
from faktory import Worker
ImportError: cannot import name 'Worker'
Faktory is installed, since the Client is working perfectly fine.
I am running it in ubuntu 14.04 and I am using python 3.6.
Any ideas of what is going on? I will update the README.md if necessary after finding the issue.
Thanks!
I see the following error in the logs of a faktory worker. The worker works correctly with simple test tasks like print('hello') or print(number * f"hello {name}"), showing that the worker can run jobs and take string and integer arguments. I then queue a job that works well when run outside of a worker, and I receive the following error message:
Task failed: f194d1e1811843739f297a1efe81cc74
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'ETL_task.<locals>.task'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/antoine/.cache/pypoetry/virtualenvs/dataworker-poc-BCFfAKR5-py3.10/lib/python3.10/site-packages/faktory/worker.py", line 248, in send_status_to_faktory
future.result(timeout=1)
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'ETL_task.<locals>.task'
To understand my error message: in my case, ETL_task is the name of the module defining the decorator, and task is the name of the wrapped function inside my decorator.
It's hard for me to even diagnose where the problem comes from, as the stack trace goes through two packages that are not my code. I also tried to run the worker through python -m pdb, and I still see the error message but without falling into the debugger.
How can I understand what's happening?
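The traceback points at pickling: ProcessPoolExecutor ships each callable and its arguments through pickle, and pickle can only serialize functions reachable by name at module level. A decorator that registers an inner closure (qualname ETL_task.<locals>.task) therefore fails exactly like this. A self-contained demonstration of the mechanism:

```python
import pickle

def decorator(fn):
    def task(*args):              # inner closure: its qualname is
        return fn(*args)          # 'decorator.<locals>.task'
    return task

@decorator
def etl(x):
    return x + 1

# Pickling the wrapped closure fails just like the worker's traceback
# ("Can't pickle local object ..."):
try:
    pickle.dumps(etl)
    picklable = True
except (AttributeError, pickle.PicklingError):
    picklable = False
```

Registering a plain module-level function (or a functools.partial of one) with the worker avoids the error.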
What am I seeing:
When running the sample code, it seems like the worker crashes/restarts itself without the registered function being called.
What I'm expecting:
For the registered function to be run.
What I'm doing:
I have some code that I've been running without problems on 4 different Macs. This is the first time I try to run it on an M1. All of the other Macs have Python 3.7; this one has 3.9. Other than that, everything remained the same. I tried to remove as much code as possible from my working project to reproduce this, but then it occurred to me to use the sample code from the README, and it behaves the same.
What I did was add some print statements to that code and run it:
from faktory import Worker

def your_function(x, y):
    print("running your_function")
    return x + y

w = Worker(queues=['default'], concurrency=1)
w.register('test', your_function)
print("after register")
w.run()  # runs until control-c or worker shutdown from Faktory web UI
After running the example server code, I get the expected output on the terminal:
after register
...but if I use the client code to send a job to be executed, the console now shows this:
after register
after register
It is as if the whole script runs again and the function is never executed.
Any ideas on how to debug this are appreciated.
Thank you