I am using joblib to cache calls in parallel. The same function is called in parallel with different args. The problem is when the code of the function has changed: I think that all processes try to update the file func_code.py
, resulting in an error.
Traceback (most recent call last):
File "run.py", line 264, in <module>
for (subject_series, movement) in zip(series, dataset.movement))
File "/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py", line 561, in __call__
self.retrieve()
File "/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py", line 483, in retrieve
raise exception_type(report)
joblib.my_exceptions.JoblibIOError/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/my_exceptions.py:26: DeprecationWarning: BaseException.message has been deprecated as of Python 2.6
self.message,
: JoblibIOError
___________________________________________________________________________
Multiprocessing exception:
...........................................................................
/mnt/neurospin/sel-poivre/tmp/aa013911/GSPCA_mu1.00_l12.50_a0.04/run.py in <module>()
259 # recomputation when adding a step. Each step is cached independently
260 # in the function.
261 print ('\t\tExtracting subject information')
262 result = Parallel(n_jobs=2)(delayed(runner.process_subject)(
263 subject_series, regions, gm_index, movement, memory=regions_memory)
--> 264 for (subject_series, movement) in zip(series, dataset.movement))
265 regions_series, confounds, covariance, precision = \
266 zip(*result)
267
268 # Explained variance
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py in __call__(self=Parallel(n_jobs=2), iterable=<generator object <genexpr>>)
556 self.n_dispatched = 0
557 try:
558 for function, args, kwargs in iterable:
559 self.dispatch(function, args, kwargs)
560
--> 561 self.retrieve()
self.retrieve = <bound method Parallel.retrieve of Parallel(n_jobs=2)>
562 # Make sure that we get a last message telling us we are done
563 elapsed_time = time.time() - self._start_time
564 self._print('Done %3i out of %3i | elapsed: %s finished',
565 (len(self._output),
---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
IOError Tue Dec 3 07:46:50 2013
PID: 15711 Python 2.7.3: /home/aa013911/epd-7.3-2-rh5-x86_64/bin/python
...........................................................................
/home/aa013911/abraham_miccai2013/msdl/runner.pyc in process_subject(series=MemorizedResult(cachedir="../abide/joblib", func...argument_hash="eab6c74fde4a396a99076425288fa670"), regions=memmap([[-0.45718721, -0.47443968, -0.48146474, ... -0.42739522, -0.41334313]], dtype=float32), gm_index=[False, True, True, False, False, True, True, True, False, False, True, True, True, True, False, False, False, True, True, True, ...], movement='../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', memory=Memory(cachedir='./model3_noop/joblib'))
116
117 confounds = memory.cache(extract_confounds).call_and_shelve(
118 regions_series, regions, gm_index=gm_index)
119
120 covariance, precision = memory.cache(covariance_matrix)(
--> 121 regions_series, confounds=[movement, confounds])
122
123 return regions_series, confounds, covariance, precision
124
125
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in __call__(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), *args=(MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="a5abfb42d4bc86e395de839fa65840c9"),), **kwargs={'confounds': ['../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="e880120cd72429390ae89b5dabba252a")]})
478 return MemorizedResult(self.cachedir, self.func, argument_hash,
479 metadata=metadata, verbose=self._verbose - 1,
480 timestamp=self.timestamp)
481
482 def __call__(self, *args, **kwargs):
--> 483 return self._cached_call(args, kwargs)[0]
484
485 def __reduce__(self):
486 """ We don't store the timestamp when pickling, to avoid the hash
487 depending from it.
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _cached_call(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), args=(MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="a5abfb42d4bc86e395de839fa65840c9"),), kwargs={'confounds': ['../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="e880120cd72429390ae89b5dabba252a")]})
425 # Compare the function code with the previous to see if the
426 # function code has changed
427 output_dir, argument_hash = self._get_output_dir(*args, **kwargs)
428 metadata = None
429 # FIXME: The statements below should be try/excepted
--> 430 if not (self._check_previous_func_code(stacklevel=4) and
t = undefined
431 os.path.exists(output_dir)):
432 if self._verbose > 10:
433 _, name = get_func_name(self.func)
434 self.warn('Computing func %s, argument hash %s in '
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _check_previous_func_code(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), stacklevel=4)
618 # XXX: Should be using warnings, and giving stacklevel
619 if self._verbose > 10:
620 _, func_name = get_func_name(self.func, resolv_alias=False)
621 self.warn("Function %s (stored in %s) has changed." %
622 (func_name, func_dir))
--> 623 self.clear(warn=True)
624 return False
625
626 def clear(self, warn=True):
627 """ Empty the function's cache.
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in clear(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), warn=True)
632 if os.path.exists(func_dir):
633 shutil.rmtree(func_dir, ignore_errors=True)
634 mkdirp(func_dir)
635 func_code, _, first_line = get_func_code(self.func)
636 func_code_file = os.path.join(func_dir, 'func_code.py')
--> 637 self._write_func_code(func_code_file, func_code, first_line)
638
639 def call(self, *args, **kwargs):
640 """ Force the execution of the function with the given arguments and
641 persist the output values.
...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _write_func_code(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), filename='./model3_noop/joblib/msdl/covariance/covariance_matrix/func_code.py', func_code='# first line: 7\ndef covariance_matrix(series, co...np.eye(series.shape[1]), np.eye(series.shape[1])\n', first_line=7)
546
547 def _write_func_code(self, filename, func_code, first_line):
548 """ Write the function code and the filename to a file.
549 """
550 func_code = '%s %i\n%s' % (FIRST_LINE_TEXT, first_line, func_code)
--> 551 with open(filename, 'w') as out:
552 out.write(func_code)
553
554 def _check_previous_func_code(self, stacklevel=2):
555 """
IOError: [Errno 2] No such file or directory: './model3_noop/joblib/msdl/covariance/covariance_matrix/func_code.py'
___________________________________________________________________________