noxdafox / pebble
Multi threading and processing eye-candy.
License: GNU Lesser General Public License v3.0
Returning a lot of data from a subprocess will raise an exception:
from pebble import ProcessPool
from concurrent.futures import TimeoutError

def function(foo):
    return 'A' * (10 ** foo)

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    else:
        print(len(result))

pool = ProcessPool(max_workers=5, max_tasks=10)

for i in xrange(11):
    future = pool.schedule(function, args=[i], timeout=20)
    future.add_done_callback(task_done)

pool.close()
pool.join()
[eth:~/tools/w3af] [w3af] develop* ± python testp.py
1
10
100
1000
10000
100000
1000000
10000000
100000000
1000000000
Process Process-4:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 387, in worker_process
send_result(channel, Result(task.id, result))
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/common.py", line 163, in send_result
pipe.send(data)
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 97, in unix_send
return self.writer.send(obj)
SystemError: NULL result without error in PyObject_Call
Function raised Abnormal termination
No handlers could be found for logger "concurrent.futures"
[eth:~/tools/w3af] [w3af] develop* 3s ±
Note that the exception is NOT caught by the except Exception as error: block in task_done, which makes it impossible to handle.
This issue seems to be known / common.
It also seems to be an anti-pattern to send tons of data to / from the subprocess. My goal here is to report this in order to get the conversation started; ultimately I would like this exception to be handled by pebble in such a way that I can then handle it in my code.
This issue might be related to #28.
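Until this is handled inside pebble, one workaround (a generic sketch, not pebble functionality; the helper names here are made up) is to spill large results to disk in the worker and return only the file path, so the payload pickled over the worker pipe stays small:

```python
import os
import pickle
import tempfile

def store_result(result):
    """Write a result to a temporary file and return the file path."""
    fd, path = tempfile.mkstemp(suffix='.pickle')
    with os.fdopen(fd, 'wb') as handle:
        pickle.dump(result, handle)
    return path

def load_result(path):
    """Load a result written by store_result and remove the file."""
    with open(path, 'rb') as handle:
        result = pickle.load(handle)
    os.unlink(path)
    return result

def function(foo):
    # return a small path instead of the potentially huge string
    return store_result('A' * (10 ** foo))
```

The parent side would then call load_result(future.result()) in the done-callback instead of using the returned value directly.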
from pebble import ProcessPool
from concurrent.futures import TimeoutError
import time

if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.map(time.sleep, [1, 25, 62, 7], timeout=8)
        iterator = future.result()
        while True:
            try:
                next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print 'task took too long, {}, {}'.format(error.args[1], error)
        # time.sleep(2)  # <-- Fixes the crash
Yields:
task took too long, 8, ('Task timeout', 8)
task took too long, 8, ('Task timeout', 8)
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Python27\lib\pickle.py", line 1384, in load
return Unpickler(file).load()
File "C:\Python27\lib\pickle.py", line 864, in load
dispatch[key](self)
File "C:\Python27\lib\pickle.py", line 886, in load_eof
raise EOFError
EOFError
Env:
Windows 7
Python: 2.7.15
Pebble: 4.3.9
You can see the commented time.sleep(2) at the end. Interestingly, this resolves the issue. I suspect something is happening too quickly before all the processes are closed.
I have an Ubuntu 18.04.1 LTS machine, completely up to date, that does not have the same issue. So it seems Windows-related.
It would be convenient if pebble included a version string, preferably as pebble.__version__. Would this be possible? If yes, I could make a PR.
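Until such an attribute exists, the installed version can be read from the package metadata instead (a sketch using the standard library, Python 3.8+; older interpreters can use pkg_resources for the same purpose):

```python
from importlib.metadata import version, PackageNotFoundError

def get_version(package):
    """Return the installed version string of a package, or None."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None

print(get_version("Pebble"))  # e.g. '4.3.9' when installed, None otherwise
```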
I keep getting this when running on OSX:
Exception in thread task_scheduler:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 82, in task_scheduler_loop
pool_manager.schedule(task)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 141, in schedule
self.worker_manager.dispatch(task)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 252, in dispatch
self.pool_channel.send(NewTask(task.number, task._metadata))
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/channel.py", line 55, in send
return self.writer.send(obj)
TypeError: 'NoneType' object is not callable
trying to run:
pool = process.Pool()
for q in query:
    job = pool.schedule(background, args=[source, destination, q])
Hi,
Thanks for a nice package!
However, I ran into the following while trying to install pebble on Python 3.6 on a Windows machine today (I have installed previous releases without any trouble):
Directly through pip + pypi:
Collecting pebble
  Could not find a version that satisfies the requirement pebble (from versions: )
No matching distribution found for pebble
And when downloading the wheel manually and trying pip install *.whl:
Pebble-4.3.4-py2.py3-none-any.whl is not a supported wheel on this platform.
Is there a way to cancel a ProcessPool.map from inside the mapped function?
def func(iterable):
    result = iterable * iterable
    if result > 10:
        pass  # cancel pool here
    return result

def main(args):
    pool = ProcessPool(max_workers=10)
    future = pool.map(func, [1, 2, 3, 4])
    res = list(future.result())
    # res = [1, 4, 9]
Thanks
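A task cannot stop the pool from inside itself, since it runs in another process. One common pattern (sketched here with the stdlib concurrent.futures; the same idea applies to pebble futures) is to raise a sentinel exception in the worker and have the parent cancel the remaining futures when it sees it:

```python
from concurrent.futures import ThreadPoolExecutor

class StopComputation(Exception):
    """Sentinel raised by a task to request that the batch stops."""

def func(value):
    result = value * value
    if result > 10:
        raise StopComputation()
    return result

def run(values):
    results = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(func, v) for v in values]
        for future in futures:
            try:
                results.append(future.result())
            except StopComputation:
                # stop consuming and cancel anything not yet started
                for pending in futures:
                    pending.cancel()
                break
    return results

print(run([1, 2, 3, 4]))  # [1, 4, 9]
```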
Firstly, thank you for such a great library! I'm having great success integrating this, but have come up against one problem: if you cancel a future that is not yet running via future.cancel(), it still gets run. Two things stand out as odd, though: the task_done callback is called when the future is cancelled, as expected, which means it is not called again when the task actually finishes running. Note that I am running this on Python 2.7, using the backported concurrent.futures library (https://pypi.org/project/futures/).
Here is a minimal reproduction case which I have been using to test:
import os
import time
from pebble import ProcessPool
from concurrent.futures import TimeoutError, CancelledError

def function():
    print '[{}] Sleeping'.format(os.getpid())
    time.sleep(5)
    print '[{}] Done'.format(os.getpid())

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except CancelledError:
        print('Cancelled')
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    else:
        print('Function returned: {}'.format(result))

pool = ProcessPool(max_workers=1)
futures = []

for i in range(0, 10):
    future = pool.schedule(function)
    future.add_done_callback(task_done)
    futures.append(future)

time.sleep(2)

for future in futures:
    if not future.running() and not future.done():
        future.cancel()

pool.close()
pool.join()
The output is as follows:
(zapier)$ python demo.py
[1629] Sleeping
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
[1629] Done
Function returned: None
[1629] Sleeping
[1650] Sleeping
[1651] Sleeping
[1652] Sleeping
[1655] Sleeping
[1656] Sleeping
[1657] Sleeping
[1658] Sleeping
[1659] Sleeping
As you can see from the code, we're submitting 10 jobs to the pool, which has only a single worker. Each job waits for 5 seconds before returning. After 2 seconds, we cancel all the jobs which are not running or finished (of which there are 9, as the first job is still sleeping). At this point, all 9 task_done
callbacks are fired as expected.
Then, once the first job finishes, we'd expect the program to exit as pool.join()
should return. Instead, the 9 jobs that were previously cancelled each start to run, one after the other. The timestamps aren't shown in my output above, but this happens very fast—so the jobs are being killed almost immediately after starting. You can see by the incrementing pid
that the worker process is indeed being killed in each case.
Is there anything we can do here to work around this, or potentially fix the issue? I'm still quite new to the concurrent.futures
library, so I may well be doing something wrong!
Thank you again for the great work here!
Edit: I've just confirmed that this happens on Python 3 as well, using the standard library concurrent.futures
module.
Installing Pebble via pip for Python 2.7 seems to be misconfigured.
To wit:
(venv) c:\Users\scole\Documents\ingress>pip install Pebble
Collecting Pebble
Downloading Pebble-4.1.0-py2.py3-none-any.whl (49kB)
Installing collected packages: Pebble
Successfully installed Pebble-4.1.0
(venv) c:\Users\scole\Documents\ingress>python
Python 2.7.12 (v2.7.12:d33e0cf91556, Jun 27 2016, 15:19:22) [MSC v.1500 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import pebble
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\__init__.py", line 10, in <module>
from pebble.pool.thread import ThreadPool
File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\pool\thread.py", line 21, in <module>
from pebble.pool.base_pool import BasePool, run_initializer
File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\pool\base_pool.py", line 21, in <module>
from concurrent.futures import Future, TimeoutError
ImportError: No module named concurrent.futures
>>>
Now, I certainly see the stuff in the top level of this repo that tries to install futures; it just hasn't happened for some reason. (I get this same error on both Windows and Linux installs.)
I'll take a look myself, but I haven't ever used setuptools before personally, so there's a bit of a learning curve there.
Hello!
I ran into an issue caused by a psycopg connection object that cannot be pickled when sending the task payload to the worker. I was eventually able to fix the root cause but the debugging process turned out to be quite tedious because I couldn't see where the actual exception was coming from.
Here's what I was able to find:
def dispatch(self, task):
    try:
        self.pool_channel.send(WorkerTask(task.id, task.payload))
    except (OSError, EnvironmentError, TypeError) as error:
        raise BrokenProcessPool(error)
try:
    while context.alive:
        task = task_queue.get()
        if task is not None:
            if task.future.cancelled():
                task.set_running_or_notify_cancel()
                task_queue.task_done()
            else:
                pool_manager.schedule(task)
        else:
            task_queue.task_done()
except BrokenProcessPool:
    context.state = ERROR
the exception is caught and the state is set to ERROR, but the original message and traceback are lost. Then, on the following _check_pool_state call, the state is ERROR and a new RuntimeError is raised, masking the root cause.
Ideally, the cause of the original exception would either be propagated and included in the RuntimeError, or just sent to stderr, logged, etc.
If this is in fact the expected behaviour and it should have been logged, I may have missed a step in the setup part (though I do not recall seeing anything in the docs). Please let me know if that's the case, and thanks for the great work you've done with pebble!
As a developer I would like to run a generator in another process, and get the results one by one to be able to process them in the main thread.
The function I'm running generates results which use a lot of RAM (a list with many objects), and each object of the list can be processed individually. If I weren't running the function in a different process, I would certainly use a generator function.
At the moment the functions being run in the sub-processes can only return one result: the list with all the objects. In an ideal scenario, if this gets implemented, future.result() would return a generator which I could iterate over. Each time a result is produced by the sub-process function, it would be pickled, sent through the pipe, unpickled, and yielded by the generator.
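Pending such a feature, results can be streamed manually with a multiprocessing queue and an end-of-stream sentinel (a sketch outside pebble's API; producer here is a stand-in for the real generating function):

```python
import multiprocessing

_SENTINEL = None  # marks the end of the stream

def producer(queue, n):
    """Generate results one by one and push each through the queue."""
    for i in range(n):
        queue.put(i * i)  # each item is pickled and sent individually
    queue.put(_SENTINEL)

def stream_results(n):
    """Run the producer in a subprocess and yield its results lazily."""
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=producer, args=(queue, n))
    process.start()
    try:
        while True:
            item = queue.get()
            if item is _SENTINEL:
                break
            yield item
    finally:
        process.join()

if __name__ == '__main__':
    for value in stream_results(5):
        print(value)  # items arrive as they are produced
```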
Python's multiprocessing pool has various limitations; one I tried to solve with my wrapper code is the timeout of worker processes. I implemented that in a rather ugly way: when the timeout is reached, I os.kill the process. The multiprocessing pool implementation does spawn a new worker process, and 99.99% of the time everything works well.
0.01% of the time there are some strange issues and the whole pool stops working, which I believe is because of this issue, documented in the Python docs:
Warning If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.
While looking for different multiprocessing pool implementations I found yours, which does implement timeouts and as far as I could read from process.py#L360-L366 there is code to avoid killing a process when the queue is locked. Am I reading that part of the code correctly?
Which other issues from python's multiprocessing pool did you fix in your implementation?
Does your pool implementation have any known issues?
This issue is something I found while running unittests in my build environment. It seems like an exception during interpreter shutdown:
Exception in thread Thread-6:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 170, in message_manager_loop
pool_manager.process_next_message(SLEEP_UNIT)
File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 199, in process_next_message
message = self.worker_manager.receive(timeout)
File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 322, in receive
return NoMessage()
TypeError: 'NoneType' object is not callable
Not sure if this is related to (a) me not calling stop(), terminate(), etc. on the pool, or (b) a real bug in the pebble code. Thoughts?
Hi,
Thank you for the great library :)
Wanted to ask a question regarding your SO answer here:
https://stackoverflow.com/a/45565421
Basically, what exactly happens to a process when cancel() is called in your library? I wasn't able to fully understand that by looking at your code.
Thank you so much !
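For context, the stdlib concurrent.futures can only cancel a task that has not started yet, whereas pebble's ProcessFuture goes further by terminating the worker process of a running task. The stdlib baseline can be demonstrated like this (a sketch with a thread pool):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()

def job():
    started.set()   # signal that the job has begun running
    time.sleep(0.5)

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(job)
    queued = pool.submit(job)  # waits behind the single worker
    started.wait()             # ensure the first job is actually running
    print(running.cancel())    # False: already running, cannot be cancelled
    print(queued.cancel())     # True: still pending, cancelled
```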
I can't cancel a running pebble.ProcessFuture after a timeout is reached:
#!/usr/bin/env python3
from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED
import os
import subprocess
import time

def run():
    subprocess.check_output('md5sum /dev/random', shell=True)

with ProcessPool(max_workers=4) as pool:
    print('new ProcessPool')
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, timeout=1))
    print('md5sum started')
    time.sleep(3)
    for f in futures:
        print(f)
        f.cancel()
        print('Cancel: %s' % str(f))
    time.sleep(3)
    print('After cancelation:')
    for f in futures:
        print(f)
$ ./pe.py
new ProcessPool
md5sum started
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
After cancelation:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
and
$ ps ax | grep md5
19160 pts/1 R 0:18 md5sum /dev/random
19161 pts/1 R 0:18 md5sum /dev/random
19162 pts/1 R 0:18 md5sum /dev/random
19163 pts/1 R 0:19 md5sum /dev/random
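A likely explanation (an assumption, not confirmed in this thread) is twofold: the futures are already in the finished state after the TimeoutError, so cancel() has nothing left to cancel, and terminating the pool worker does not reap the md5sum grandchildren spawned by the shell. One way to make such children killable is to start them in their own session and terminate the whole process group (a stdlib sketch, Unix only):

```python
import os
import signal
import subprocess

def run_killable(command):
    """Start a shell command in its own session/process group."""
    return subprocess.Popen(command, shell=True, start_new_session=True)

def kill_tree(process):
    """Terminate the command together with any children it spawned."""
    os.killpg(os.getpgid(process.pid), signal.SIGTERM)
    process.wait()
```

The worker would call run_killable instead of subprocess.check_output, and a timeout or cancellation handler would call kill_tree.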
pip install imthread

import imthread
import requests

# the function for processing data
def my_func(data):
    imthread.console_log(output=True)
    data = requests.get("http://httpbin.org/get")
    return data

# sending arguments for asynchronous multi thread processing
processed_data = imthread.start(my_func, repeat=20, max_threads=20)

# printing the synchronised received results
print()
print(f'>> Result: {processed_data}')
imthread.elapsed(output=True)
output
>> Creating Threads 1
>> Creating Threads 2
>> Creating Threads 3
>> Creating Threads 4
>> Creating Threads 5
>> Creating Threads 6
>> Creating Threads 7
>> Creating Threads 8
>> Creating Threads 9
>> Creating Threads 10
>> Creating Threads 11
>> Creating Threads 12
>> Creating Threads 13
>> Creating Threads 14
>> Creating Threads 15
>> Creating Threads 16
>> Creating Threads 17
>> Creating Threads 18
>> Creating Threads 19
>> Creating Threads 20
>> Result: [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
>> Elapsed time: 0.55 sec
Hi,
I use the process decorator:
@concurrent.process(timeout=60*60)
but sometimes the function is not started while the process is active.
I added some prints to the code, and it gets stuck in

def _function_handler(function, args, kwargs, pipe):
    """Runs the actual function in separate process and returns its result."""
    print(some_print)

so when a task is stuck, print(some_print) is not printed.
For my app I can avoid this by adding another thread to check for stuck tasks, but I want to understand why it happens.
Thanks, and sorry for the bad English.
I am running on a mac and I get an exception running this code
jobs = []

with ProcessPool(max_workers=workers) as pool:
    # background is the task that runs and the parameters to it follow
    for q in query:
        job = pool.schedule(background, args=(q, recursion, action, update))
        job.add_done_callback(finished)
        jobs.append(job)
File "/Users/Dropbox/Work/adops/migrate/lib/python2.7/site-packages/pebble/pool/channel.py", line 44, in unix_poll
return bool(select([self.reader], [], [], timeout)[0])
error: (9, 'Bad file descriptor')
I was trying to call external non-trusted subprocesses with the help of pebble. I modelled this with child classes, each having a method decorated with @concurrent.process (each child method calls a different subprocess, via subprocess.run or an HTTP request, since one child uses a local library and the other uses an API library). The problem is that the script is calling the method from the non-expected child (after reviewing the spelling twice).
I have reduced the code to a simple version in order to reproduce the problem.
test_peeble_inheritance.zip
If I call the attached script with the "-f apple -a eat" parameters, I get stdout similar to this (the original code does not really use prints; it calls external processes):
Fruit Init
Apple Init
Cherry Eat
My tool uses both a thread pool and pebble. At some point, when I increase the thread count for the pool and run the tool I get:
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 171, in message_manager_loop
pool_manager.process_next_message(SLEEP_UNIT)
File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 200, in process_next_message
message = self.worker_manager.receive(timeout)
File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 321, in receive
return self.pool_channel.recv()
File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 60, in recv
return self.reader.recv()
EOFError
Any ideas what could be going on?
PS: Sorry to bug you with all these issues, but lately I've not been lucky with pebble / threads.
Hi Matteo,
I left for the end the most complex test case.
The code below may seem a bit long...
What we may notice is that I launch a ThreadPool with 10000 tasks in it.
At a certain time, I raise an exception, either in the callback function or directly in the function launched by the task, in order to test the pool exit.
The pool is then stopped by doing:
_POOL.stop()
_POOL.join()
see the pool_stop function.
Also, the behaviour is different whether there is a waiting time in the called function or not.
One last thing I omitted to say: the infinite loop occurs in the _wait_queue_depletion function:
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 42, in __exit__
self.join()
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 71, in join
self._wait_queue_depletion(timeout)
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 84, in _wait_queue_depletion
time.sleep(SLEEP_UNIT)
KeyboardInterrupt
You should copy/paste the code below and run it as it is.
I hope, with these explanations, it will be OK!
Thank you very much again for your help, and all my apologies for this poor piece of code...!
Regards,
Philippe
PS: When the test is OK (see below) and I add a waiting time, it seems it is because I get another exception in the library: list.remove(x): x not in list.
This may cause the interruption of the program, so we don't run into an infinite loop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from datetime import date, datetime, timedelta
import time, random
import threading
from threading import current_thread
from pebble import ThreadPool
from pebble import ProcessPool
def init_pool():
    global _POOL_RUNNING
    print "%s - New created worker - _POOL_RUNNING = %s" % (datetime.now(), _POOL_RUNNING)
    return

def pool_stop(cpt):
    global _POOL
    global _LOCK
    global _STOP
    print "Call function pool_stop - cpt = %s" % cpt
    with _LOCK:
        if not _STOP:
            _STOP = True
            #_POOL.close()
            print "Call _POOL.stop() ... : _POOL.active = %s" % _POOL.active
            _POOL.stop()
            print "Call _POOL.join() ... : _POOL.active = %s" % _POOL.active
            _POOL.join(0)
        else:
            print "_POOL has been stopped - cpt = %s" % cpt
    print "Exit function pool_stop - cpt = %s" % cpt
    return

def work_todo(cpt, a, b, mode='callback', max_hold_time=None, test_exit=False, log=False):
    global _STOP
    global _LOCK
    global _POOL
    res1 = None
    res2 = None
    wait_time = None
    if not _STOP:
        try:
            res1 = a + b
            res2 = a * b
            if max_hold_time:
                wait_time = random.randint(1, max_hold_time)
                print "%s - %s/%s - Wait %ss..." % (cpt, a, b, wait_time)
                time.sleep(wait_time)
                print "%s - %s/%s - End of wait." % (cpt, a, b)
            if not mode and log:
                print "{0}- {1}/{2} - {3} --> {4} secondes -- ({1} + {2}) = {5} ◊◊◊ ({1} * {2}) = {6}".format(
                    cpt, a, b, current_thread().ident, wait_time, res1, res2)
            if test_exit and (not mode or mode == 'err_caller') and b != 0 and b % random.randint(5, 10) == 0:
                # Throw exception to test pool exit:
                msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
                raise RuntimeError(msg)
        except Exception as erreur:
            print "cpt = %s - ERROR IN work_todo >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
            pool_stop(cpt)
    else:
        print "cpt = %s - POOL has been stopped !" % cpt
    return mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, current_thread().ident

def work_done(task):
    global _POOL
    global _LOCK
    global _STOP
    cpt = None
    try:
        mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, thread_id = task.result()
        if log:
            print "DONE -> {0} - {1} - {2}/{3} - {4} --> {5} secondes -- POOL : {6}/{7} -- ({2} + {3}) = {8} ◊◊◊ ({2} * {3}) = {9}".format(
                datetime.now(), cpt, a, b, thread_id, wait_time, _POOL, _POOL.active, res1, res2)
        if mode == 'callback':
            if test_exit and b != 0 and b % random.randint(5, 10) == 0:
                msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
                raise RuntimeError(msg)
    except Exception as erreur:
        print "cpt = %s - ERROR IN work_done >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
        pool_stop(cpt)
    return

def test_thread_pool(maxi, mode, max_hold_time, test_exit=False):
    global _STOP
    global _POOL
    global _POOL_RUNNING
    _STOP = False
    trace_fct = "test_thread_pool(maxi=%s, mode=\"%s\", max_hold_time=%s, test_exit=%s) : %s tasks ..." % (maxi, mode, max_hold_time, test_exit, maxi*maxi)
    print "\n%s - BEGIN %s" % (datetime.now(), trace_fct)
    task = []
    try:
        _POOL_RUNNING = False
        with ThreadPool(max_workers=5, max_tasks=5, initializer=init_pool) as _POOL:
            cpt = 0
            for i in range(0, maxi):
                for j in range(0, maxi):
                    cpt += 1
                    if _STOP:
                        print "In test_thread_pool : cpt = %s - stop loading of POOL : POOL has been stopped !" % cpt
                        break
                    tache = _POOL.schedule(work_todo, args=(cpt, i, j, mode, max_hold_time, test_exit, False))
                    if mode in ['callback', 'err_caller']:
                        tache.add_done_callback(work_done)
                    task.append(tache)
                if _STOP:
                    break
            print "%s - Pool LOADED !!!!" % datetime.now()
            _POOL_RUNNING = True
    except Exception as erreur:
        print " ERROR IN test_thread_pool >>>>>>>>>>>>>>>>>>> ", erreur if erreur.message else type(erreur)
    print "\n%s - END %s" % (datetime.now(), trace_fct)
    return

# GLOBALS:
_STOP = False
_LOCK = threading.Semaphore()
_POOL = None
_POOL_RUNNING = False

# ------------- #
# Main function #
# ------------- #
def main(argv):
    maxi = 10
    # OK :
    #test_thread_pool(maxi=5, mode='callback', max_hold_time=0, test_exit=True)
    # NOK :
    test_thread_pool(maxi=100, mode='callback', max_hold_time=0, test_exit=True)
    # OK :
    #test_thread_pool(maxi=100, mode='callback', max_hold_time=5, test_exit=True)
    # NOK :
    #test_thread_pool(maxi=10, mode='err_caller', max_hold_time=0, test_exit=True)
    # NOK :
    test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)
    # OK :
    #test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)
    return

if __name__ == "__main__":
    main(sys.argv[1:])
Hi,
I really appreciate the contributions of pebble; it supports timeouts in multiprocessing. I was inspired by this answer: https://stackoverflow.com/questions/44402085/multiprocessing-map-over-list-killing-processes-that-stall-above-timeout-limi. While testing, I found out that ThreadPool doesn't have this functionality.
I'd like to know the reason behind this, and do you have future plans to support it?
The test code:
from pebble import ProcessPool
from concurrent.futures import TimeoutError

def fibonacci(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

with ProcessPool() as pool:
    future = pool.map(fibonacci, range(50), timeout=10)
    try:
        for n in future.result():
            print(n)
    except TimeoutError:
        print("TimeoutError: aborting remaining computations")
        future.cancel()
I am using pebble==4.3.10 on Windows 10 with Python 3.6.8, and the test code above raised this error:
......
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
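The fix on Windows is the idiom the error message mentions: everything that starts processes must live under the __main__ guard, so spawned children (which re-import the main module) do not start pools of their own. A minimal restructuring (sketched with the stdlib ProcessPoolExecutor; the same layout applies to pebble's ProcessPool):

```python
from concurrent.futures import ProcessPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

def main():
    # the pool is only created when the script is executed directly,
    # never when a spawned child re-imports this module
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(fibonacci, range(10))))

if __name__ == '__main__':
    main()
```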
def task_done(future):
    try:
        print(future.result())
    except TimeoutError as error:
        logging.error("Function took longer than {} seconds".format(error.args[1]))

with ProcessPool(max_workers=args.workers, initializer=proc_init, initargs=(fail_writer,sw_writer,hw_writer,lks,swfh,hwfh,failfh,compile_re,uname,pwd,r_hash)) as p:
    future = p.schedule(getAcrDeviceData, args=[devices[0]], timeout=20)
    future.add_done_callback(task_done)
The above works as expected, i.e. I am able to get the "Function took longer than 20 seconds" message, but if I use map instead (devices is the iterable), the returned future has the timeout exception, yet in task_done the except TimeoutError block does not execute. Am I using map correctly?
with ProcessPool(max_workers=args.workers, initializer=proc_init, initargs=(fail_writer,sw_writer,hw_writer,lks,swfh,hwfh,failfh,compile_re,uname,pwd,r_hash)) as p:
    future = p.map(getAcrDeviceData, devices, timeout=20)
    future.add_done_callback(task_done)
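With schedule, the TimeoutError is stored in the single future and so reaches the done-callback; with map, each item's timeout is raised while iterating future.result(), so it must be caught in the consuming loop rather than in a callback. A sketch of that loop (a plain generator stands in here for the pool's result iterator; the real iterator keeps yielding the remaining results after a per-item timeout, whereas this stand-in ends after raising once):

```python
from concurrent.futures import TimeoutError

def result_iterator():
    """Stand-in for a map result: yields a value, then a timed-out item."""
    yield 'device-1 ok'
    raise TimeoutError('Task timeout', 20)

def consume(iterator):
    results = []
    while True:
        try:
            results.append(next(iterator))
        except StopIteration:
            break
        except TimeoutError as error:
            results.append('timed out after %s seconds' % error.args[1])
    return results

print(consume(result_iterator()))
```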
Hi there,
I've been using pebble for a while, mainly for its timeout capabilities compared to plain concurrent.futures.
Lately I've run into situations where the following runtime error is raised:
File "/infinite/venus/src/task_scheduler/process_scheduler.py", line 231, in _schedule
return self._executor.schedule(start_job, kwargs={"job": job}, timeout=self._timeout)
File "/usr/local/lib/python3.7/site-packages/pebble/pool/process.py", line 85, in schedule
self._check_pool_state()
File "/usr/local/lib/python3.7/site-packages/pebble/pool/base_pool.py", line 94, in _check_pool_state
raise RuntimeError('Unexpected error within the Pool')
RuntimeError: Unexpected error within the Pool
I could not find a consistent scenario to trigger it. Sometimes it happens after 1700 jobs have already been scheduled (all of the same type), sometimes everything goes fine even with more than 32000 jobs scheduled.
I'm a bit blind as to what could go wrong, so I'd appreciate some pointers on where I should look... What are the situations that are likely to trigger this?
Thanks :)
I was testing timeouts on a Win10 machine (pebble version 4.3.9, Python 3.6.6 from conda) trying to collect and print all of the answers that didn't timeout.
from concurrent.futures import TimeoutError
from pebble import ProcessPool

def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)

def main():
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(40), timeout=10)
        iterator = future.result()
        all = []
        while True:
            try:
                all.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError as e:
                print(f'function took longer than {e.args[1]} seconds')
        print(all)
It seems to be working but it gives me a very dire error:
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578]
RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
_winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied
Interestingly, part of the error happens even when range(40) is replaced by range(10), but then only the RuntimeError remains and, weirdly, there is no traceback given:
RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
I'm having issues using Pebble's ProcessPool with gRPC (described here grpc/grpc#18342) and a gRPC contributor suggested that it may be due to the processes in ProcessPool not being pre-forked (his comment is here grpc/grpc#18342 (comment)). Any thoughts on this and if pre-forking can be added as an option? gRPC requires any subprocesses to be forked before the gRPC server starts.
Can I pass task-related information to add_done_callback? I want to print out the task name.
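One pattern that may fit here (a sketch, not an official pebble API; the names `task_done` and `"parse-index"` are made up): bind the task name into the callback with functools.partial, since the pool only ever passes the future itself. A plain concurrent.futures.Future stands in for the ProcessFuture returned by pool.schedule:

```python
from concurrent.futures import Future
from functools import partial

results = []

def task_done(task_name, future):
    # task_name is bound via functools.partial; the pool passes only `future`
    results.append((task_name, future.result()))

# A plain Future standing in for pool.schedule(...)'s return value
future = Future()
future.add_done_callback(partial(task_done, "parse-index"))
future.set_result(42)
```

With pebble the call shape would be the same: `future = pool.schedule(fn, args=[x]); future.add_done_callback(partial(task_done, name))`.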
In Python's multiprocessing, Manager().Queue() has a qsize() method that can be used to track how many tasks are still pending.
Is there a way of logging how far through all the processes I am with pebble?
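There is no built-in counter that I know of, but since pebble futures are concurrent.futures futures, a small thread-safe done-callback can report progress. Everything below is a hypothetical sketch (`Progress` is not a pebble class), demonstrated with plain Futures standing in for pool.schedule(...):

```python
import threading
from concurrent.futures import Future

class Progress:
    """Thread-safe completed-task counter usable as a done callback."""
    def __init__(self, total):
        self.total = total
        self.done = 0
        self._lock = threading.Lock()

    def __call__(self, future):
        # Called by the future when it completes (in any order, any thread).
        with self._lock:
            self.done += 1
            count = self.done
        print("%d/%d tasks finished" % (count, self.total))

progress = Progress(total=3)
for i in range(3):
    f = Future()                  # stands in for pool.schedule(...)
    f.add_done_callback(progress)
    f.set_result(i)
```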
Hi all, I encountered what seems to me a weird behavior using ProcessPool.
I need to run some experiments in parallel, each of which samples random data at the beginning. I observed that the sampled data always begins the same way, as if a fixed seed had been given.
Here is a toy example to reproduce the behavior:
from numpy.random import uniform
import pebble

def power2(c):
    x = uniform()
    return x**2, c**2

if __name__ == '__main__':
    values = [2, 4, 6, 8]
    with pebble.ProcessPool() as p:
        res = p.map(power2, values)
        print(list(res.result()))
which prints:
[(0.1495687030442891, 4), (0.1495687030442891, 16), (0.1495687030442891, 36), (0.1495687030442891, 64)]
As you can notice, the first number is always the same... Using ThreadPool instead of ProcessPool, this behavior does not show up.
Is this a bug, or what am I missing?
Thanks for your help.
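For what it's worth, this is a classic fork pitfall: every worker inherits the parent's RNG state, so each process draws the same first sample. A common cure is to reseed once per worker in a pool initializer (pebble's ProcessPool accepts initializer/initargs). A minimal stdlib sketch, with the numpy variant noted in a comment:

```python
import os
import random

def reseed():
    # Runs once in each worker process.  With numpy you would call
    # numpy.random.seed() here instead; without this, fork-based workers
    # all inherit the parent's RNG state verbatim.
    random.seed(os.urandom(16))

# Hypothetical pebble usage:
#   with pebble.ProcessPool(initializer=reseed) as p:
#       res = p.map(power2, values)

# Single-process sanity check: reseeding changes the stream.
reseed()
a = random.random()
reseed()
b = random.random()
```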
I would find it handy to be able to selectively stop a ProcessFuture.
Right now, one can only stop all of them with pool.stop. Note that I am talking about processes, where stop means abruptly terminating them.
Thanks!
pebble has a rather unusual setup.py, which modifies the sources during the build, even when calling setup.py clean. Seen when packaging pebble. Maybe make this more robust, so that things are not modified, or at least not appended multiple times? My current workaround is not to call write_version at all.
--- python-pebble-4.5.1.orig/setup.py
+++ python-pebble-4.5.1/setup.py
@@ -17,7 +17,7 @@ def package_version():
version = read_version(version_path)
write_version(version_path, version)
#write_version(init_path, version, mode='a')
return version
While running some scans using w3af, which uses pebble for parsing HTML documents, I get MemoryError exceptions:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 383, in worker_process
for task in worker_get_next_task(channel, params.max_tasks):
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 398, in worker_get_next_task
yield fetch_task(channel)
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 404, in fetch_task
return task_transaction(channel)
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 413, in task_transaction
task = channel.recv()
File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 90, in recv
return self.reader.recv()
MemoryError
Any ideas what these could be? My research leads me to believe this could be because of large HTML files being sent through the Pipe, but I was unable to confirm it.
Also, memory usage shouldn't be an issue: I have 64G of RAM and 64G of swap.
I am working on a test-case reduction tool (https://github.com/marxin/cvise) where I create ProcessPools and commonly cancel process futures. I noticed that decreasing SLEEP_UNIT from 0.1 to 0.01 can speed up my workload:
From:
...
00:00:52 INFO (98.7%, 2934 bytes)
00:00:52 INFO (98.7%, 2917 bytes)
00:00:53 INFO (98.7%, 2899 bytes)
00:00:53 INFO (98.7%, 2870 bytes)
00:00:53 INFO (98.7%, 2833 bytes)
00:00:53 INFO (98.8%, 2744 bytes)
00:00:53 INFO ===< LinesPass::1 >===
To:
...
00:00:45 INFO (98.7%, 2870 bytes)
00:00:45 INFO (98.7%, 2833 bytes)
00:00:45 INFO (98.8%, 2744 bytes)
00:00:45 INFO ===< LinesPass::1 >===
Would it be possible to decrease the unit, or to add a parameter that a library consumer can control?
Thanks for working on the library!
Hey @noxdafox - I noticed that the remote traceback change I made wasn't included in 4.3.5 but is on master, would it be possible to create a release with that change included?
With release 4.2.1 on Mac OSX: there is an error running your example on http://pythonhosted.org/Pebble/#examples
At the very beginning, on the line
from concurrent.futures import Timeouterror
this throws ImportError: cannot import name Timeouterror
Thank you in advance.
I'm trying to run a function in a process pool with timing out after 5 seconds (in an async environment).
with pebble.ProcessPool(1) as pool:
    future = pool.schedule(func, args=[foo], timeout=5)
    async_future = loop.run_in_executor(None, future.result, 5)
    result = await asyncio.wait_for(async_future, 5)
On Windows this works like a charm. But on Linux (I have tried it with Debian and Ubuntu) the main process (the whole application) gets killed, regardless of whether the function takes more or less than 5 seconds. There is no output or error message when the process exits. Is this a bug, or am I missing something?
Pebble
Version: 4.5.3
Ubuntu 18.04.2 LTS
Debian GNU/Linux 9.12
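One thing that might be worth trying (a hedged sketch, not a diagnosis of the crash): pebble futures derive from concurrent.futures.Future, so asyncio.wrap_future can await them directly, avoiding the extra executor thread that blocks in future.result. Demonstrated here with a plain Future standing in for pool.schedule(...):

```python
import asyncio
from concurrent.futures import Future

async def await_future(future, timeout):
    # Bridge a concurrent.futures future into the running event loop
    # and bound the wait, instead of blocking a thread on future.result.
    return await asyncio.wait_for(asyncio.wrap_future(future), timeout)

# Self-contained demo: an already-resolved Future in place of the pool's.
fut = Future()
fut.set_result('done')
result = asyncio.run(await_future(fut, 5))
```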
I'm using the pebble process pool to run Python code which I don't fully trust / control. This code parses HTML documents, and for some specific HTML documents it segfaults, enters an endless loop, or consumes a lot of memory.
Pebble allows me to protect my main process from segfaults, and by setting a timeout to the parsing process I'm also protected against endless loops.
What is missing for me now is the case where the parser consumes a lot of memory. My ideal scenario would be one where I could tell pebble to kill the process if its memory usage exceeds X number of MB.
There seem to be ways of doing this in regular processes (not related with pebble):
But before implementing this into my code I was wondering if @noxdafox had experience doing something like this, ideas, tips, etc. and if you would be interested in me implementing this idea in pebble instead of my code / pebble subclass.
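For reference, one way to sketch this without touching pebble internals (Unix-only, and assuming ProcessPool's initializer/initargs parameters; `limit_memory` is a made-up helper): cap each worker's address space with resource.setrlimit in the pool initializer, so a runaway parser gets a MemoryError inside the worker instead of exhausting the machine:

```python
import resource

def limit_memory(max_bytes):
    # Cap this worker's address space (Unix-only).  Allocations beyond
    # the cap raise MemoryError inside the worker; the main process,
    # as with segfaults and timeouts, stays unaffected.
    resource.setrlimit(resource.RLIMIT_AS, (max_bytes, max_bytes))

# Hypothetical pebble usage:
#   with ProcessPool(initializer=limit_memory,
#                    initargs=[512 * 1024 * 1024]) as pool:
#       future = pool.schedule(parse_html, args=[doc], timeout=30)
```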
Hi,
I'm currently running Pebble 4.4.0 and to get the updated pebble I typed "pip install pebble --upgrade" into a cmd.exe with admin privileges. I got an error, which I have written below.
C:\Windows>pip install pebble --upgrade
Collecting pebble
Using cached https://files.pythonhosted.org/packages/5e/a3/16f98c868854a6916d68bb6ea02edcf7c91021ad41892862f1307c6831df/Pebble-4.4.1.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 39, in <module>
version="{}".format(package_version()),
File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 14, in package_version
version = read_version(version_path)
File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 22, in read_version
return subprocess.check_output(('git', 'describe')).rstrip().decode()
File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 316, in check_output
**kwargs).stdout
File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 383, in run
with Popen(*popenargs, **kwargs) as process:
File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 676, in __init__
restore_signals, start_new_session)
File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 955, in _execute_child
startupinfo)
FileNotFoundError: [WinError 2] The system cannot find the file specified
----------------------------------------`
I see Thonny in the stack near the bottom; it is the IDE I use for my Python work, but it has had no issues with any other modules before.
Thanks for reading this; any suggestions for help will be appreciated.
OS: Windows 10 X64
Python Version: 3.5.4 from Anaconda
Example:
from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired

def function(n):
    return n

with ProcessPool() as pool:
    future = pool.map(function, range(100), timeout=10)
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
        except ProcessExpired as error:
            print("%s. Exit code: %d" % (error, error.exitcode))
        except Exception as error:
            print("function raised %s" % error)
            print(error.traceback)  # Python's traceback of remote process
Error: AttributeError: Can't get attribute 'function' on <module '__main__' (built-in)>
Stack Trace for one of the processes:
Process Process-3:
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 252, in _bootstrap
self.run()
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 383, in worker_process
for task in worker_get_next_task(channel, params.max_tasks):
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 252, in _bootstrap
self.run()
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 398, in worker_get_next_task
yield fetch_task(channel)
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 404, in fetch_task
return task_transaction(channel)
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 383, in worker_process
for task in worker_get_next_task(channel, params.max_tasks):
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 413, in task_transaction
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 398, in worker_get_next_task
yield fetch_task(channel)
task = channel.recv()
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 404, in fetch_task
return task_transaction(channel)
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\channel.py", line 90, in recv
return self.reader.recv()
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 413, in task_transaction
task = channel.recv()
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\channel.py", line 90, in recv
return self.reader.recv()
AttributeError: Can't get attribute 'function' on <module '__main__' (built-in)>
File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
AttributeError: Can't get attribute 'function' on <module '__main__' (built-in)>
What my end goal of using pebble is (not necessarily from the example above):
Map a list of arguments to function calls using a multiprocessing pool, with a global timeout. If time runs out, the returns from the function calls (though incomplete) are still valuable and should be saved and further processed in the main process.
Thanks for your help and this great package.
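For what it's worth, this AttributeError is the classic Windows spawn issue: workers re-import __main__, so the mapped function must live at module top level and the pool must be created under an `if __name__ == '__main__':` guard; running the snippet from an interactive session or an IDE cell that fakes __main__ reproduces exactly this error. A minimal guarded sketch (shown with the stdlib ProcessPoolExecutor so it stays self-contained; the same shape applies to pebble's ProcessPool):

```python
from concurrent.futures import ProcessPoolExecutor

def function(n):
    # Defined at module top level so spawned workers can re-import it.
    return n

def total():
    with ProcessPoolExecutor(max_workers=2) as pool:
        return sum(pool.map(function, range(100)))

if __name__ == '__main__':
    # The guard is mandatory with the spawn start method (Windows):
    # each worker re-imports this module, and an unguarded pool would
    # recurse into creating pools in every child.
    print(total())
```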
Based on the example in the README, let's say we have this:
from pebble import ProcessPool
from concurrent.futures import TimeoutError

def execute_task(task):
    # work on task
    ...
    task.status = 'done'
    task.save()

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for task in get_tasks():
        future = pool.schedule(execute_task, args=[task], timeout=3)
        future.add_done_callback(task_done)
So in case of a timeout, for example, we may want to set task.status = 'failed', but we don't have any reference to task from the task_done() function. One workaround I can think of is a closure:
def task_done(future, task):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
        task.status = 'failed'
        task.save()
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for task in get_tasks():
        def task_done_wrapper(future):
            task_done(future, task)
        future = pool.schedule(execute_task, args=[task], timeout=3)
        future.add_done_callback(task_done_wrapper)
But is this a viable solution?
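It mostly works, but the wrapper has a subtle late-binding hazard: every task_done_wrapper closes over the same loop variable `task`, so a callback that fires after the loop has moved on sees the wrong task. Binding the value at schedule time with functools.partial avoids this; here is a self-contained sketch with plain Futures standing in for pool.schedule(...):

```python
from concurrent.futures import Future
from functools import partial

captured = []

def task_done(future, task):
    captured.append((task, future.result()))

for task in ["a", "b", "c"]:
    future = Future()  # stands in for pool.schedule(execute_task, ...)
    # partial binds this iteration's task; a closure over the loop
    # variable would see whatever `task` holds when the callback fires.
    future.add_done_callback(partial(task_done, task=task))
    future.set_result(task.upper())
```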
Hello,
Thanks for the module. I am trying to use a Pipe connection object to send data between two processes, but it seems that when using Process, I am not able to pass the connection Pipe as an arg.
So how can I do this using this module?
Regards
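Raw Pipe connection objects generally cannot be pickled as task arguments; a proxy from multiprocessing.Manager() can. A hedged sketch (`worker` and `run` are made-up names; with pebble the equivalent call would be `pool.schedule(worker, args=[queue, value])`):

```python
import multiprocessing

def worker(queue, value):
    # The Manager-backed queue proxy pickles cleanly, unlike a raw Pipe
    # connection, so it can be passed to a child process or pool task.
    queue.put(value * 2)

def run(value):
    # Spawn a child that writes into a manager queue, then read the answer.
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()
        process = multiprocessing.Process(target=worker, args=(queue, value))
        process.start()
        process.join()
        return queue.get()

if __name__ == '__main__':
    print(run(21))
```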
I'm using Pebble 4.4.1 with Python 3.7 on macOS Mojave.
A project of mine hangs when using ProcessPool.join().
I tried the test case in the repo, test_process_pool_close_stopped, and it hangs as well.
I run a pool.map on 300 million entries. Running locally it works; on the Linux cluster it fails with this exception:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 385, in worker_process
for task in worker_get_next_task(channel, params.max_tasks):
File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 400, in worker_get_next_task
yield fetch_task(channel)
File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 404, in fetch_task
while channel.poll():
File "/usr/local/lib/python3.6/dist-packages/pebble/pool/channel.py", line 46, in unix_poll
return bool(select.select([self.reader], [], [], timeout)[0])
ValueError: filedescriptor out of range in select()
Is it possible to switch out the select.select() in channel.py with select.poll()?
Thanks for any help in pointing out a fix for this error.
Hello again.
Another issue with the example (4.2.1 and Mac OSX 10.12.5).
I cannot get through the example on https://pypi.python.org/pypi/Pebble; see the code below:
def function(foo, bar=0):
    return foo + bar

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for i in range(0, 10):
        future = pool.schedule(function, args=[i], timeout=3)
        future.add_done_callback(task_done)
It gives at the end of execution:
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 155, in message_manager_loop
pool_manager.process_next_message(SLEEP_UNIT)
File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 184, in process_next_message
message = self.worker_manager.receive(timeout)
File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 304, in receive
if self.pool_channel.poll(timeout):
File "/usr/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 44, in unix_poll
return bool(select([self.reader], [], [], timeout)[0])
error: (9, 'Bad file descriptor')
Thank you again in advance for your help! :-)
Hi,
While working with your great library I have faced this problem: one of my processes that was scheduled using the pool raised this exception:
Daemonic processes are not allowed to have children
Do you know if there is a fix for this?
It took me more than an hour to realize what's wrong:
#!/usr/bin/env python3

import time

from pebble import ProcessPool

class Tester:
    def __init__(self):
        self.futures = []

    def run_one(self, v):
        time.sleep(1)

    def run(self):
        while True:
            print('new ProcessPool')
            with ProcessPool(max_workers=16) as pool:
                f = pool.schedule(self.run_one, [1])
                self.futures.append(f)
                f.result()

Tester().run()
$ ./sample.py
new ProcessPool
Traceback (most recent call last):
File "/tmp/pe.py", line 20, in <module>
Tester().run()
File "/tmp/pe.py", line 18, in run
f.result()
File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 388, in __get_result
raise self._exception
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 204, in schedule
self.worker_manager.dispatch(task)
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 335, in dispatch
raise error
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 333, in dispatch
self.pool_channel.send(WorkerTask(task.id, task.payload))
File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 66, in send
return self.writer.send(obj)
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib64/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
The problem is that pickle is called for my Tester, which contains a reference to a pebble.ProcessFuture.
First, I would like to really thank you for the library. I need to create a process pool with the capability to immediately terminate running tasks; I can't understand why the official concurrent.futures lacks this ability.
However, I see various exceptions when I do pool.stop():
https://github.com/marxin/creduce/blob/threadpool/creduce/utils/testing.py#L317-L319
00:00:01 INFO ===< ClangBinarySearchPass::replace-function-def-with-decl >===
Exception in thread Thread-18:
Traceback (most recent call last):
File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
pool_manager.process_next_message(SLEEP_UNIT)
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
message = self.worker_manager.receive(timeout)
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
return self.pool_channel.recv()
File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
return self.reader.recv()
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
return self._recv(size)
File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
...
Traceback (most recent call last):
File "/tmp/bin/creduce/usr/local/bin/creduce", line 204, in <module>
reducer.reduce(pass_group, skip_initial=args.skip_initial_passes)
File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 118, in reduce
self._run_additional_passes(pass_group["first"])
File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 145, in _run_additional_passes
self.test_manager.run_pass(p)
File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 384, in run_pass
parallel_tests = self.run_parallel_tests()
File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 319, in run_parallel_tests
pool.join()
File "/usr/lib/python3.8/site-packages/pebble/pool/base_pool.py", line 77, in join
self._stop_pool()
File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 75, in _stop_pool
loop.join()
File "/usr/lib64/python3.8/threading.py", line 1011, in join
self._wait_for_tstate_lock()
File "/usr/lib64/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Can you please help me figure out what I might be doing wrong?
My code:
def task_done(iterator):
    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
        except ProcessExpired as error:
            print("%s. Exit code: %d" % (error, error.exitcode))
        except Exception as error:
            print("function raised %s" % error)
            traceback.print_exc()  # Python's traceback of remote process

with ProcessPool(max_workers=PROCESS_POOL_SIZE, max_tasks=10) as pool:
    future = pool.map(crawl_func, summoners, timeout=0)
    iterator = future.result()
    task_done(iterator)
Errors:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\python27\Lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "c:\python27\Lib\pickle.py", line 1384, in load
return Unpickler(file).load()
File "c:\python27\Lib\pickle.py", line 864, in load
dispatch[key](self)
File "c:\python27\Lib\pickle.py", line 886, in load_eof
raise EOFError
EOFError
Python 2.7 on Win10. Please help me.
Package Versions:
Pebble version: v4.4.1
Environment Versions:
Docker Image: python:3.6-alpine
Gitlab Runner Version: gitlab-runner 11.10.1
Pebble installed on this docker image without issue with 4.4.0.
Collecting pebble==4.4.1
Downloading https://files.pythonhosted.org/packages/5e/a3/16f98c868854a6916d68bb6ea02edcf7c91021ad41892862f1307c6831df/Pebble-4.4.1.tar.gz
ERROR: Command errored out with exit status 1:
command: /usr/local/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-xx5yp0kw/pebble/setup.py'"'"'; __file__='"'"'/tmp/pip-install-xx5yp0kw/pebble/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-install-xx5yp0kw/pebble/pip-egg-info
cwd: /tmp/pip-install-xx5yp0kw/pebble/
Complete output (17 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 39, in <module>
version="{}".format(package_version()),
File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 14, in package_version
version = read_version(version_path)
File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 22, in read_version
return subprocess.check_output(('git', 'describe')).rstrip().decode()
File "/usr/local/lib/python3.6/subprocess.py", line 356, in check_output
**kwargs).stdout
File "/usr/local/lib/python3.6/subprocess.py", line 423, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/local/lib/python3.6/subprocess.py", line 729, in __init__
restore_signals, start_new_session)
File "/usr/local/lib/python3.6/subprocess.py", line 1364, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'git': 'git'
----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
Steps to reproduce (some may be extraneous):
docker pull python:3.6-alpine && docker run -it python:3.6-alpine sh
pip install pebble==4.4.1
The sdist tarball on PyPI doesn't contain the LICENSE file.