Hello,
The chunksize parameter is intended to reduce IPC overhead when dealing with large datasets. It is particularly useful when the dataset consists of lots of small elements (e.g. an array of floats): sending them one by one would become a major bottleneck.
When chunksize is greater than 1, the worker process handles the allotted chunk as a whole: it receives the chunk, processes it entirely, and returns it to the main process, thus minimizing the overhead caused by IPC.
If one of the elements of the chunk causes the worker process to crash, or to hang until the timeout occurs, the whole chunk is lost. The reason for this is simple: crashes and timeouts are handled in the main loop, which cannot tell what went wrong inside the affected worker process.
If you want your logic to pinpoint the offending item in your collection, you can rely on the fact that the returned ProcessMapFuture yields the chunks in submission order. Hence, you can re-submit the offending chunk(s) with a chunksize of 1 and identify the problematic item in your collection.
from pebble.
Thanks a lot for your reply! Given that information, shouldn't something like this work to ensure the order and length of the output are kept the same?
```python
with ProcessPool(max_workers=num_workers) as pool:
    future = pool.map(
        computeMetric, inputList, timeout=5, chunksize=10
    )
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize
```
When I use this in some of my tests, the output length changes. But shouldn't it stay the same if a chunk fails as a whole and I add [np.nan] * chunksize to my final output list? Maybe something in my implementation is wrong?
In my specific case, my computeMetric function comes from the spyrmsd package. However, I will try to put together a simpler, workable example.
Hi,
Here is a minimal working example. I just picked some random numbers that either time out or raise another error:
```python
from pebble import ProcessPool
from concurrent.futures import TimeoutError
import numpy as np
import time

num_workers = 1
timeout = 1
chunksize = 4


def processFunction(inputNumber):
    if inputNumber == 23:
        time.sleep(5)
    elif inputNumber == 42:
        raise ValueError
    elif inputNumber == 86:
        time.sleep(5)
    elif inputNumber == 98:
        raise ValueError
    return inputNumber


timeoutCounter = 0
errorCounter = 0

with ProcessPool(max_workers=num_workers) as pool:
    outputList = []
    future = pool.map(processFunction, range(100), timeout=timeout, chunksize=chunksize)
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize

if timeoutCounter + errorCounter > 0:
    # Calculate total number of np.nan
    failedCompoundsTotal = np.count_nonzero(np.isnan(outputList))
    print(f"{failedCompoundsTotal} compounds failed in total. {timeoutCounter} chunks (up to {timeoutCounter * chunksize} compounds) timed out and were skipped, {errorCounter} chunks raised an error")

print(len(outputList), outputList)
```
If you run this code example, you'll find that too many np.nan are inserted when a non-timeout error happens. So it seems that if a single (non-timeout) error occurs in a chunk, the rest of the chunk can still be processed correctly? A timeout, however, does make the whole chunk fail, regardless of what did or didn't process successfully. It seems that when a regular error occurs, it's better to append only one np.nan instead:
```python
except Exception:
    errorCounter += 1
    outputList += [np.nan]
```
I made these changes in my more specific example, but some unexpected behaviour still occurs (the final output has a different length than the input). I'm still investigating this in more detail. I will update if I find anything; otherwise I will just put a check on the chunksize in the final version of my method.
Normal errors are returned because the worker can intercept them and pass them back. As I mentioned above, timeouts and crashes lead to the loss of the whole batch. By crash I do not mean a Python exception, I mean an actual crash such as a segmentation fault or an OOM kill.
The following example shows what I mean.
We pass 10 elements with a chunksize of 2:
- If the element is equal to 1, we raise an exception. In this case the whole batch is processed successfully and element 1 is replaced with the raised exception.
- If the element is equal to 3, we simulate a timeout. In this case, the whole batch is lost and replaced by a TimeoutError.
- If the element is equal to 7, we simulate a crash similar to a segmentation fault. These errors are rare and typically show up when using faulty C libraries or when running out of memory. The whole batch is lost again and a ProcessExpired error is raised.
```python
import os
import time
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired


def function(value):
    if value == 1:
        raise RuntimeError('BOOM!')
    if value == 3:
        time.sleep(5)
    if value == 7:
        os._exit(1)  # Simulate a crash such as a segfault
    return value


with ProcessPool(max_workers=1) as pool:
    processed = []
    future = pool.map(function, range(10), timeout=1, chunksize=2)
    iterator = future.result()
    while True:
        try:
            result = next(iterator)
            processed.append(result)
        except StopIteration:
            break
        except TimeoutError as error:
            processed.append(error)
        except ProcessExpired as error:
            processed.append(error)
        except Exception as error:
            processed.append(error)

print(processed)
```
Output:
```
[0, RuntimeError('BOOM!'), TimeoutError(), 4, 5, ProcessExpired('Abnormal termination'), 8, 9]
```
Thanks a lot for the clarification, this makes more sense now!