
loky's Introduction


Reusable Process Pool Executor


Goal

The aim of this project is to provide a robust, cross-platform and cross-version implementation of the ProcessPoolExecutor class of concurrent.futures. It notably features:

  • Consistent and robust spawn behavior: All processes are started using fork + exec on POSIX systems. This ensures safer interactions with third-party libraries. By contrast, multiprocessing.Pool uses fork without exec by default, which can cause some third-party runtimes to crash (e.g. OpenMP, macOS Accelerate...).

  • Reusable executor: a strategy to avoid re-spawning a complete executor every time. A singleton executor instance can be reused (and dynamically resized if necessary) across consecutive calls to limit spawning and shutdown overhead. The worker processes can be shut down automatically after a configurable idle timeout to free system resources.

  • Transparent cloudpickle integration: interactively defined functions and lambda expressions can be called in parallel. It is also possible to register a custom pickler implementation to handle inter-process communications.

  • No need for if __name__ == "__main__": in scripts: thanks to the use of cloudpickle to call functions defined in the __main__ module, it is not required to protect the code calling parallel functions under Windows.

  • Deadlock-free implementation: one of the major concerns with the standard multiprocessing and concurrent.futures modules is the ability of the Pool/Executor to handle crashes of worker processes. This library intends to fix those possible deadlocks and send back meaningful errors. Note that the implementation of concurrent.futures.ProcessPoolExecutor that comes with Python 3.7+ is as robust as the executor from loky, but the latter also works for older versions of Python.
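To make the reusable-executor idea concrete, here is a minimal sketch of a singleton executor cache. It is not loky's implementation: get_singleton_executor is a hypothetical name, it uses a ThreadPoolExecutor as a stand-in for a process pool, and it replaces the executor when the requested size changes, whereas loky can resize its existing executor and also handles idle timeouts and broken pools.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level cache holding the shared executor.
_executor = None
_executor_size = None
_lock = threading.Lock()


def get_singleton_executor(max_workers=4):
    """Return a shared executor, creating a new one only when needed.

    Simplified stand-in for the reusable-executor idea: the same object is
    returned while max_workers is unchanged; a different size replaces it.
    """
    global _executor, _executor_size
    with _lock:
        if _executor is None or _executor_size != max_workers:
            if _executor is not None:
                # Let queued work finish before discarding the old executor.
                _executor.shutdown(wait=True)
            _executor = ThreadPoolExecutor(max_workers=max_workers)
            _executor_size = max_workers
        return _executor
```

Calling get_singleton_executor(2) twice returns the same object, so no new workers are started. Calling get_singleton_executor(4) afterwards waits for the old pool to drain, shuts it down and starts a new one.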

Installation

The recommended way to install loky is with pip:

pip install loky

loky can also be installed from source using:

git clone https://github.com/joblib/loky
cd loky
python setup.py install

Note that loky has an optional dependency on psutil, which allows early detection of memory leaks.
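To check whether the optional psutil dependency is available in the current environment without importing it, a small helper like the one below can be used. has_optional_dependency is a hypothetical name for illustration and is not part of loky.

```python
import importlib.util


def has_optional_dependency(name):
    """Return True if the top-level module `name` can be found on sys.path."""
    # find_spec returns None for a missing top-level module instead of raising.
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    if has_optional_dependency("psutil"):
        print("psutil is installed")
    else:
        print("psutil is missing; install it with: pip install psutil")
```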

Usage

The basic usage of loky relies on the get_reusable_executor function, which internally manages a custom ProcessPoolExecutor object that is reused or re-spawned depending on the context.

import os
from time import sleep
from loky import get_reusable_executor


def say_hello(k):
    pid = os.getpid()
    print(f"Hello from {pid} with arg {k}")
    sleep(.01)
    return pid


# Create an executor with 4 worker processes, that will
# automatically shutdown after idling for 2s
executor = get_reusable_executor(max_workers=4, timeout=2)

res = executor.submit(say_hello, 1)
print("Got results:", res.result())

results = executor.map(say_hello, range(50))
n_workers = len(set(results))
print("Number of used processes:", n_workers)
assert n_workers == 4

For more advanced usage, see our documentation.

Workflow to contribute

To contribute to loky, first create an account on GitHub. Once this is done, fork the loky repository to have your own repository, and clone it using 'git clone' on the computers where you want to work. Make your changes in your clone, push them to your GitHub account, test them on several computers, and when you are happy with them, send a pull request to the main repository.

Running the test suite

To run the test suite, you need the pytest (version >= 3) and psutil modules. From the root of the project, run the test suite using:

    pip install -e .
    pytest .

Why was the project named loky?

While developing loky, we had some bad experiences trying to debug deadlocks when using multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, especially when calling functions with non-picklable arguments or return values at the beginning of the project. When we had to choose a name, we had dealt with so many deadlocks that we wanted some kind of invocation to repel them! Hence loky: a mix of a god, locks and the y that makes it somehow cooler and nicer (and also less likely to result in name conflicts in Google results ^^).

Fixes to avoid those deadlocks in concurrent.futures were also contributed upstream in Python 3.7+, as a less mystical way to repel the deadlocks :D

Acknowledgement

This work is supported by the Center for Data Science, funded by the IDEX Paris-Saclay, ANR-11-IDEX-0003-02

loky's People

Contributors

aabadie, albertcthomas, basnijholt, chkoar, cj-wright, glemaitre, haim0n, jeremiedbb, lukasz-migas, massich, mgorny, moinnadeem, ogrisel, pierreglaser, pmav99, rth, schlerp, tommoral, uniontech-lilinjie


loky's Issues

Crash of workers (SIGKILL)

A known bug of the Python pool is that when a worker crashes, the pool might end up in a broken state (see the corresponding Python bug report).

This case is critical for the reusable pool, as we don't want a single crash to break several parts of the code.
This could be handled by:

  • Raising an error for all cached jobs when a worker crashes
  • Improving the _help_stuff_finished function to handle broken states and empty the inqueue, which might be deadlocked.
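The first option, failing every pending job instead of leaving it waiting forever, can be sketched with plain concurrent.futures.Future objects. WorkerCrashedError and fail_pending_jobs are hypothetical names used for illustration; the standard library's ProcessPoolExecutor raises concurrent.futures.process.BrokenProcessPool in a similar situation.

```python
from concurrent.futures import Future


class WorkerCrashedError(RuntimeError):
    """Hypothetical error type set on jobs lost in a worker crash."""


def fail_pending_jobs(pending, exitcode):
    """Mark every unfinished future as failed instead of leaving it hanging.

    `pending` maps a job id to its Future; it is emptied in place so that no
    caller can wait forever on a job that will never complete.
    """
    error = WorkerCrashedError(
        f"A worker process crashed (exit code {exitcode}); "
        "the pool is broken and pending jobs were aborted."
    )
    while pending:
        _, future = pending.popitem()
        if not future.done():  # keep results that already arrived
            future.set_exception(error)


if __name__ == "__main__":
    jobs = {i: Future() for i in range(3)}
    jobs[0].set_result("already finished")
    futures = list(jobs.values())
    fail_pending_jobs(jobs, exitcode=-9)
    for f in futures:
        print(f.exception())
```

Anyone blocked in result() on an aborted job then gets a WorkerCrashedError with a meaningful message rather than hanging.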

Random failure in test_shutdown_with_sys_exit_at_pickle

tests/test_process_executor_loky.py::TestsProcessPoolLokyShutdown::test_shutdown_with_sys_exit_at_pickle <- tests\_test_process_executor.py 
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Captured stderr ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[DEBUG:MainProcess:MainThread] shutting down executor <loky.process_executor.ProcessPoolExecutor object at 0x043D0690>
[DEBUG/LokyProcess-33] Using default backend pickle for pickling.
[DEBUG/LokyProcess-33] recreated blocker with handle 20
[DEBUG/LokyProcess-33] recreated blocker with handle 24
[DEBUG/LokyProcess-33] Queue._after_fork()
[DEBUG/LokyProcess-33] recreated blocker with handle 28
[DEBUG/LokyProcess-33] recreated blocker with handle 32
[INFO/LokyProcess-33] child process calling self.run()
[DEBUG/LokyProcess-33] worker started with timeout=None
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[INFO/LokyProcess-32] shutting down worker on sentinel
[INFO/LokyProcess-32] process shutting down
[DEBUG/LokyProcess-32] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-32] running the remaining "atexit" finalizers
[INFO/LokyProcess-32] process exiting with exitcode 0
[INFO/LokyProcess-35] shutting down worker on sentinel
[INFO/LokyProcess-35] process shutting down
[DEBUG/LokyProcess-35] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-35] running the remaining "atexit" finalizers
[INFO/LokyProcess-35] process exiting with exitcode 0
[INFO/LokyProcess-33] shutting down worker on sentinel
[INFO/LokyProcess-33] process shutting down
[DEBUG/LokyProcess-33] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-33] running the remaining "atexit" finalizers
[INFO/LokyProcess-33] process exiting with exitcode 0
[DEBUG:MainProcess:ThreadManager] shutting down
[DEBUG/LokyProcess-34] Using default backend pickle for pickling.
[DEBUG/LokyProcess-34] recreated blocker with handle 20
[DEBUG/LokyProcess-34] recreated blocker with handle 24
[DEBUG/LokyProcess-34] Queue._after_fork()
[DEBUG/LokyProcess-34] recreated blocker with handle 28
[DEBUG/LokyProcess-34] recreated blocker with handle 32
[INFO/LokyProcess-34] child process calling self.run()
[DEBUG/LokyProcess-34] worker started with timeout=None
[INFO/LokyProcess-34] shutting down worker on sentinel
[INFO/LokyProcess-34] process shutting down
[DEBUG/LokyProcess-34] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-34] running the remaining "atexit" finalizers
[INFO/LokyProcess-34] process exiting with exitcode 0
[DEBUG/LokyProcess-36] Using default backend pickle for pickling.
[DEBUG/LokyProcess-36] recreated blocker with handle 20
[DEBUG/LokyProcess-36] recreated blocker with handle 24
[DEBUG/LokyProcess-36] Queue._after_fork()
[DEBUG/LokyProcess-36] recreated blocker with handle 28
[DEBUG/LokyProcess-36] recreated blocker with handle 32
[INFO/LokyProcess-36] child process calling self.run()
[DEBUG/LokyProcess-36] worker started with timeout=None
[INFO/LokyProcess-36] shutting down worker on sentinel
[INFO/LokyProcess-36] process shutting down
[DEBUG/LokyProcess-36] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-36] running the remaining "atexit" finalizers
[INFO/LokyProcess-36] process exiting with exitcode 0
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: {}
[DEBUG:MainProcess:MainThread] Queue.join_thread()
[DEBUG:MainProcess:MainThread] using context <loky.backend.context.LokyContext object at 0x03B0E670>
[DEBUG:MainProcess:MainThread] created semlock with handle 1244
[DEBUG:MainProcess:MainThread] ProcessPoolExecutor is setup
[DEBUG:MainProcess:MainThread] created semlock with handle 1576
[DEBUG:MainProcess:MainThread] created semlock with handle 1196
[DEBUG:MainProcess:MainThread] Queue._after_fork()
[DEBUG:MainProcess:MainThread] created semlock with handle 1352
[DEBUG:MainProcess:MainThread] Adjust process count : {1344: <LokyProcess(LokyProcess-37, started)>, 2056: <LokyProcess(LokyProcess-38, started)>, 1112: <LokyProcess(LokyProcess-39, started)>, 2172: <LokyProcess(LokyProcess-40, started)>}
[DEBUG:MainProcess:MainThread] _start_queue_management_thread called
[DEBUG:MainProcess:QueueManager] Queue._start_thread()
[DEBUG:MainProcess:QueueManager] doing self._thread.start()
[DEBUG:MainProcess:MainThread] _start_thread_management_thread called
[DEBUG:MainProcess:MainThread] shutting down executor <loky.process_executor.ProcessPoolExecutor object at 0x043D0370>
[DEBUG:MainProcess:QueueFeederThread] starting thread to feed data to pipe
[DEBUG/LokyProcess-39] Using default backend pickle for pickling.
[DEBUG:MainProcess:QueueManager] ... done self._thread.start()
[DEBUG:MainProcess:ThreadManager] shutting down
[DEBUG/LokyProcess-39] recreated blocker with handle 20
[DEBUG/LokyProcess-39] recreated blocker with handle 24
[DEBUG/LokyProcess-39] Queue._after_fork()
[DEBUG/LokyProcess-39] recreated blocker with handle 28
[DEBUG/LokyProcess-39] recreated blocker with handle 32
[INFO/LokyProcess-39] child process calling self.run()
[DEBUG/LokyProcess-39] worker started with timeout=None
[DEBUG/LokyProcess-38] Using default backend pickle for pickling.
[DEBUG/LokyProcess-38] recreated blocker with handle 20
[DEBUG/LokyProcess-38] recreated blocker with handle 24
[DEBUG/LokyProcess-38] Queue._after_fork()
[DEBUG/LokyProcess-38] recreated blocker with handle 28
[DEBUG/LokyProcess-38] recreated blocker with handle 32
[INFO/LokyProcess-38] child process calling self.run()
[DEBUG/LokyProcess-38] worker started with timeout=None
[DEBUG/LokyProcess-40] Using default backend pickle for pickling.
[DEBUG/LokyProcess-40] recreated blocker with handle 20
[DEBUG/LokyProcess-40] recreated blocker with handle 24
[DEBUG/LokyProcess-40] Queue._after_fork()
[DEBUG/LokyProcess-40] recreated blocker with handle 28
[DEBUG/LokyProcess-40] recreated blocker with handle 32
[INFO/LokyProcess-40] child process calling self.run()
[DEBUG/LokyProcess-40] worker started with timeout=None
[DEBUG/LokyProcess-37] Using default backend pickle for pickling.
[DEBUG/LokyProcess-37] recreated blocker with handle 32
[DEBUG/LokyProcess-37] recreated blocker with handle 36
[DEBUG/LokyProcess-37] Queue._after_fork()
[DEBUG/LokyProcess-37] recreated blocker with handle 48
[DEBUG/LokyProcess-37] recreated blocker with handle 52
[INFO/LokyProcess-37] child process calling self.run()
[DEBUG/LokyProcess-37] worker started with timeout=None
~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of QueueManager (2928) ~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "c:\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:\projects\loky\loky\process_executor.py", line 510, in _queue_management_worker
    timeout=_poll_timeout)
  File "c:\python36\Lib\multiprocessing\connection.py", line 859, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "c:\python36\Lib\multiprocessing\connection.py", line 791, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-26 (1468) ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "c:\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "c:\python36\Lib\multiprocessing\resource_sharer.py", line 142, in _serve
    with self._listener.accept() as conn:
  File "c:\python36\Lib\multiprocessing\connection.py", line 453, in accept
    c = self._listener.accept()
  File "c:\python36\Lib\multiprocessing\connection.py", line 663, in accept
    [ov.event], False, INFINITE)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of MainThread (2068) ~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\python36\Lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\projects\loky\.tox\py36\Scripts\py.test.EXE\__main__.py", line 9, in <module>
    sys.exit(main())
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\config.py", line 58, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 134, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 105, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 141, in _main
    config.hook.pytest_runtestloop(session=session)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 164, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 60, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 73, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 127, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 145, in call_runtest_hook
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 157, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 145, in <lambda>
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 98, in pytest_runtest_call
    item.runtest()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\python.py", line 1593, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\python.py", line 142, in pytest_pyfunc_call
    testfunction(**testargs)
  File "C:\projects\loky\tests\_test_process_executor.py", line 106, in test_shutdown_with_sys_exit_at_pickle
    e.submit(id, ExitAtPickle())
  File "C:\projects\loky\loky\_base.py", line 609, in __exit__
    self.shutdown(wait=True)
  File "C:\projects\loky\loky\process_executor.py", line 987, in shutdown
    self._queue_management_thread.join()
  File "c:\python36\Lib\threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "c:\python36\Lib\threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
ERROR: InvocationError: 'C:\\projects\\loky\\.tox\\py36\\Scripts\\py.test.EXE -vlx --timeout=30'
___________________________________ summary ___________________________________
ERROR:   py36: commands failed
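The test submits an object that exits the interpreter while it is being pickled. Below is a minimal sketch of such an object. ExitAtPickle here is a hypothetical reconstruction, and the actual class in tests/_test_process_executor.py may differ. The sketch also shows why this case is tricky. If pickling happens in a background thread, as in the feeder thread of multiprocessing.Queue, the SystemExit only ends that thread, so the main thread receives no error. Anything waiting for the task can then hang. This is consistent with the MainThread stack above, which is blocked in shutdown(wait=True) on _queue_management_thread.join(), but it is not a confirmed diagnosis.

```python
import pickle
import sys
import threading


class ExitAtPickle:
    """Hypothetical object whose serialization calls sys.exit()."""

    def __reduce__(self):
        # pickle calls __reduce__ to serialize the object; exiting here
        # simulates a fatal error inside the code that pickles tasks.
        sys.exit(1)


def try_pickle(obj):
    """Pickle `obj` in the current thread; return the exit code if it exits."""
    try:
        pickle.dumps(obj)
    except SystemExit as exc:
        return exc.code
    return None


def pickle_in_thread(obj):
    """Pickle `obj` in a background thread; return True if it succeeded.

    A SystemExit raised in a thread silently ends that thread, so the caller
    only sees that no "ok" marker was recorded.
    """
    outcome = []

    def target():
        pickle.dumps(obj)
        outcome.append("ok")

    t = threading.Thread(target=target)
    t.start()
    t.join()
    return bool(outcome)
```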

Exit -11 (segfault) in check_subprocess in test_interactively_define_executor_no_main

This job failed with a segfault on macOS with Python 2.7.

=================================== FAILURES ===================================
__________________ test_interactively_define_executor_no_main __________________
    def test_interactively_define_executor_no_main():
        # check that the init_main_module parameter works properly
        # when using -c option, we don't need the safeguard if __name__ ..
        # and thus test LokyProcess without the extra argument. For running
        # a script, it is necessary to use init_main_module=False.
        code = """if True:
            from loky import get_reusable_executor
            e = get_reusable_executor()
            e.submit(id, 42).result()
            print("ok")
        """
        cmd = [sys.executable]
        try:
            fid, filename = mkstemp(suffix="_joblib.py")
            os.close(fid)
            with open(filename, mode='wb') as f:
                f.write(code.encode('ascii'))
            cmd += [filename]
>           check_subprocess_call(cmd, stdout_regex=r'ok', timeout=10)
cmd        = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
code       = 'if True:\n        from loky import get_reusable_executor\n        e = get_reusable_executor()\n        e.submit(id, 42).result()\n        print("ok")\n    '
f          = <closed file '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py', mode 'wb' at 0x10ff52030>
fid        = 61
filename   = '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py'
tests/test_reusable_executor.py:551: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
cmd = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
timeout = 10, stdout_regex = 'ok', stderr_regex = None
    def check_subprocess_call(cmd, timeout=1, stdout_regex=None,
                              stderr_regex=None):
        """Runs a command in a subprocess with timeout in seconds.
    
        Also checks returncode is zero, stdout if stdout_regex is set, and
        stderr if stderr_regex is set.
        """
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    
        def kill_process():
            warnings.warn("Timeout running {}".format(cmd))
            proc.kill()
    
        timer = threading.Timer(timeout, kill_process)
        try:
            timer.start()
            stdout, stderr = proc.communicate()
    
            if sys.version_info[0] >= 3:
                stdout, stderr = stdout.decode(), stderr.decode()
            if proc.returncode == -9:
                message = (
                    'Subprocess timeout after {}s.\nStdout:\n{}\n'
                    'Stderr:\n{}').format(timeout, stdout, stderr)
                raise TimeoutError(message)
            elif proc.returncode != 0:
                message = (
                    'Non-zero return code: {}.\nStdout:\n{}\n'
                    'Stderr:\n{}').format(
                        proc.returncode, stdout, stderr)
>               raise ValueError(message)
E               ValueError: Non-zero return code: -11.
E               Stdout:
E               
E               Stderr:
E               loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
E                 "increase its maximal value", RuntimeWarning)
E               loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
E                 "increase its maximal value", RuntimeWarning)
cmd        = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
kill_process = <function kill_process at 0x10ff61f50>
message    = 'Non-zero return code: -11.\nStdout:\n\nStderr:\nloky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on ...maphore are broken on OSX, release might increase its maximal value\n  "increase its maximal value", RuntimeWarning)\n'
proc       = <subprocess.Popen object at 0x10fea1b90>
stderr     = 'loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value\n ...maphore are broken on OSX, release might increase its maximal value\n  "increase its maximal value", RuntimeWarning)\n'
stderr_regex = None
stdout     = ''
stdout_regex = 'ok'
timeout    = 10
timer      = <_Timer(Thread-19, stopped 123145319129088)>
tests/utils.py:111: ValueError
----------------------------- Captured stderr call -----------------------------
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 51 and name "/loky-2418-rdWIkd"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 52 and name "/loky-2418-8UDY1l"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 53 and name "/loky-2418-uUnsqk"
[DEBUG:LokyProcess-413:MainThread] Queue._after_fork()
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 56 and name "/loky-2418-ntIpgw"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 57 and name "/loky-2418-yZLFxo"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 48 and name "/loky-2418-O9Bx1q"
[INFO:LokyProcess-413:MainThread] child process calling self.run()
[DEBUG:LokyProcess-413:MainThread] worker started with timeout=10
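The helper above kills slow subprocesses with a threading.Timer and reports the raw return code. On POSIX, a negative return code from subprocess means the child was killed by a signal, so -11 corresponds to SIGSEGV. Below is a minimal alternative sketch. It assumes Python 3.7+, relies on the built-in timeout of subprocess.run, and translates the return code into a signal name. run_checked and describe_returncode are hypothetical names, not part of the loky test suite.

```python
import re
import signal
import subprocess
import sys


def describe_returncode(returncode):
    """Translate a subprocess return code into a readable description."""
    if returncode >= 0:
        return f"exit status {returncode}"
    try:
        # subprocess reports death by signal N as -N (POSIX).
        return f"killed by {signal.Signals(-returncode).name}"
    except ValueError:
        return f"killed by unknown signal {-returncode}"


def run_checked(cmd, timeout=10, stdout_regex=None):
    """Run `cmd`, enforce `timeout`, and raise ValueError if it fails.

    subprocess.run kills the child and raises TimeoutExpired on timeout,
    so no separate timer thread is needed.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if proc.returncode != 0:
        raise ValueError(
            f"Command failed ({describe_returncode(proc.returncode)}).\n"
            f"Stdout:\n{proc.stdout}\nStderr:\n{proc.stderr}"
        )
    if stdout_regex is not None and not re.search(stdout_regex, proc.stdout):
        raise ValueError(f"{stdout_regex!r} not found in stdout:\n{proc.stdout}")
    return proc.stdout


if __name__ == "__main__":
    print(run_checked([sys.executable, "-c", "print('ok')"], stdout_regex="ok"))
```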

Running tests; contributing instructions

I was wondering what the expected way to run the tests is. I cannot find any documentation about it.

After installing loky with pip install -e ., running

pytest .

results in 3 test failures in tests/test_loky_backend.py (on Linux, Python 3.6), for instance

______________________________________________ TestLokyBackend.test_interactively_define_process_no_main[True] ______________________________________________

self = <tests.test_loky_backend.TestLokyBackend object at 0x7efc6f0aa2e8>, run_file = True
[...]
E               ValueError: Non-zero return code: 1.
E               Stdout:
E               
E               Stderr:
E               Traceback (most recent call last):
E                 File "/tmp/tmp5c1ijwmn_joblib.py", line 1, in <module>
E                   from loky.backend.process import LokyProcess
E               ImportError: No module named 'loky'

tests/utils.py:111: ValueError

The remaining 212 tests pass.

I can see that Travis CI uses tox, which does call pytest, but shouldn't this also work without tox?

Test openMP compatibility

Add a test with Cython and OpenMP to ensure the compatibility of the loky backend with OpenMP and to highlight the crash caused by fork.

Detect the number of usable CPU

Multiprocessing with n_jobs given by multiprocessing.cpu_count() is not optimal on systems where not all CPUs can be used (in particular Docker containers, Travis CI, etc.).

Following the discussion with @ogrisel, and as suggested in the multiprocessing docs, using len(os.sched_getaffinity(0)) might be better in the context of loky.
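Here is a minimal sketch of the suggestion. os.sched_getaffinity is only available on some platforms (e.g. Linux), so the sketch falls back to os.cpu_count() elsewhere. usable_cpu_count is a hypothetical name. Note that CPU affinity does not reflect CPU quotas, such as those set with Docker's --cpus option, so this sketch alone does not cover every container setup.

```python
import os


def usable_cpu_count():
    """Return the number of CPUs this process is allowed to run on."""
    if hasattr(os, "sched_getaffinity"):
        # Set of CPUs the current process (pid 0 = self) may be scheduled on.
        return len(os.sched_getaffinity(0))
    # os.cpu_count() may return None when the count cannot be determined.
    return os.cpu_count() or 1


if __name__ == "__main__":
    print("usable CPUs:", usable_cpu_count(), "/ total:", os.cpu_count())
```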

Add a benchmark script

To compare against multiprocessing.Pool and the original ProcessPoolExecutor under various versions of Python and on all platforms.

If the benchmark is short enough, it could be run after the test suite on Travis and AppVeyor to get an easy way to compare the outcome on all the supported platforms.

Benchmarking:

  • setup speed for Process
  • setup speed for Executor
  • speed of variable transfer between processes (e.g. large strings)
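As a starting point for the last item, the sketch below times a pickle round trip of a large string with timeit. It only measures the serialization part of transferring a variable and excludes pipe and process overhead. pickle_roundtrip_seconds is a hypothetical helper, not an existing benchmark in the repository.

```python
import pickle
import timeit


def pickle_roundtrip_seconds(n_chars, repeat=5, number=10):
    """Best time in seconds for one pickle.dumps + pickle.loads of a string.

    Measures serialization only, one component of the cost of sending
    arguments to workers.
    """
    payload = "x" * n_chars

    def roundtrip():
        pickle.loads(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))

    # Take the best of several runs to reduce noise, then average per call.
    return min(timeit.repeat(roundtrip, repeat=repeat, number=number)) / number


if __name__ == "__main__":
    for size in (10**3, 10**6, 10**7):
        print(f"{size:>10} chars: {pickle_roundtrip_seconds(size) * 1e3:.3f} ms")
```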

Callback errors are not caught

The callback mechanism only calls the function in the result_handler thread.
It does not catch errors raised by the callback.

The current pool detects the crash of the result_handler and thus flags all the remaining jobs with AbortedWorkerError with a message about the result_handler crashing.

This doesn't cause any deadlock but it can be unclear for the user.

  • Shall we create a different Error for the crashes in result_handler?
  • Shall we catch the error and do a better recovery process? This is what is done in 622d7f7 by overriding the result types.
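The second option can be sketched as a wrapper that the result-handler thread would call instead of the raw callback, so that an exception is reported rather than killing the thread. safe_callback is a hypothetical helper. For comparison, concurrent.futures.Future.add_done_callback already behaves this way: exceptions raised by callbacks are logged and ignored.

```python
import logging

logger = logging.getLogger(__name__)


def safe_callback(callback, on_error=None):
    """Wrap `callback` so that an exception inside it cannot escape.

    Errors are passed to `on_error` if given, otherwise logged with their
    traceback; the wrapper then returns None.
    """
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception as exc:
            if on_error is not None:
                on_error(exc)
            else:
                logger.exception("Callback %r raised an error", callback)
            return None
    return wrapper
```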

Test semaphore and file descriptor management

Add unit tests to ensure that all file descriptors and semaphores are opened/closed in the right process in the loky backend.
This involves:

  • Making sure fds are properly closed when a process is spawned with the loky backend.
  • Making sure semaphores are all closed once the locks are destroyed with the loky backend (python2.7/3.3)
  • Adding informative names for semaphores (python2.7/3.3)
  • Making sure the argument pipes are still open in child processes:
    this was already tested in test_process as we use an mp.Queue to send and receive objects
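A test could compare descriptor counts before and after a process's lifetime. A minimal sketch for Linux and macOS, assuming a hypothetical open_fd_count helper; Windows handles would need a different approach.

```python
import os


def open_fd_count():
    """Count the file descriptors currently open in this process.

    Linux exposes them under /proc/self/fd and macOS under /dev/fd.
    """
    for fd_dir in ("/proc/self/fd", "/dev/fd"):
        if os.path.isdir(fd_dir):
            # listdir() briefly holds one extra descriptor for the directory
            # itself; it is counted on every call, so differences stay exact.
            return len(os.listdir(fd_dir))
    raise NotImplementedError("cannot list open file descriptors here")
```

A test would call open_fd_count() before starting a loky process and again after join() (and Process.close(), where available), then assert that the two counts match.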

Add loky.__version__?

I was surprised that loky does not have a __version__ attribute. I would expect this from most packages:

import loky
print(loky.__version__)

AttributeError: module 'loky' has no attribute '__version__'
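Until the attribute exists, client code can fall back on the installed distribution metadata. A small sketch; package_version is a hypothetical helper, and importlib.metadata requires Python 3.8+.

```python
import importlib.metadata


def package_version(module, dist_name):
    """Return module.__version__ if defined, else the installed distribution's
    version from its metadata, else None."""
    version = getattr(module, "__version__", None)
    if version is not None:
        return version
    try:
        return importlib.metadata.version(dist_name)
    except importlib.metadata.PackageNotFoundError:
        return None
```

For example, package_version(loky, "loky") would return the metadata version as long as the package was installed with pip.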

Large overhead for very short tasks

In multiprocessing.Pool, there is one thread sending bytes to the workers and another thread receiving bytes from the workers. This is not the case with ProcessPoolExecutor, where a single thread handles both types of communication.

Therefore, ProcessPoolExecutor has a larger overhead when submitting a very large number of very short tasks. This could be solved by adding a dedicated thread for feeding the workers.
Here is a piece of code to reproduce the gap:

import argparse
import multiprocessing as mp
from itertools import repeat
from concurrent.futures import ProcessPoolExecutor

from loky.reusable_executor import get_reusable_executor

N_ITER = 50000


def long_executor(get_executor, chunksize=1, **kwargs):
    # Push N_ITER trivial tasks through an executor and consume the results.
    with get_executor(**kwargs) as executor:
        for _ in executor.map(id, repeat(0, N_ITER),
                              chunksize=chunksize):
            pass


def long_pool(get_pool, chunksize=1, **kwargs):
    # Same workload through a multiprocessing.Pool.
    pool = get_pool(**kwargs)
    for _ in pool.map(id, repeat(0, N_ITER), chunksize=chunksize):
        pass
    pool.terminate()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Program to launch the experiment')
    parser.add_argument('--run', type=str, default=None,
                        choices=['loky', 'pool', 'ccr'],
                        help='run loky, multiprocessing.Pool (pool) or '
                             'concurrent.futures (ccr)')
    parser.add_argument('--chunksize', type=int, default=1,
                        help='choose chunksize')

    args = parser.parse_args()
    max_workers = 8
    # The multiprocessing.Pool baseline uses the spawn start method.
    context_spawn = mp.get_context('spawn')
    if args.run == "loky":
        long_executor(get_reusable_executor, max_workers=max_workers,
                      chunksize=args.chunksize)
    elif args.run == "pool":
        long_pool(context_spawn.Pool, processes=max_workers,
                  chunksize=args.chunksize)
    elif args.run == "ccr":
        long_executor(ProcessPoolExecutor, max_workers=max_workers,
                      chunksize=args.chunksize)
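The dedicated feeder thread idea can be modelled with plain threads and queues. This is a toy sketch, not loky's design: worker threads stand in for worker processes, and run_with_feeder is a hypothetical name. The point is that sending and receiving happen on different threads, so blocking on the bounded call queue does not hold up the loop that collects results.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker (and then the collector) to stop


def run_with_feeder(tasks, func, n_workers=4):
    """Toy model: a feeder thread only sends, the caller only receives.

    Exceptions raised by `func` are returned in place of the result.
    Iterating over `tasks` is assumed not to raise.
    """
    call_queue = queue.Queue(maxsize=2 * n_workers)  # bounded, like a pipe
    result_queue = queue.Queue()

    def feeder():
        for index, task in enumerate(tasks):
            call_queue.put((index, task))  # blocks while the workers are busy
        for _ in range(n_workers):
            call_queue.put(_STOP)

    def worker():
        while True:
            item = call_queue.get()
            if item is _STOP:
                result_queue.put(_STOP)  # sent after all of this worker's results
                return
            index, task = item
            try:
                value = func(task)
            except Exception as exc:
                value = exc
            result_queue.put((index, value))

    threads = [threading.Thread(target=feeder)]
    threads += [threading.Thread(target=worker) for _ in range(n_workers)]
    for thread in threads:
        thread.start()

    # Receiving side: runs concurrently with the feeder thread.
    results, stopped = {}, 0
    while stopped < n_workers:
        item = result_queue.get()
        if item is _STOP:
            stopped += 1
        else:
            index, value = item
            results[index] = value

    for thread in threads:
        thread.join()
    return [results[index] for index in range(len(results))]
```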

Random "IOError: [Errno 9] Bad file descriptor" in CreatePipe under Windows

______________ TestsProcessPoolLokyExecutor.test_worker_timeout _______________
self = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x02DCE3C8>
    @pytest.mark.timeout(50 if sys.platform == "win32" else 25)
    def test_worker_timeout(self):
        self.executor.shutdown(wait=True)
        self.check_no_running_workers(patience=5)
        timeout = getattr(self, 'min_worker_timeout', .01)
        try:
            self.executor = self.executor_type(
                max_workers=4, context=self.context, timeout=timeout)
        except NotImplementedError as e:
            self.skipTest(str(e))
    
        for i in range(5):
            # Trigger worker spawn for lazy executor implementations
>           for result in self.executor.map(id, range(8)):
i          = 4
result     = 46628128
self       = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x02DCE3C8>
timeout    = 0.01
tests\_test_process_executor.py:626: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loky\process_executor.py:942: in map
    timeout=timeout)
loky\_base.py:576: in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
loky\process_executor.py:909: in submit
    self._ensure_executor_running()
loky\process_executor.py:887: in _ensure_executor_running
    self._adjust_process_count()
loky\process_executor.py:878: in _adjust_process_count
    p.start()
loky\backend\process.py:49: in start
    self._popen = self._Popen(self)
loky\backend\process.py:37: in _Popen
    return Popen(process_obj)
loky\backend\popen_loky_win32.py:43: in __init__
    rhandle, wfd = _winapi.CreatePipe(None, 0)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (None, 0), rfd = -1, wfd = 8
_current_process = <_subprocess_handle object at 0x02DB0C80>
    @staticmethod
    def CreatePipe(*args):
        rfd, wfd = os.pipe()
        _current_process = win_api.GetCurrentProcess()
        rhandle = win_api.DuplicateHandle(
>           _current_process, msvcrt.get_osfhandle(rfd),
            _current_process, 0, True,
            win_api.DUPLICATE_SAME_ACCESS)
E       IOError: [Errno 9] Bad file descriptor
_current_process = <_subprocess_handle object at 0x02DB0C80>
args       = (None, 0)
rfd        = -1
wfd        = 8

Random failure in test_worker_timeout ([DEBUG:MainProcess:QueueManager] The executor is broken as at least one process terminated abruptly)

First observed on Windows with 64-bit Python 2.7 in #87:

[DEBUG/LokyProcess-210] recreated blocker with handle 16
[DEBUG/LokyProcess-210] recreated blocker with handle 20
[DEBUG/LokyProcess-210] Queue._after_fork()
[DEBUG/LokyProcess-210] recreated blocker with handle 36
[DEBUG/LokyProcess-210] recreated blocker with handle 40
[INFO/LokyProcess-210] child process calling self.run()
[DEBUG/LokyProcess-210] worker started with timeout=0.01
[INFO/LokyProcess-210] shutting down worker after timeout 0.010s
[INFO/LokyProcess-210] process shutting down
[DEBUG/LokyProcess-210] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-210] running the remaining "atexit" finalizers
[INFO/LokyProcess-210] process exiting with exitcode 0
[DEBUG:MainProcess:MainThread] Adjust process count : {1360: <LokyProcess(LokyProcess-213, started)>, 2580: <LokyProcess(LokyProcess-212, started)>, 2224: <LokyProcess(LokyProcess-210, stopped)>, 2172: <LokyProcess(LokyProcess-211, started)>}
[DEBUG:MainProcess:QueueManager] The executor is broken as at least one process terminated abruptly
-------------------------- Captured stderr teardown ---------------------------
[DEBUG/LokyProcess-211] Using default backend pickle for pickling.
[DEBUG/LokyProcess-211] recreated blocker with handle 16
[DEBUG/LokyProcess-211] recreated blocker with handle 20
[DEBUG/LokyProcess-211] Queue._after_fork()
[DEBUG/LokyProcess-211] recreated blocker with handle 44
[DEBUG/LokyProcess-211] recreated blocker with handle 52
[INFO/LokyProcess-211] child process calling self.run()
[DEBUG/LokyProcess-211] worker started with timeout=0.01
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-213
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-212
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-210
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-211
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: {}
================================== FAILURES ===================================
______________ TestsProcessPoolLokyExecutor.test_worker_timeout _______________
self = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x03569C38>
    @pytest.mark.timeout(50 if sys.platform == "win32" else 25)
    def test_worker_timeout(self):
        self.executor.shutdown(wait=True)
        self.check_no_running_workers(patience=5)
        timeout = getattr(self, 'min_worker_timeout', .01)
        try:
            self.executor = self.executor_type(
                max_workers=4, context=self.context, timeout=timeout)
        except NotImplementedError as e:
            self.skipTest(str(e))
    
        for i in range(5):
            # Trigger worker spawn for lazy executor implementations
>           for result in self.executor.map(id, range(8)):
i          = 2
result     = 44988960
self       = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x03569C38>
timeout    = 0.01
tests\_test_process_executor.py:638: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loky\_base.py:584: in result_iterator
    yield future.result()
loky\_base.py:431: in result
    return self.__get_result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future at 0x3573370 state=finished raised BrokenExecutor>
    def __get_result(self):
        if self._exception:
>           raise self._exception
E           BrokenExecutor: A process in the process pool was terminated abruptly while the future was running or pending.
self       = <Future at 0x3573370 state=finished raised BrokenExecutor>
loky\_base.py:382: BrokenExecutor

It seems we have a race condition in the QueueManager: it might detect process 211 as started but dead before it actually starts (the start-up logs of 211 show up in the teardown phase of the test, after the queue manager has decided that one process was dead).

#88 has been fixed in the meantime, but this should be a different, Python 3-only problem.

Race condition on OSX with fork and very short worker timeout

The test test_worker_timeout fails on OSX with Python 3.5 and fork when the timeout is too low (typically .01s).
There seem to be race conditions that prevent the worker from starting properly, as no log is emitted for the failed workers, and this makes the test fail.

loky.backend.semaphore_tracker.sem_unlink does not have the same signature when it comes from ctypes or _multiprocessing

  • _multiprocessing.sem_unlink takes str
  • loky.backend.semlock.sem_unlink comes from ctypes and takes bytes.

It seems that some code was written with the ctypes variant in mind and raises an error when _multiprocessing.sem_unlink is called. The tests seem to only exercise loky.backend.semlock.sem_unlink.

Context

This is an error I just saw in a joblib Travis build. Note this is with loky version 1.2.1.

E               /home/travis/build/joblib/joblib/joblib/externals/loky/backend/semaphore_tracker.py:195: UserWarning: semaphore_tracker: There appear to be 6 leaked semaphores to clean up at shutdown
E                 len(cache))
E               /home/travis/build/joblib/joblib/joblib/externals/loky/backend/semaphore_tracker.py:211: UserWarning: semaphore_tracker: b'/loky-5456-6haleho6': TypeError('argument 1 must be str, not bytes',)
E                 warnings.warn('semaphore_tracker: %r: %r' % (name, e)) 

From a quick look, this still seems to be the case in master. The code where the warning happens is here:
https://github.com/tomMoral/loky/blob/dec1c8144b12938dfe7bfc511009e12f25fd1cd9/loky/backend/semaphore_tracker.py#L203-L211
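One possible fix is to normalise the name to whichever type the selected sem_unlink expects before calling it. A sketch with hypothetical helper names (normalize_sem_name, unlink_semaphore); os.fsencode and os.fsdecode accept either str or bytes and leave the value unchanged when it already has the requested type.

```python
import os


def normalize_sem_name(name, want_bytes):
    """Return the semaphore `name` as bytes (ctypes variant) or str
    (_multiprocessing variant), whatever type it was given as."""
    return os.fsencode(name) if want_bytes else os.fsdecode(name)


def unlink_semaphore(sem_unlink, name, want_bytes):
    """Hypothetical wrapper: call `sem_unlink` with the name type it expects."""
    return sem_unlink(normalize_sem_name(name, want_bytes))
```

The tracker would pass want_bytes=True when it uses the ctypes-based loky.backend.semlock.sem_unlink and want_bytes=False with _multiprocessing.sem_unlink.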

Fix test openMP on appveyor

The Cython compilation does not use OpenMP to compile the parallel loop, as OpenMP is not installed on AppVeyor. This is out of the scope of #33 but should be fixed at some point.

Random Deadlock

Random deadlock in tests/test_rpool.py::test_crashes.
Use $ ./launch_tests.sh to run the test repeatedly until it crashes.

Thread 0x00007f0020ecb700 (most recent call first):
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 379 in _recv
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 407 in _recv_bytes
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 250 in recv
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 429 in _handle_results
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f001bfff700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/usr/lib/python3.5/queue.py", line 164 in get
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 376 in _handle_tasks
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f001affd700 (most recent call first):
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 367 in _handle_workers
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f00280f4700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/usr/lib/python3.5/threading.py", line 549 in wait
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 599 in wait
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 602 in get
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 253 in apply
  File "/home/tom/Work/prog/github/RusePool/tests/test_rpool.py", line 168 in test_crashes
  File "/usr/lib/python3.5/site-packages/_pytest/python.py", line 286 in pytest_pyfunc_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/python.py", line 1406 in runtest
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 90 in pytest_runtest_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 264 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 247 in _wrapped_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 595 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 137 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 149 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 137 in call_runtest_hook
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 119 in call_and_report
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 75 in runtestprotocol
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 65 in pytest_runtest_protocol
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 264 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 247 in _wrapped_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 595 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 146 in pytest_runtestloop
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 121 in _main
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 90 in wrap_session
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 115 in pytest_cmdline_main
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/config.py", line 48 in main
  File "/usr/bin/py.test", line 9 in <module>

Improve error message in case of failure in test_wait_result

Observed when running tox -e py27 on Linux:

=================================================================================== FAILURES ===================================================================================
________________________________________________________________________ TestCondition.test_wait_result ________________________________________________________________________

self = <tests.test_synchronize.TestCondition instance at 0x7f754456ea70>

    @pytest.mark.skipif(sys.platform == "win32" and
                        sys.version_info[:2] < (3, 3),
                        reason="Condition.wait always returned None before 3.3"
                        " and we do not overload win32 Condition")
    def test_wait_result(self):
        if sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None
    
        c = loky_context.Condition()
        with c:
            assert not c.wait(0)
            assert not c.wait(0.1)
    
            p = loky_context.Process(target=self._test_wait_result,
                                     args=(c, pid))
            p.start()
    
            assert c.wait(10)
            if pid is not None:
                with pytest.raises(KeyboardInterrupt):
                    c.wait(10)
>           p.join()

c          = <Condition(<RLock(None, 0)>, 0)>
p          = <LokyProcess(LokyProcess-423, stopped)>
pid        = 20335
self       = <tests.test_synchronize.TestCondition instance at 0x7f754456ea70>

tests/test_synchronize.py:348: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
loky/backend/synchronize.py:249: in __exit__
    return self._lock.__exit__(*args)
loky/backend/synchronize.py:108: in __exit__
    return self._semlock.release()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <loky.backend.semlock.SemLock object at 0x7f75490656d0>

    def release(self):
        if self.kind == RECURSIVE_MUTEX:
            assert self._is_mine(), (
>               "attempt to release recursive lock not owned by thread")
E           AssertionError: attempt to release recursive lock not owned by thread

self       = <loky.backend.semlock.SemLock object at 0x7f75490656d0>

loky/backend/semlock.py:193: AssertionError
----------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780742144 and name "/loky-20335-L08GeF"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780738048 and name "/loky-20335-CO5Aui"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780733952 and name "/loky-20335-bGApZz"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780729856 and name "/loky-20335-0LIgU2"
[DEBUG:MainProcess:MainThread] launch python with cmd:
['/home/ogrisel/code/loky/.tox/py27/bin/python2.7', '-m', 'loky.backend.popen_loky_posix', '--name-process', 'LokyProcess-423', '--pipe', '19', '--semaphore', '12']
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780742144 and name "/loky-20335-L08GeF"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780738048 and name "/loky-20335-CO5Aui"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780733952 and name "/loky-20335-bGApZz"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780729856 and name "/loky-20335-0LIgU2"
[INFO:LokyProcess-423:MainThread] child process calling self.run()
[INFO:LokyProcess-423:MainThread] process shutting down
[DEBUG:LokyProcess-423:MainThread] running all "atexit" finalizers with priority >= 0
[DEBUG:LokyProcess-423:MainThread] running the remaining "atexit" finalizers
[INFO:LokyProcess-423:MainThread] process exiting with exitcode 0
[INFO:LokyProcess-423:Dummy-1] process shutting down
[DEBUG:LokyProcess-423:Dummy-1] running all "atexit" finalizers with priority >= 0
[DEBUG:LokyProcess-423:Dummy-1] running the remaining "atexit" finalizers
========================================================= 1 failed, 132 passed, 5 skipped, 1 xpassed in 67.89 seconds ==========================================================
ERROR: InvocationError: '/home/ogrisel/code/loky/.tox/py27/bin/py.test -lv --maxfail=2 --timeout=10'

Unstable TestLokyBackend::test_process on travis

https://travis-ci.org/tomMoral/loky/jobs/230024584

Unfortunately, there is no informative output in the Travis log:

tests/test_loky_backend.py::TestLokyBackend::test_process ERROR: InvocationError: '/home/travis/build/tomMoral/loky/.tox/py33/bin/py.test -vl --timeout=15 --maxfail=5'

___________________________________ summary ____________________________________
ERROR:   py33: commands failed

It could be that Travis simply killed the main Python process.

Note that this test passed when Travis previously ran on the PR, so I suspect that this is a random failure.

test_interpreter_shutdown randomly fails with executor._flags.broken

https://travis-ci.org/tomMoral/loky/jobs/251070720#L414

_ ERROR at teardown of TestsProcessPoolSpawnShutdown.test_interpreter_shutdown _
self = <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>
method = <bound method TestsProcessPoolSpawnShutdown.test_interpreter_shutdown of <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>>

    def teardown_method(self, method):
        # Make sure is not broken if it should not be
        executor = getattr(self, 'executor', None)
        if executor is not None:
            assert hasattr(method, 'broken_pool') != (
>               not self.executor._flags.broken)
E           AssertionError

executor   = <loky.process_executor.ProcessPoolExecutor object at 0x7ff9691544a8>
method     = <bound method TestsProcessPoolSpawnShutdown.test_interpreter_shutdown of <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>>
self       = <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>

[Question] Consequence of broken semaphore on OSX?

Hello,
this is not an issue, but I'm simply trying to understand the consequences of this warning that appears on OSX (due to the missing sem_getvalue function):

loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value

import time
from loky import ProcessPoolExecutor

def f(x):
    time.sleep(6)
    return x*x


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        print("Started")
        results = executor.map(f, [1, 2, 3])
        print(list(results))
        print("Ended")

What is the worst-case outcome of this warning?

I also noticed that I could change process_executor.py to use a queue_size of 1 to avoid the warning:

        # Finally setup the queues for interprocess communication
        self._setup_queues(job_reducers, result_reducers, 1) #force a queue_size of 1

But I'm not sure what that would mean in terms of performance.

Love the work by the way, keep it up! ❤️
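As far as I understand, the risk is that the semaphore bound is no longer enforced: multiprocessing.Queue(maxsize) relies on a BoundedSemaphore, so a missing bound check could let a queue accept more items than its nominal size instead of raising. The in-process threading analogue below only illustrates what the bound normally guarantees; it does not reproduce the OSX behaviour itself, and over_release_allowed is a hypothetical helper.

```python
import threading


def over_release_allowed(sem):
    """Return True if `sem` accepted one release() more than it was acquired."""
    try:
        sem.release()
    except ValueError:
        # A BoundedSemaphore refuses to go above its initial value.
        return False
    return True
```

With a correct bound, an over-release fails loudly; without it, the extra release silently raises the maximum, which matches the wording of the warning.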

Unsafe list comprehension in QueueManager thread under Python 3

As seen in a randomly failing cron test, seemingly causing a failure in test_worker_timeout.

[DEBUG/ForkServerProcess-221] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkServerProcess-221] running the remaining "atexit" finalizers
[INFO/ForkServerProcess-221] process exiting with exitcode 0
[DEBUG:MainProcess:ThreadManager] All workers timed out. Adjusting process count.
[DEBUG:MainProcess:MainThread] Adjust process count : {2939: <ForkServerProcess(ForkServerProcess-224, started)>, 2941: <ForkServerProcess(ForkServerProcess-226, started)>, 2942: <ForkServerProcess(ForkServerProcess-227, started)>, 2943: <ForkServerProcess(ForkServerProcess-228, started)>}
Exception in thread QueueManager:
Traceback (most recent call last):
  File "/opt/python/3.4.6/lib/python3.4/threading.py", line 911, in _bootstrap_inner
    self.run()
  File "/opt/python/3.4.6/lib/python3.4/threading.py", line 859, in run
    self._target(*self._args, **self._kwargs)
  File "/home/travis/build/tomMoral/loky/loky/process_executor.py", line 496, in _queue_management_worker
    worker_sentinels = [p.sentinel for p in processes.values()]
  File "/home/travis/build/tomMoral/loky/loky/process_executor.py", line 496, in <listcomp>
    worker_sentinels = [p.sentinel for p in processes.values()]
RuntimeError: dictionary changed size during iteration
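One way to avoid this error is to take a snapshot of the mapping under a lock that every mutation also holds. A minimal sketch, assuming a hypothetical ProcessRegistry wrapper around the pid-to-process dict; loky's actual fix may differ.

```python
import threading


class ProcessRegistry:
    """Hypothetical pid -> process mapping shared between threads.

    Every read and write goes through the same lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._processes = {}

    def add(self, pid, process):
        with self._lock:
            self._processes[pid] = process

    def remove(self, pid):
        with self._lock:
            return self._processes.pop(pid, None)

    def sentinels(self):
        # Copy the values while holding the lock, then build the list from
        # the snapshot: a concurrent add/remove can no longer resize the
        # dict in the middle of the iteration.
        with self._lock:
            snapshot = list(self._processes.values())
        return [p.sentinel for p in snapshot]
```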

Support frozen executable on Windows

If using PyInstaller to create an executable, the recommended way for multiprocessing is to use freeze_support:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

If we do not use this, the UI is launched again in each child process (as with fork), but if I do use it on Windows I get an error:

Traceback (most recent call last):
File "gui\app.py", line 590, in
File "multiprocessing\__init__.py", line 145, in freeze_support
File "multiprocessing\forking.py", line 336, in freeze_support
File "multiprocessing\forking.py", line 326, in is_forking
AssertionError

then

BrokenProcessPool: A process in the executor was terminated abruptly while the future was running or pending.

2018-04-27 16:54:24,895.895 [Dummy-1 ] util.py:78 info(): process shutting down

Also, this is the line that raises the assertion:

        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
            assert len(argv) == 3
            return True

This is the argv that is passed (one argument is missing at the end):

['D:\development\app\dist\app\app.exe', '--multiprocessing-fork']

I am wondering whether argv is different when using loky rather than multiprocessing, and what the equivalent or workaround is for loky.

Ensure that loky exceptions inherit from their concurrent.futures counterparts

Exceptions such as CancelledError and TimeoutError should be:

  • exposed in the top-level package namespace (to be part of the public API)
  • derived from their concurrent.futures counterparts, so that try / except blocks written against the concurrent.futures exceptions also catch loky's, making the loky executor a drop-in replacement.
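A minimal sketch of the proposed hierarchy, assuming the classes live in a hypothetical loky-side module. The names deliberately mirror concurrent.futures (TimeoutError shadows the builtin within that module) and are not taken from loky's code.

```python
import concurrent.futures as cf


class CancelledError(cf.CancelledError):
    """Hypothetical loky CancelledError; `except cf.CancelledError` catches it."""


class TimeoutError(cf.TimeoutError):  # shadows the builtin on purpose
    """Hypothetical loky TimeoutError; `except cf.TimeoutError` catches it."""
```

Code written for the standard library, such as `except concurrent.futures.TimeoutError:`, would then keep working unchanged when the executor is swapped for loky's.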

Deadlock on __exit__ with BrokenProcessPool

This code causes a deadlock:

from loky import ProcessPoolExecutor
  

class PickleError():
    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    with ProcessPoolExecutor() as e:
        e.map(id, [PickleError()])

This results from a race condition where the executor starts shutting down before it gets flagged as broken. This is somewhat linked to #71.

DOC Extend documentation with more examples and extended API

Extend the existing doc:

  • Add examples of the fixed deadlocks

    • after worker crash
    • after result unpickle errors
  • Add example of using the reusable executor:

    • show resize capabilities
    • show auto respawn
    • show worker time out
  • Document the behaviors of get_reusable_executor

    • reuse=True only resizes the executor if needed.
    • if the arguments are changed, a new executor is created with reuse!=True
  • Tutorial on the concurrent.futures API and maybe some information on Thread vs Process
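As a starting point for the "after worker crash" example, the sketch below uses the standard library's concurrent.futures.ProcessPoolExecutor (Python 3.7+), which raises BrokenProcessPool instead of hanging. loky's executor is expected to report the same situation, but it is not used here so the sketch has no dependency; crash_is_reported is a hypothetical helper name.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def crash_is_reported(mp_context=None, timeout=30):
    """Kill a worker abruptly and check the executor reports it instead of hanging."""
    with ProcessPoolExecutor(max_workers=2, mp_context=mp_context) as executor:
        # os._exit terminates the worker immediately, without any cleanup;
        # from the parent's point of view this resembles a segfault.
        future = executor.submit(os._exit, 1)
        try:
            future.result(timeout=timeout)
        except BrokenProcessPool:
            return True
    return False
```

With the spawn start method, call it under an `if __name__ == "__main__":` guard so the workers do not re-run the calling code.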

Random failure in travis with `fork` processes

There is a random failure on Travis with forked processes.

Here, one of the forked processes (ForkProcess-208) never started. I think we should drop support for the fork backend, as it is not robust by design. It would speed up the tests and also avoid confusing users about why this library was developed.

Other examples of the same bug: here with ForkProcess-206 in the first failure.

Random deadlock in current master

tests/test_rpool.py::test_deadlock_kill Timeout (0:00:05)!
Thread 0x00007fff7b73f000 (most recent call first):
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 383 in _recv
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 416 in _recv_bytes
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 216 in recv_bytes
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 312 in _empty_queue
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 299 in _help_stuff_finish
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 249 in _terminate_pool
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185 in __call__
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 496 in terminate
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 118 in terminate
  File "/Users/ogrisel/code/Rpool/tests/test_rpool.py", line 265 in test_deadlock_kill

Code cleanup

  • rename pool to executor whenever it makes sense
  • reorganize source folder to emphasize what is backported code and what is specific to loky
  • fix pyflakes errors
  • import public API in top level package namespace

Random deadlock in test_in_callback_submit_with_crash[func15-args15-UnpicklingError] at executor shutdown during test teardown

Observed on Python 2.7 (32-bit) under Windows (on AppVeyor):

tests/test_reusable_executor.py::TestExecutorDeadLock::test_in_callback_submit_with_crash[func15-args15-UnpicklingError] PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_callback_crash_on_submit PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_deadlock_kill PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_crash_races[1] PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_crash_races[2] PASSED
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Captured stderr ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:MainThread] Creating a new executor with max_workers=2 as the previous instance cannot be reused (broken).
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: []
[DEBUG:MainProcess:MainThread] shutting down executor <loky.reusable_executor.ReusablePoolExecutor object at 0x03E377B0>
[DEBUG:MainProcess:ThreadManager] shutting down
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-29 (2760) ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Python27\Lib\threading.py", line 774, in __bootstrap
    self.__bootstrap_inner()
  File "C:\Python27\Lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\reduction.py", line 124, in _serve
    conn = _listener.accept()
  File "C:\Python27\Lib\multiprocessing\connection.py", line 145, in accept
    c = self._listener.accept()
  File "C:\Python27\Lib\multiprocessing\connection.py", line 365, in accept
    win32.ConnectNamedPipe(handle, win32.NULL)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of MainThread (2136) ~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Python27\Lib\runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "C:\Python27\Lib\runpy.py", line 72, in _run_code
    exec code in run_globals
  File "C:\projects\loky\.tox\py27\Scripts\py.test.EXE\__main__.py", line 9, in <module>
    sys.exit(main())
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\config.py", line 58, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 139, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 110, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 146, in _main
    config.hook.pytest_runtestloop(session=session)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 169, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 67, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 83, in runtestprotocol
    nextitem=nextitem))
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 157, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 177, in call_runtest_hook
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 191, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 177, in <lambda>
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 125, in pytest_runtest_teardown
    item.session._setupstate.teardown_exact(item, nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 474, in teardown_exact
    self._teardown_towards(needed_collectors)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 480, in _teardown_towards
    self._pop_and_teardown()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 440, in _pop_and_teardown
    self._teardown_with_finalization(colitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 458, in _teardown_with_finalization
    self._callfinalizers(colitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 448, in _callfinalizers
    fin()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\python.py", line 465, in <lambda>
    return lambda: result(param_obj)
  File "C:\projects\loky\tests\_executor_mixin.py", line 181, in teardown_method
    executor = get_reusable_executor(max_workers=2)
  File "C:\projects\loky\loky\reusable_executor.py", line 106, in get_reusable_executor
    executor.shutdown(wait=True, kill_workers=kill_workers)
  File "C:\projects\loky\loky\process_executor.py", line 978, in shutdown
    self._queue_management_thread.join()
  File "C:\Python27\Lib\threading.py", line 940, in join
    self.__block.wait()
  File "C:\Python27\Lib\threading.py", line 340, in wait
    waiter.acquire()

Leaking semaphores on AWS Lambda

When using loky on AWS Lambda, I get error messages about leaked semaphores:

loky/backend/semaphore_tracker.py:158: UserWarning: semaphore_tracker: There appear to be 6 leaked semaphores to clean up at shutdown
len(cache))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Au1xgY': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Gzh15f': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Tz7Xrd': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-zwUVYi': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-uODqzO': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-TfXo55': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))

This is probably not supported on AWS Lambda, just as multiprocessing fails there. But even if it is not fixable, perhaps loky could detect the incompatible platform as early as get_reusable_executor or executor.map and give a clear error message?
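A minimal sketch of such an early check, assuming that the missing platform support surfaces when a standard `multiprocessing.Lock` is created (it may not catch every failure mode of loky's own backend). Both function names are hypothetical and not part of loky:

```python
import multiprocessing


def semaphores_supported():
    """Return True if a multiprocessing semaphore can be created here."""
    try:
        # Creating a Lock allocates a named semaphore on POSIX systems.
        multiprocessing.Lock()
    except (OSError, ImportError):
        # OSError: e.g. [Errno 38] ENOSYS when sem_open is unavailable.
        # ImportError: raised by multiprocessing.synchronize when the
        # platform lacks a working sem_open implementation.
        return False
    return True


def check_platform_or_raise():
    """Hypothetical early check to run before creating an executor."""
    if not semaphores_supported():
        raise RuntimeError(
            "This platform cannot create the semaphores needed for "
            "process-based parallelism; use threads or run sequentially."
        )
```

Calling `check_platform_or_raise()` at the top of an entry point such as `get_reusable_executor` would turn the leaked-semaphore warnings at shutdown into a single explicit error at startup.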

Random test failure on Travis

=================================== FAILURES ===================================
________________________ TestLokyBackend.test_terminate ________________________

self = <tests.test_loky_backend.TestLokyBackend instance at 0x7fcb4baf59e0>

    def test_terminate(self):

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        assert p.is_alive()
        assert p in self.active_children()
        assert p.exitcode is None

        join = TimingWrapper(p.join)

        assert join(0) is None
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        assert p.is_alive()

        assert join(-1) is None
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        assert p.is_alive()

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                assert join() is None
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            assert join() is None

>       self.assertTimingAlmostEqual(join.elapsed, 0.0)

tests/test_loky_backend.py:148: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

t = 3.901387929916382, g = 0.0

    @staticmethod
    def assertTimingAlmostEqual(t, g):
>       assert round(t-g, 1) == 0
E       assert 3.9 == 0
E        +  where 3.9 = round((3.901387929916382 - 0.0), 1)

tests/test_loky_backend.py:233: AssertionError
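The failing check rounds `join.elapsed - 0.0` to one decimal, so any `join()` slower than roughly 50 ms fails; here the terminated process took about 3.9 s to be reaped. A minimal sketch of the two helpers involved, assuming `TimingWrapper` behaves like the helper of the same name in CPython's multiprocessing test suite (its implementation does not appear in the log); `timing_almost_equal` is a hypothetical stand-alone version of `assertTimingAlmostEqual`:

```python
import time


class TimingWrapper(object):
    """Call a function and record how long the call took, in seconds."""

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwargs):
        start = time.monotonic()
        try:
            return self.func(*args, **kwargs)
        finally:
            # Recorded even if the wrapped call raises.
            self.elapsed = time.monotonic() - start


def timing_almost_equal(t, g):
    """Same check as assertTimingAlmostEqual: rounding to 0.1 s leaves
    roughly +/- 50 ms of tolerance between measured and expected time."""
    return round(t - g, 1) == 0
```

With this tolerance, the 3.9 s measured on the CI worker can never pass, so the failure points at a slow process reap after `terminate()` rather than at a rounding issue.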

Port robust ProcessPoolExecutor to `concurrent.futures`

Identify the robustness improvements and port them to the upstream Python concurrent.futures.ProcessPoolExecutor.
See this repo.


Robustifications

  • Create an issue on the Python tracker. The PR should contain multiple standalone commits to allow easy discussion with the developers, and to permit cherry-picking what makes sense for upstream and discarding unnecessary changes.

    • Add a context concept to concurrent.futures.ProcessPoolExecutor to allow using forkserver and spawn as start methods for the workers. tomMoral/cpython@185c35d
    • Port the wakeup _Sentinel: this avoids deadlocks if a worker dies while holding the lock on result_queue. tomMoral/cpython@51102e7
    • Port thread_manager: this makes it possible to detect the failure of the queue_manager_thread. tomMoral/cpython@2e0f811
    • Port robust shutdown: enhance the shutdown process to make sure the correct flags are used. tomMoral/cpython@47c39f1
    • Add worker timeout/max_tasks?
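Upstream Python 3.7+ exposes this context concept as the `mp_context` argument of `concurrent.futures.ProcessPoolExecutor`. A minimal sketch of selecting a start method through it; the helper name `map_with_start_method` is hypothetical:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def map_with_start_method(method, func, *iterables, max_workers=2):
    """Run func over iterables in worker processes started with `method`
    ("fork", "spawn" or "forkserver", depending on the platform)."""
    ctx = multiprocessing.get_context(method)
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as ex:
        return list(ex.map(func, *iterables))
```

For example, `map_with_start_method("spawn", pow, [1, 2, 3], [2, 2, 2])` should return `[1, 4, 9]`; with the standard executor, the "spawn" and "forkserver" methods still require the calling script to be protected by `if __name__ == "__main__":`.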

Tests

  • Add crash tests to tests/test_concurrent_futures.py to validate the code in each commit

Different error handling across build versions

The error handling in _handle_tasks differs across platforms, versions and builds. For instance, the errors are caught on 3.4.4+ but not on 3.4.2.
This should be fixed, as our use cases require robust behavior.
To fix it, the simplest way is probably to override all the thread handlers:
_handle_tasks, _handle_workers & _handle_results.
For this, we need to rewrite the __init__ function and the worker function.
Does this make sense?
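Whatever mechanism is used to override the handlers, the goal is that a failure in any handler thread is reported the same way on every build. A minimal sketch of one such pattern, using hypothetical helpers `report_failures` and `start_handler_thread` (not multiprocessing or loky APIs); in a real pool the targets would be the `_handle_*` handlers, whose signatures vary across Python versions:

```python
import threading
import traceback


def report_failures(target, on_error):
    """Wrap a handler-thread target so that any exception is passed to
    on_error(exc, formatted_traceback) instead of silently ending the
    thread. The behavior no longer depends on the stdlib version."""
    def wrapper(*args, **kwargs):
        try:
            return target(*args, **kwargs)
        except Exception as exc:
            on_error(exc, traceback.format_exc())
    return wrapper


def start_handler_thread(target, args, on_error, name=None):
    """Start a daemon thread running target(*args), with failures
    reported through on_error."""
    thread = threading.Thread(
        target=report_failures(target, on_error), args=args, name=name)
    thread.daemon = True
    thread.start()
    return thread
```

The `on_error` callback could, for instance, mark the pool as broken so that pending and future calls fail with a meaningful error instead of hanging.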

Random deadlock in current master ()

Current thread 0x00007fff7b73f000 (most recent call first):
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 312 in _empty_queue
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 298 in _help_stuff_finish
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 249 in _terminate_pool
  File "/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/util.py", line 207 in __call__
  File "/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 456 in terminate
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 118 in terminate
...

No other thread is running.

Random test failure on current master

====================================================================== FAILURES ======================================================================
____________________________________________________________ test_kill_after_resize_call _____________________________________________________________

exit_on_deadlock = None

    def test_kill_after_resize_call(exit_on_deadlock):
        """Test recovery if killed after resize call"""
        # Test the pool resizing called before a kill arrive
        pool = get_reusable_pool(processes=2)
        pool.apply_async(kill_friend, (pool._pool[1].pid, .1))
        pool = get_reusable_pool(processes=1)
        assert pool.apply(sleep_identity, ((1, 0.),)) == 1
>       pool.terminate()

tests/test_rpool.py:246:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
backend/reusable_pool.py:118: in terminate
    super(_ReusablePool, self).terminate()
/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py:456: in terminate
    self._terminate()
/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/util.py:207: in __call__
    res = self._callback(*self._args, **self._kwargs)
backend/reusable_pool.py:249: in _terminate_pool
    cls._help_stuff_finish(inqueue, outqueue, task_handler, len(pool))
backend/reusable_pool.py:299: in _help_stuff_finish
    _ReusablePool._empty_queue(outqueue, task_handler)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

queue = <multiprocessing.queues.SimpleQueue object at 0x109e80a10>, task_handler = <Thread(TaskHandler-18, stopped daemon 123145314922496)>

    @staticmethod
    def _empty_queue(queue, task_handler):
        """Empty a communication queue to ensure that maintainer threads will
            not hang forever.
            """
        # We use a timeout to detect queue that was locked by a dead
        # process and therefor will never be unlocked.
        if not queue._rlock.acquire(timeout=.1):
            mp.util.debug("queue is locked when terminating. "
                          "The pool might have crashed.")
        while task_handler.is_alive() and queue._reader.poll():
>           queue._reader.recv_bytes()
E           IOError: [Errno 22] Invalid argument

backend/reusable_pool.py:312: IOError
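The traceback shows that `recv_bytes()` (and `poll()`) can raise `IOError` (`OSError` in Python 3) once the pipe is no longer valid, so the draining loop has to stop on that error, and the read lock should be released whenever it was acquired. A minimal sketch, assuming the same private `SimpleQueue` attributes (`_rlock`, `_reader`) that `_empty_queue` uses, and a hypothetical `keep_going` callable standing in for `task_handler.is_alive`:

```python
import multiprocessing


def drain_queue(queue, keep_going, lock_timeout=0.1):
    """Discard pending messages from a multiprocessing SimpleQueue and
    return how many were discarded."""
    # A dead worker may have died while holding the read lock: use a
    # timeout and carry on without the lock rather than block forever.
    acquired = queue._rlock.acquire(timeout=lock_timeout)
    discarded = 0
    try:
        while keep_going():
            try:
                if not queue._reader.poll():
                    break
                queue._reader.recv_bytes()
            except (OSError, EOFError):
                # Pipe closed or invalidated (e.g. EINVAL): stop draining.
                break
            discarded += 1
    finally:
        if acquired:
            queue._rlock.release()
    return discarded
```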
