yelp / python-gearman
Gearman API - Client, worker, and admin client interfaces
Home Page: http://github.com/Yelp/python-gearman/
License: Other
==============
python-gearman
==============

Description
===========
Python Gearman API - Client, worker, and admin client interfaces

For information on Gearman and a C-based Gearman server, see http://www.gearman.org/

Installation
============
* easy_install gearman
* pip install gearman

Links
=====
* 2.x source <http://github.com/Yelp/python-gearman/>
* 2.x documentation <http://packages.python.org/gearman/>
* 1.x source <http://github.com/samuel/python-gearman/>
* 1.x documentation <http://github.com/samuel/python-gearman/tree/master/docs/>
If we register a worker as worker 1
and call GearmanAdminClient.get_workers()
we get an error that says
gearman.errors.ProtocolError: Malformed worker response: ...
Renaming the worker to worker1
seems to make the problem go away. I'm assuming the space causes the response to be parsed incorrectly, but I'm not completely sure.
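The space is a plausible culprit: the admin "workers" response is a space-delimited text line per worker, so an embedded space shifts every later field. A minimal illustration of that failure mode (`parse_worker_line` and the exact field layout here are illustrative, not python-gearman's actual code):

```python
# The admin "workers" response is roughly one space-delimited line per worker:
#   FD IP-ADDRESS CLIENT-ID : FUNCTION ...
def parse_worker_line(line):
    fields = line.split(' ')
    file_descriptor, ip_address, client_id, marker = fields[:4]
    if marker != ':':
        # a client id containing a space shifts every later field,
        # so the ':' separator is no longer where the parser expects it
        raise ValueError('Malformed worker response: %r' % line)
    return client_id, fields[4:]

print(parse_worker_line('30 127.0.0.1 worker1 : test'))   # → ('worker1', ['test'])
# parse_worker_line('30 127.0.0.1 worker 1 : test') raises ValueError
```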
I get a KeyError when using get_job_status with a job that has been submitted with wait_until_complete=False, though when I retry the request it works:
>>> import gearman
>>> gc = gearman.client.GearmanClient(['localhost:4730'])
>>> j = gc.submit_job('test', "asdf", wait_until_complete=False)
>>> gc.get_job_status(j)
<GearmanJobRequest task='test', unique='064ac51350d73671ba3aa51086dbb862', priority=None, background=False, state='CREATED', timed_out=False>
>>> # process job
>>> gc.get_job_status(j)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 127, in get_job_status
request_list = self.get_job_statuses([current_request], poll_timeout=poll_timeout)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 141, in get_job_statuses
return self.wait_until_job_statuses_received(job_requests, poll_timeout=poll_timeout)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client.py", line 158, in wait_until_job_statuses_received
self.poll_connections_until_stopped(self.connection_list, continue_while_status_not_updated, timeout=poll_timeout)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
self.handle_connection_activity(read_connections, write_connections, dead_connections)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 160, in handle_connection_activity
self.handle_read(current_connection)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/connection_manager.py", line 218, in handle_read
current_handler.fetch_commands()
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/command_handler.py", line 39, in fetch_commands
continue_working = self.recv_command(cmd_type, **cmd_args)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/command_handler.py", line 65, in recv_command
completed_work = cmd_callback(**cmd_args)
File "/usr/local/lib/python2.7/dist-packages/gearman-2.0.2-py2.7.egg/gearman/client_handler.py", line 151, in recv_status_res
current_request = self.handle_to_request_map[job_handle]
KeyError: 'H:trurl:1267'
>>> # do nothing, just call the same method again
>>> gc.get_job_status(j)
<GearmanJobRequest task='test', unique='064ac51350d73671ba3aa51086dbb862', priority=None, background=False, state='COMPLETE', timed_out=False>
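Since the second call succeeds, one hypothetical workaround is to retry when the status poll surfaces this KeyError (`safe_get_job_status` is my name, not part of the library):

```python
# Workaround sketch, assuming the KeyError is transient as shown above:
# retry get_job_status when the server answers for a handle the client
# hasn't registered yet.
def safe_get_job_status(client, request, retries=3):
    for attempt in range(retries):
        try:
            return client.get_job_status(request)
        except KeyError:
            continue  # STATUS_RES arrived for an unknown handle; try again
    raise RuntimeError('no job status after %d attempts' % retries)
```

This only papers over the race; the underlying issue is discussed further down in this tracker.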
The optional timeout argument to register_task is missing... basically the worker could also support
CAN_DO task
and
CAN_DO_TIMEOUT task timeout
If you don't have time to add this I can probably cook up a patch...
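The wire change is small: per the Gearman binary protocol, CAN_DO_TIMEOUT (command type 23) carries the function name and timeout as NUL-separated arguments, where plain CAN_DO carries only the name. A framing sketch (`pack_can_do_timeout` is illustrative, not the library's API):

```python
import struct

GEARMAN_COMMAND_CAN_DO_TIMEOUT = 23  # per the Gearman binary protocol

def pack_can_do_timeout(task, timeout_seconds):
    # request packet: b'\0REQ' + 4-byte big-endian type + 4-byte size + args
    args = task.encode('ascii') + b'\x00' + str(timeout_seconds).encode('ascii')
    header = b'\x00REQ' + struct.pack('>II', GEARMAN_COMMAND_CAN_DO_TIMEOUT, len(args))
    return header + args

packet = pack_can_do_timeout('task', 30)
```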
The sockets this library creates are not marked close-on-exec and are consequently inherited by any child processes, such as those executed by the subprocess module.
The code to set the flag is:

import fcntl

fd = sock.fileno()
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
KeyError occurs when we receive a job handle that is not in self.handle_to_request_map.
A possible reason is that the client times out before it receives a JOB_CREATED when submitting a job.
In that case, the request is not registered in handle_to_request_map.
When the client later receives a command that tries to access the map with the unknown job_handle,
a KeyError is raised. More investigation is needed to find the actual cause.
In [1]: import gearman
In [2]: gearman.__version__
Out[2]: '2.0.2'
In [3]: acli = gearman.GearmanAdminClient(['localhost'])
In [4]: acli.send_maxqueue('test', 10)
ProtocolError Traceback (most recent call last)
/home/.../ in ()
----> 1 acli.send_maxqueue('test', 10)
/usr/local/lib/python2.7/dist-packages/gearman/admin_client.pyc in send_maxqueue(self, task, max_size)
59 self.establish_admin_connection()
60 self.current_handler.send_text_command('%s %s %s' % (GEARMAN_SERVER_COMMAND_MAXQUEUE, task, max_size))
---> 61 return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_MAXQUEUE)
62
63 def send_shutdown(self, graceful=True):
/usr/local/lib/python2.7/dist-packages/gearman/admin_client.pyc in wait_until_server_responds(self, expected_type)
94 return (not current_handler.response_ready)
95
---> 96 self.poll_connections_until_stopped([self.current_connection], continue_while_no_response, timeout=self.poll_timeout)
97 if not self.current_handler.response_ready:
98 raise InvalidAdminClientState('Admin client timed out after %f second(s)' % self.poll_timeout)
/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in poll_connections_until_stopped(self, submitted_connections, callback_fxn, timeout)
192 # Do a single robust select and handle all connection activity
193 read_connections, write_connections, dead_connections = self.poll_connections_once(submitted_connections, timeout=time_remaining)
--> 194 self.handle_connection_activity(read_connections, write_connections, dead_connections)
195
196 any_activity = compat.any([read_connections, write_connections, dead_connections])
/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in handle_connection_activity(self, rd_connections, wr_connections, ex_connections)
158 for current_connection in rd_connections:
159 try:
--> 160 self.handle_read(current_connection)
161 except ConnectionError:
162 dead_connections.add(current_connection)
/usr/local/lib/python2.7/dist-packages/gearman/connection_manager.pyc in handle_read(self, current_connection)
216
217 # Notify the handler that we have commands to fetch
--> 218 current_handler.fetch_commands()
219
220 def handle_write(self, current_connection):
/usr/local/lib/python2.7/dist-packages/gearman/command_handler.pyc in fetch_commands(self)
37
38 cmd_type, cmd_args = cmd_tuple
---> 39 continue_working = self.recv_command(cmd_type, **cmd_args)
40
41 def send_command(self, cmd_type, **cmd_args):
/usr/local/lib/python2.7/dist-packages/gearman/command_handler.pyc in recv_command(self, cmd_type, **cmd_args)
63 # Expand the arguments as passed by the protocol
64 # This must match the parameter names as defined in the command handler
---> 65 completed_work = cmd_callback(**cmd_args)
66 return completed_work
67
/usr/local/lib/python2.7/dist-packages/gearman/admin_client_handler.pyc in recv_text_command(self, raw_text)
87
88 # This must match the parameter names as defined in the command handler
---> 89 completed_work = cmd_callback(raw_text)
90 return completed_work
91
/usr/local/lib/python2.7/dist-packages/gearman/admin_client_handler.pyc in recv_server_maxqueue(self, raw_text)
150 """Maxqueue response is a simple passthrough"""
151 if raw_text != 'OK':
--> 152 raise ProtocolError("Expected 'OK', received: %s" % raw_text)
153
154 self._recv_responses.append(raw_text)
ProtocolError: Expected 'OK', received: OK
It seems to be impossible to get the job id from worker code with python-gearman.
I need to get the job id from the worker. With the PHP lib I was setting the GEARMAN_WORKER_GRAB_UNIQ option with a call to addOptions(GEARMAN_WORKER_GRAB_UNIQ);
After that, my worker was able to get the job id by reading the "unique" attribute, like this: $job->unique()
Do you know a way to do the same with the Python lib?
Thanks.
Currently the docs/conf.py file is not included in the released tarballs, which makes it difficult to locally generate documentation for the package. Could you please add it to the manifest file so that it gets included in the next release?
Looking at http://pythonhosted.org/gearman/client.html#retrieving-job-status, there does not appear to be any way to get the result of a background job. Waiting on status complete does not work because a background job is "complete" as soon as the server receives it.
https://github.com/Yelp/python-gearman/blob/master/gearman/util.py#L11-L35
has_time_remaining attempts to compare an absolute time (stop_time) with a time interval (time_comparison), the latter of which will (logically) always be less than the former.
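Assuming the intended check is "has the absolute deadline passed yet?", a corrected sketch might look like this (the signature is simplified from the library's):

```python
import time

def has_time_remaining(stop_time):
    # compare the absolute deadline against the current absolute time,
    # not against the interval that produced the deadline
    if stop_time is None:
        return True  # no deadline means we may keep polling
    return time.time() < stop_time
```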
The following code:
import time
import random
import sys  # needed for sys.maxint below
import gearman
import traceback
import json
class RequestTimeout:
def __init__(self):
self.delayed = {}
self.disabled = 0
def disableMethod(self, method, timeout):
self.delayed[method] = time.time()+timeout
def disableAll(self, timeout):
self.disabled = time.time()+timeout
def isDisabled(self, method):
t = time.time()
if (method not in self.delayed):
self.delayed[method] = 0
if (self.disabled >= t) or (self.delayed[method] >= t):
return True
return False
def getTimeout(self, method):
if (method not in self.delayed):
self.delayed[method] = 0
if (self.disabled > self.delayed[method]):
return self.disabled
return self.delayed[method]
class Controler:
__keys = [('123-456-789', RequestTimeout()), ('987-654-321', RequestTimeout())]
@staticmethod
def getApiKey(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
random.shuffle(Controler.__keys)
for (key, timeout) in Controler.__keys:
if (not timeout.isDisabled(method)):
return key
return -1
@staticmethod
def disableMethod(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
key = jobData['key']
timeout = jobData['timeout']
try:
for (k, t) in Controler.__keys:
if (k == key):
t.disableMethod(method, timeout)
return 'T'
return 'F'
except:
traceback.print_exc()
return 'F'
@staticmethod
def disableKey(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
key = jobData['key']
timeout = jobData['timeout']
try:
for (k, t) in Controler.__keys:
if (k == key):
t.disableAll(timeout)
return 'T'
return 'F'
except:
traceback.print_exc()
return 'F'
@staticmethod
def getMethodTimeout(gearman_worker, gearman_job):
jobData = json.loads(gearman_job.data)
method = jobData['method']
t = time.time()
result = (sys.maxint, '')
for (key, timeout) in Controler.__keys:
t0 = min(result[0], timeout.getTimeout(method))
if (t0 != result[0]):
result = [t0, key]
return result
if __name__ == "__main__":
gm_worker = gearman.GearmanWorker(['10.132.157.195:4730'])
gm_worker.set_client_id('python-worker')
gm_worker.register_task('Manager-GetApiKey', Controler.getApiKey)
gm_worker.register_task('Manager-DisableMethod', Controler.disableMethod)
gm_worker.register_task('Manager-DisableKey', Controler.disableKey)
gm_worker.register_task('Manager-GetMethodTimeout', Controler.getMethodTimeout)
gm_worker.work()
provides the following crash:
root@Workers:~/worker# cat nohup.out
Traceback (most recent call last):
File "manager.py", line 102, in <module>
gm_worker.work()
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 83, in work
continue_working = self.poll_connections_until_stopped(worker_connections, continue_while_connections_alive, timeout=poll_timeout)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
self.handle_connection_activity(read_connections, write_connections, dead_connections)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 160, in handle_connection_activity
self.handle_read(current_connection)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 218, in handle_read
current_handler.fetch_commands()
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 39, in fetch_commands
continue_working = self.recv_command(cmd_type, **cmd_args)
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 65, in recv_command
completed_work = cmd_callback(**cmd_args)
File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 137, in recv_job_assign_uniq
self.connection_manager.on_job_execute(gearman_job)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 198, in on_job_execute
return self.on_job_complete(current_job, job_result)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 205, in on_job_complete
self.send_job_complete(current_job, job_result)
File "build/bdist.linux-x86_64/egg/gearman/worker.py", line 147, in send_job_complete
current_handler.send_job_complete(current_job, data=data)
File "build/bdist.linux-x86_64/egg/gearman/worker_handler.py", line 58, in send_job_complete
self.send_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=current_job.handle, data=self.encode_data(data))
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 28, in encode_data
return self.connection_manager.data_encoder.encode(data)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 31, in encode
cls._enforce_byte_string(encodable_object)
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 27, in _enforce_byte_string
raise TypeError("Expecting byte string, got %r" % type(given_object))
TypeError: Expecting byte string, got <type 'int'>
I'm using the gearman 2.0.2 (The one on github)
PS: Is this still being developed?
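The TypeError above comes from the byte-string check rejecting the integer -1 that getApiKey returns when every key is disabled: worker callbacks must return byte strings. A simplified sketch of the check and a coercion guard (`enforce_byte_string` here mirrors the library's check but is my own simplified version, and `encode_result` is hypothetical):

```python
def enforce_byte_string(value):
    # simplified stand-in for gearman's _enforce_byte_string
    if not isinstance(value, bytes):
        raise TypeError('Expecting byte string, got %r' % type(value))

def encode_result(result):
    # coerce non-bytes results (e.g. the -1 sentinel) before WORK_COMPLETE
    if not isinstance(result, bytes):
        result = str(result).encode('utf-8')
    enforce_byte_string(result)
    return result
```

In the reported code, returning `str(-1)` (or a JSON-encoded payload) from getApiKey would avoid the crash.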
Documenting this request after a discussion with Kevin on IRC.
Python-gearman chooses to use epoll.poll() on systems that support it. Otherwise, it falls back to the use of select(). Using python-gearman in code that uses eventlet causes a blocking condition when poll() is used. Eventlet will green select(), so it would be nice to force the use of select() over poll().
I'm not sure where else to post this, but I have no idea how to run the included tests. Does anybody know? Also, what happened to Python 3 support? I want to run the tests so I can try my hand at converting it to Python 3.
Hello,
You've got a lot of logging code that is throwing errors; I think this page might be relevant:
http://docs.python.org/library/logging#configuring-logging-for-a-library
Infinitely submitting jobs results in a big memory leak. The client's memory consumption keeps growing and growing when the queue is full:
while True: gm_client.submit_job("task1", "some data ", background=True, wait_until_complete=False, poll_timeout=0.020)
Trying to use the pickle encoder at https://github.com/Yelp/python-gearman/blob/master/docs/client.rst gives this error...
class PickleDataEncoder(gearman.DataEncoder):
AttributeError: 'module' object has no attribute 'DataEncoder'
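The attribute simply isn't exported at the package's top level in 2.0.2. A self-contained sketch of the encoder interface the docs assume (the `DataEncoder` stand-in here is mine; with the real library you would subclass its own DataEncoder base class, importing it from wherever the installed version actually defines it):

```python
import pickle

class DataEncoder(object):
    # stand-in for the library's base class (assumed interface: classmethod
    # encode/decode pairs that turn objects into byte strings and back)
    @classmethod
    def encode(cls, encodable_object):
        raise NotImplementedError

    @classmethod
    def decode(cls, decodable_string):
        raise NotImplementedError

class PickleDataEncoder(DataEncoder):
    @classmethod
    def encode(cls, encodable_object):
        return pickle.dumps(encodable_object)

    @classmethod
    def decode(cls, decodable_string):
        return pickle.loads(decodable_string)
```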
python-gearman shouldn't handle it, and it currently does so incorrectly, since it just inserts the job data.
It should simply pass the "-" through to gearmand and let gearmand handle the hashing of the data.
I found that when using multiple gearmand servers and having the workers connect to both of them, somehow (intermittently) some of the workers will just accept jobs from one of the servers only (i.e., it seems to lose connection to the other server or blocked in waiting for jobs in one server only and thus doesn't fetch jobs from there anymore). This causes the jobs from that server to be executed only by the workers which are still connected to that server. I experienced this in a 16-core Ubuntu machine.
This is somehow related to #17 although a bit different.
Configuration:
Try the minimal reproducing code below to test.
# test.bash
for i in {0..5}; do
python gearman_client.py
sleep 2
done
# gearman_client.py
import multiprocessing
import gearman
import traceback
def start_gearman_client(process_id):
gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
try:
requests = []
for gm_job_id in range(500):
request = gm_client.submit_job(
task='do_task',
data='%d_%03d' % (process_id,gm_job_id),
unique='%d_%03d' % (process_id,gm_job_id),
background=False,wait_until_complete=False)
requests.append(request)
gm_client.wait_until_jobs_completed(requests)
except:
print traceback.format_exc()
return 0
def main():
child_processes = []
for process_id in range(2):
p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
child_processes.append((process_id,p))
p.start()
for (pid,child) in child_processes:
print 'Confirming that child number %d had died' % pid
child.join()
if __name__ == '__main__':
main()
# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial
def do_work(gearman_worker,gearman_job,worker_id):
try:
print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
time.sleep(0.001)
except:
print traceback.format_exc()
return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)
def start_gearman_worker(worker_id):
gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
print 'Worker %d start working' % worker_id
gm_worker.work()
if __name__ == '__main__':
workers = []
for pid in range(8):
worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()
My last lines of output in worker's console:
Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499
As you can see, only workers 2, 4, and 5 are processing jobs from 4731, the others just don't fetch jobs from there anymore.
When a client submits a background job, it seems the worker will never send back any state other than 'CREATED', and therefore the request in handle_to_request_map is never popped:

```python
def recv_job_created(self, job_handle):
    if not self.requests_awaiting_handles:
        raise InvalidClientState('Received a job_handle with no pending requests')

    # If our client got a JOB_CREATED, our request now has a server handle
    current_request = self.requests_awaiting_handles.popleft()
    self._assert_request_state(current_request, JOB_PENDING)

    # Update the state of this request
    current_request.job.handle = job_handle
    current_request.state = JOB_CREATED
    # The question is: when to unregister the request if background was True?
    self._register_request(current_request)

    return True
```
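One possible answer to the question in that comment, sketched on plain dicts (`register_request` is illustrative, not the library's _register_request): skip tracking background requests entirely once JOB_CREATED arrives, since the server never sends further packets for them unprompted.

```python
def register_request(handle_to_request_map, request):
    # proposed behaviour (an assumption, not the library's current code):
    # background requests are not tracked, so the map cannot accumulate
    # handles that are never popped
    if not request['background']:
        handle_to_request_map[request['handle']] = request

tracked = {}
register_request(tracked, {'handle': 'H:host:1', 'background': True})
register_request(tracked, {'handle': 'H:host:2', 'background': False})
```

A by-handle status query (GET_STATUS) would still work for background jobs, since it does not rely on this map.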
Python 3 was released in 2008 and has since become a very stable platform. Gearman needs to have a Python 3 supported library.
probably doing something wrong here.. but...
>>> j=c.submit_job("config", ip,wait_until_complete=False)
>>> c.get_job_status(j)
<GearmanJobRequest task='config', unique='c468734f3bfe4786e486be6651877ed7', priority=None, background=False, state='CREATED', timed_out=False>
>>> c.get_job_status(j)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "build/bdist.linux-x86_64/egg/gearman/client.py", line 127, in get_job_status
File "build/bdist.linux-x86_64/egg/gearman/client.py", line 141, in get_job_statuses
File "build/bdist.linux-x86_64/egg/gearman/client.py", line 158, in wait_until_job_statuses_received
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 194, in poll_connections_until_stopped
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 160, in handle_connection_activity
File "build/bdist.linux-x86_64/egg/gearman/connection_manager.py", line 218, in handle_read
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 39, in fetch_commands
File "build/bdist.linux-x86_64/egg/gearman/command_handler.py", line 65, in recv_command
File "build/bdist.linux-x86_64/egg/gearman/client_handler.py", line 142, in recv_status_res
File "build/bdist.linux-x86_64/egg/gearman/client_handler.py", line 57, in _assert_request_state
gearman.errors.InvalidClientState: Expected handle (H:dell:333) to be in state 'CREATED', got 'COMPLETE'
When launching the workers, they poll the gearmand server for 60 seconds, but then they sleep and don't wake up when a job is ready on the server.
If a job is submitted, instead of being processed immediately, the workers pick up the job only after around 1 minute.
Any idea how to make the workers process all jobs immediately, or to wake them up when a job arrives?
The jobs I am creating are in background mode.
Trying to test "after_poll" from a worker, I noticed that it is actually triggered twice every poll cycle.
Looking at the worker.py code, "after_poll" is called from within "continue_while_connections_alive(any_activity)", which is passed as the callback function when invoking "poll_connections_until_stopped".
Within "poll_connections_until_stopped", the callback function is called once before everything else starts, and again within the loop.
Maybe I misunderstood the purpose of "after_poll", but if the purpose is to have a method called on every timeout, then the right place to call "after_poll" would be, in my opinion, within the "if time_remaining == 0.0:" section.
But simply moving the call to "callback_fxn" ("after_poll" in the case of a call from a worker) will not do. The callback function is used not only to trigger the after_poll method when poll_connections_until_stopped is called from a worker, but also to call a "continue_while_jobs_pending" method when it is called from a client.
So we can't simply change the location of the calls to "callback_fxn" within the "poll_connections_until_stopped" method.
I can see 2 solutions:
1/ Split the poll_connections_until_stopped method: one version to be called by clients, one by workers.
In the worker version, do not re-enter the loop when timing out. This would mean that "work" should be wrapped in an infinite loop (as is done in the PHP library). The "after_poll" callback would simply not exist: additional code in the infinite loop would deal with it.
2/ Add an "after_timeout(any_activity)" method to the worker and client classes, which would be called from within the poll_connections_until_stopped method.
The second solution is simpler to implement, in my opinion, but we would lose the ability to stop the polling from within after_poll (which is not an issue, in my opinion).
Does it make sense? Am I missing something? Should I propose a patch? This is at the very heart of the code, so I'm afraid of missing a use case.
If an error occurs in my Gearman worker, I can't use send_job_failure because there's no reason attached to it.
I can't use send_job_exception because the gearmand server stops sending jobs to the worker after 2 exceptions occur and I can't find any documentation anywhere on how to change this.
Moreover, the GearmanWorker class expects my function to return something. What's a good philosophy for what should be returned here and not via send_job_data? If I send some information via send_job_data what should I return from the function?
Hi,
The tests directory currently isn't included in the manifest file and releases. As the tests can be quite useful for distributions packaging your software (testing for regressions), could you include it in the releases? Also, changing setup.py so that the tests are automatically run when you call 'setup.py test' would be a nice addition.
Thanks!
Currently only version 2.0.2 is released in the PyPI repo; it dates from 2011, and since then there have been many commits on master but no new release.
I would like to propose a 2.0.3 release which includes just the patch for avoiding memory leaks (using weakrefs instead of normal dicts); you can see it here:
https://github.com/ealogar/python-gearman/tree/v2.0.3
To open this PR and release a version, I would propose creating a new release branch from the v2.0.2 tag.
Another idea is to release a 3.0.0 pointing at the current master branch.
In both cases I think a branch named after the version would be needed in the repo, so patches can be proposed against that version.
Is it possible to do this?
The following (simplified) setup goes awry:
The outcome is based on a race condition: Whichever server (A or B) replies first gets the task. If it is A, the task succeeds; if it is B, the task fails.
In general, unless all servers have the exact same set of worker tasks available, clients can't rely on requests succeeding.
This makes it difficult to rely on multiple gearmand servers for load balancing and redundancy, even if the general intent is to have all workers connect to all servers (config errors, connection flukes, etc. can get in the way -- exactly what the redundancy is there to help with).
After the refactoring of the method poll_connections_once in commit 595f189, python-gearman causes 100% CPU load until the timeout is reached.
If an exception is raised inside the worker, python-gearman just ignores it. The job completes as failed, but the worker does not report anything at all.
At least some message via logging would be good.
If you have a gearman client like the one below and you submit many jobs, then you run into a memory leak.
client = gearman.GearmanClient(server)
for job in many_many_jobs:
req = client.submit_job(task, data) ...
The problem is in the python module client_handler.py, where every job is stored in a dictionary but never deleted.
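The weakref-based fix mentioned elsewhere in this tracker can be sketched in isolation: with a WeakValueDictionary, an entry disappears as soon as the caller drops its last reference to the request (`JobRequest` below is a minimal stand-in for GearmanJobRequest, not the library's class).

```python
import gc
import weakref

class JobRequest(object):
    # minimal stand-in for GearmanJobRequest
    def __init__(self, handle):
        self.handle = handle

# the proposed map type: holds only weak references to its values
handle_to_request_map = weakref.WeakValueDictionary()

request = JobRequest('H:host:1')
handle_to_request_map[request.handle] = request
assert 'H:host:1' in handle_to_request_map

del request          # caller is done with the request
gc.collect()         # immediate on CPython anyway; explicit for clarity
assert 'H:host:1' not in handle_to_request_map
```

With a plain dict, the final assertion would fail, which is exactly the leak described above.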
The module doesn't work with IPv6. I didn't try workers, but with clients there are (at least) two issues.
Firstly, the server endpoint parsing code uses hostport_tuple.split(':'), which is bad for IPv6 (since IPv6 addresses contain a lot of ':' characters).
Secondly, and worse, it uses an explicit AF_INET for connect(), which restricts it to IPv4 addresses. Is there a reason for that?
The following patch fixes the second issue and allows the client code to work at least when the server address is specified as a name or as a tuple of (ipv6_addr, port):
--- connection.py.old 2015-06-13 16:46:27.000000000 +0300
+++ connection.py 2015-06-15 13:41:18.976557457 +0300
@@ -94,8 +94,8 @@
def _create_client_socket(self):
"""Creates a client side socket and subsequently binds/configures our socket options"""
try:
- client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- client_socket.connect((self.gearman_host, self.gearman_port))
+ client_socket = socket.create_connection(
+ (self.gearman_host, self.gearman_port))
except socket.error, socket_exception:
self.throw_exception(exception=socket_exception)
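The first issue could be addressed with bracket-aware endpoint parsing. A sketch (`parse_hostport` and the accepted string forms are my assumptions about what the endpoint syntax should be, not existing library behaviour):

```python
def parse_hostport(hostport, default_port=4730):
    # accepts 'host', 'host:4730', '::1', and '[::1]:4730'
    if hostport.startswith('['):               # bracketed IPv6, optional port
        host, _, rest = hostport[1:].partition(']')
        port_text = rest.lstrip(':')
        port = int(port_text) if port_text else default_port
    elif hostport.count(':') > 1:              # bare IPv6 address, no port
        host, port = hostport, default_port
    else:                                      # hostname or IPv4
        host, _, port_text = hostport.partition(':')
        port = int(port_text) if port_text else default_port
    return host, port
```

Combined with socket.create_connection (which tries each address family getaddrinfo returns), this would cover both issues.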
In worker_handler, we should discard jobs whose data raises an exception when we attempt to decode it:
https://github.com/Yelp/python-gearman/blob/master/gearman/worker_handler.py#L134
Hi! It seems this module and the libgearman module:
http://pypi.python.org/pypi/python3-libgearman/0.13.3
conflict, but they don't need to.
The gearman/__init__.py in that package is just:

from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)

This is just to make Python find the C extension.
If you were willing to add those lines to this module's __init__.py, the two packages could share the namespace, allowing users to have both installed.
Hi,
I'm packaging your software for inclusion in Debian, but I have some questions about the copyright:
Thanks!
Hi!
The version on pypi is a bit old. Any chance you all could do a release sometime soon?
Thanks,
-Brian
I'm trying to submit a task and then come back later to check on its status. At that point I don't have any reference to the job or the request; I only have the job handle.
I've looked through the API documentation and can't find any method that will do this. But I've noticed that the gearman protocol has a command to accomplish this.
Any help is appreciated.
Remove > from the end.
Is there any way to get a job's status by the job's handle?
It looks like
job = submit_job(...)
and another process could then get the job's status by job.job.handle:
status = get_job_status(job.job.handle)
??
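At the protocol level this is possible: the binary GET_STATUS request (command type 15) takes just the job handle, and the server answers with STATUS_RES. A framing sketch (`pack_get_status` is illustrative; python-gearman itself doesn't expose a by-handle API):

```python
import struct

GEARMAN_COMMAND_GET_STATUS = 15  # per the Gearman binary protocol

def pack_get_status(job_handle):
    # request packet: b'\0REQ' + 4-byte big-endian type + 4-byte size + data
    data = job_handle.encode('ascii')
    return b'\x00REQ' + struct.pack('>II', GEARMAN_COMMAND_GET_STATUS, len(data)) + data

packet = pack_get_status('H:dell:333')
```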