Git Product home page Git Product logo

dask-jobqueue's Introduction

Dask

Build Status Coverage status Documentation Status Discuss Dask-related things and ask for help Version Status NumFOCUS

Dask is a flexible parallel computing library for analytics. See documentation for more information.

LICENSE

New BSD. See License File.

dask-jobqueue's People

Contributors

andersy005 avatar basnijholt avatar bfis avatar costrouc avatar dyoussef avatar ericmjl avatar guillaumeeb avatar jacobtomlinson avatar jakirkham avatar jgerardsimcock avatar jolange avatar josephhardinee avatar jrbourbeau avatar kaelancotter avatar leej3 avatar lesteve avatar lnaden avatar louisabraham avatar lpsinger avatar matyasselmeci avatar mrocklin avatar riedel avatar spencerkclark avatar stuarteberg avatar twoertwein avatar vsoch avatar wgustafson avatar ychiat35 avatar zaccharieramzi avatar zonca avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dask-jobqueue's Issues

Reorganize the dask-jobqueue config file

It is not unheard of to have multiple job queue systems running from the machine. For example, the machine I use the most, Cheyenne, uses both PBS and Slurm. For cases like this, it would be nice if we revamped our config file stricture a bit.

Currently we have:

jobqueue:
  name: dask-worker
  threads: 2
  processes: 4
  memory: 8GB
  interface: null
  death-timeout: 60
  local-directory: null
  extra: ""
  env-extra: []

  queue: null
  project: null
  walltime: '00:30:00'

  pbs:
    resource-spec: null
    job-extra: []

  sge:
    resource-spec: null

  slurm:
    job-cpu: null
    job-mem: null
    job-extra: {}

I'd like to propose that we move some of the jobqueue values into the pbs/sge/slurm catagories:

jobqueue:
  # dask options
  name: dask-worker
  death-timeout: 60

  pbs:
    threads: 2
    processes: 4
    memory: 8GB
    interface: null
    local-directory: null
    extra: ""
    env-extra: []
    queue: null
    project: null
    walltime: '00:30:00'
    resource-spec: null
    job-extra: []

  sge:
    threads: 2
    processes: 4
    memory: 8GB
    interface: null
    local-directory: null
    extra: ""
    env-extra: []
    queue: null
    project: null
    walltime: '00:30:00'
    resource-spec: null

  slurm:
    threads: 2
    processes: 4
    memory: 8GB
    interface: null
    local-directory: null
    extra: ""
    env-extra: []
    queue: null
    project: null
    walltime: '00:30:00'

    job-cpu: null
    job-mem: null
    job-extra: {}

expose Adaptive keyword arguments in adapt method

Our current adapt method does not allow for any keyword arguments. It would be nice if we could pass in arguments like minimum and maximum to the Adaptive constructor with a usage like:

cluster = MyQueueCluster(...)
cluster.adapt(minimum=1, maximum=20)

pip install is not working

I was doing some python 2 tests and tried to pip install dask-jobqueue with the recently released version. That did not work:

$ pip install dask-jobqueue
Collecting dask-jobqueue
  Downloading https://files.pythonhosted.org/packages/01/0b/daa1abb1b3244e788acb94586b0b9a35953f62d6f2b1ccfb041a97a53c99/dask-jobqueue-0.1.0.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-adBjL5/dask-jobqueue/setup.py", line 8, in <module>
        with open('requirements.txt') as f:
    IOError: [Errno 2] No such file or directory: 'requirements.txt'
    
    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-adBjL5/dask-jobqueue/

CI build problem on SLURM test

There seems to be a problem with SLURM CI test, I've seen this in several PRs : https://travis-ci.org/dask/dask-jobqueue/jobs/387815778

A job seems to be fail at submission.

============================= test session starts ==============================
platform linux -- Python 3.6.5, pytest-3.6.0, py-1.5.3, pluggy-0.6.0 -- /opt/anaconda/bin/python
cachedir: .pytest_cache
rootdir: /dask-jobqueue, inifile:
collected 11 items                                                             
dask_jobqueue/tests/test_jobqueue_core.py::test_jq_core_placeholder PASSED [  9%]
dask_jobqueue/tests/test_jobqueue_core.py::test_errors PASSED            [ 18%]
dask_jobqueue/tests/test_pbs.py::test_header PASSED                      [ 27%]
dask_jobqueue/tests/test_pbs.py::test_job_script PASSED                  [ 36%]
dask_jobqueue/tests/test_pbs.py::test_basic SKIPPED                      [ 45%]
dask_jobqueue/tests/test_pbs.py::test_adaptive SKIPPED                   [ 54%]
dask_jobqueue/tests/test_sge.py::test_basic SKIPPED                      [ 63%]
dask_jobqueue/tests/test_slurm.py::test_header PASSED                    [ 72%]
dask_jobqueue/tests/test_slurm.py::test_job_script PASSED                [ 81%]
dask_jobqueue/tests/test_slurm.py::test_basic FAILED                     [ 90%]
dask_jobqueue/tests/test_slurm.py::test_adaptive PASSED                  [100%]
=================================== FAILURES ===================================
__________________________________ test_basic __________________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f934bd42390>
    @pytest.mark.env("slurm")  # noqa: F811
    def test_basic(loop):
        with SLURMCluster(walltime='00:02:00', threads=2, processes=1, memory='4GB',
                          job_extra=['-D /'], loop=loop) as cluster:
            with Client(cluster) as client:
                workers = cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 10)
                assert future.result(60) == 11
                assert cluster.jobs
    
                info = client.scheduler_info()
                w = list(info['workers'].values())[0]
                assert w['memory_limit'] == 4e9
                assert w['ncores'] == 2
    
                cluster.stop_workers(workers)
    
                start = time()
                while len(client.scheduler_info()['workers']) > 0:
                    sleep(0.100)
>                   assert time() < start + 10
E                   assert 1528123387.6426318 < (1528123377.581219 + 10)
E                    +  where 1528123387.6426318 = time()
dask_jobqueue/tests/test_slurm.py:104: AssertionError
------------------------------ Captured log call -------------------------------
core.py                    210 ERROR    sbatch: error: Batch job submission failed: Unable to contact slurm controller (connect failure)
core.py                    210 ERROR    scancel: error: Invalid job id
================ 1 failed, 7 passed, 3 skipped in 36.07 seconds ================

Refactor SLURMCluster : use memory and thread kwargs, and dynamicaly create the job_script

First, there is something weird with the SLURMCluster template : only processes are used as a limit, thread number are not taken into account. Moreover, there is no memory limit given.

Secondly, SLURMCluster will need to be updated to match the PBSCluster improvements from #10 (once it is merge): Mainly a more dynamic construct of the submit script. This should prevent some problems as noted in #7, #19 and maybe #20.

Allow global access to dashboard by default

When using interfaces like infiniband it's often the case that one can't get the dashboard to show up. This is because the bokeh server is expecting to be called by its IP address on the infiniband network, not the normal network.

One solution to this would be to provide services={('bokeh', ('', 8787)): BokehScheduler} to LocalCluster.

Failing with SGE

I would like to try using dask-jobqueue to set up a Dask cluster on our internal Grid Engine cluster (I believe it is UGE, not SGE, but they should be siblings of each other).

My script is below:

# dask_sge_test.py
from dask_jobqueue.sge import SGECluster
cluster = SGECluster(queue='regular')

The error is below:

$ python dask_sge_test.py
Traceback (most recent call last):
  File "dask_sge_test.py", line 1, in <module>
    from dask_jobqueue.sge import SGECluster
  File "/home/maer3/anaconda/envs/tm-sleep/lib/python3.6/site-packages/dask_jobqueue/__init__.py", line 2, in <module>
    from .core import JobQueueCluster
  File "/home/maer3/anaconda/envs/tm-sleep/lib/python3.6/site-packages/dask_jobqueue/core.py", line 11, in <module>
    from distributed.deploy import Cluster
ImportError: cannot import name 'Cluster'

I did some checking in an IPython session:

In [9]: import distributed

In [10]: 'Cluster' in distributed.deploy.__dict__.keys()
Out[10]: False

Not sure why this is happening, I dug into the distributed Python module, and found everything set up correctly: under deploy/__init__.py, I see from .cluster import Cluster, so the name is available.

RuntimeWarning: Couldn't detect a suitable IP address

I am starting a cluster like this:

cluster = PBSCluster(processes=5, threads=4, memory="0.2",
                     interface="ib0",
                     group_list='g26209',
                     resource_spec='select=1:ncpus=20:mpiprocs=5:model=ivy',
                     walltime='06:00:00')

I receive the following warning:

/nobackup/rpaberna/conda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py:128: RuntimeWarning: Couldn't detect a suitable IP address for reaching '8.8.8.8', defaulting to '127.0.0.1': [Errno 101] Network is unreachable
  % (host, default, e), RuntimeWarning)

Do I need to be concerned about this?

Unified test suite

After we figure out CI testing in #2 we might consider refactoring the test suite into a unified TestCase class for common functionality.

Dask client connects to SLURM workers, then rapidly loses them

Summary

When adding workers to a SLURM dask client, workers are added as resources are provisioned by the scheduler, but then they quickly disappear. Presumably they are killed by the client because a lack of connection (--death_timeout flag). Its not clear whether this is intended behavior. My goal is to add workers to a dask client, connect to that client from my local laptop using jupyter lab. By the time I ssh tunnel in from my laptop, all the workers are killed.

(pangeo) [b.weinstein@c30b-s1 ~]$ python
Python 3.6.4 | packaged by conda-forge | (default, Dec 23 2017, 16:31:06) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_jobqueue import SLURMCluster
>>> from datetime import datetime
>>> from time import sleep
>>> 
>>> cluster = SLURMCluster(project='ewhite',death_timeout=200)
>>> cluster.start_workers(5)
[3, 5, 7, 9, 11]
>>> 
>>> from dask.distributed import Client
>>> client = Client(cluster)
>>> 
>>> while True:
...     print(datetime.now().strftime("%a, %d %B %Y %I:%M:%S"))
...     print(client)
...     sleep(30)
... 
Wed, 21 March 2018 10:57:19
<Client: scheduler='tcp://172.16.194.66:35459' processes=0 cores=0>
Wed, 21 March 2018 10:57:49
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 10:58:19
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 10:58:49
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 10:59:19
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 10:59:49
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 11:00:20
<Client: scheduler='tcp://172.16.194.66:35459' processes=6 cores=24>
Wed, 21 March 2018 11:00:50
<Client: scheduler='tcp://172.16.194.66:35459' processes=5 cores=20>
Wed, 21 March 2018 11:01:20
<Client: scheduler='tcp://172.16.194.66:35459' processes=5 cores=20>
Wed, 21 March 2018 11:01:50
<Client: scheduler='tcp://172.16.194.66:35459' processes=0 cores=0>
Wed, 21 March 2018 11:02:20
<Client: scheduler='tcp://172.16.194.66:35459' processes=0 cores=0>
Wed, 21 March 2018 11:02:50
<Client: scheduler='tcp://172.16.194.66:35459' processes=0 cores=0>

Expected Behavior

Following this helpful screencast, I thought that once workers were added, they would remain available for computation. Either the client is very aggressive about pruning unused workers, or something else is wrong.

Comments

  • May be relevant, but unknown. Once I quit the above python session, I am greeted with several error messages saying the TCP stream is closed. Example
Traceback (most recent call last):
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/comm/tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

I can confirm that the workers that were once there, are now gone.

(pangeo) [b.weinstein@c30b-s1 ~]$ squeue -u b.weinstein
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          18361726 hpg2-comp     bash b.weinst  R      22:14      1 c30b-s1

presumably killed by the client.

Edited desk.err file produced, with many hundreds of duplicate lines removed.

(pangeo) [b.weinstein@c30b-s1 ~]$ cat dask.err 
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:36916'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:38675'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:32914'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:44959'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:44970'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:36426'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:35157'
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.16.194.178:40539'
distributed.diskutils - WARNING - Found stale lock file and directory '/home/b.weinstein/dask-worker-space/worker-r9gleghg', purging
distributed.worker - INFO -       Start worker at: tcp://172.16.194.178:39384
distributed.worker - INFO -          Listening to: tcp://172.16.194.178:39384
distributed.worker - INFO -              nanny at:       172.16.194.178:44959
distributed.worker - INFO -              bokeh at:        172.16.194.178:8789
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-l7jp7he1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - INFO - Failed to start worker process.  Restarting
...
distributed.nanny - INFO - Failed to start worker process.  Restarting
distributed.nanny - INFO - Failed to start worker process.  Restarting
distributed.worker - INFO -       Start worker at: tcp://172.16.194.178:46551
distributed.worker - INFO -          Listening to: tcp://172.16.194.178:46551
distributed.worker - INFO -              nanny at:       172.16.194.178:38675
distributed.worker - INFO -              bokeh at:       172.16.194.178:41314
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-yjnhwx2e
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
...
distributed.nanny - INFO - Closing Nanny at 'tcp://172.16.192.163:44012'
...
:35125
distributed.worker - INFO -          Listening to: tcp://172.16.194.178:35125
distributed.worker - INFO -              nanny at:       172.16.194.178:32914
distributed.worker - INFO -              bokeh at:       172.16.194.178:45869
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-w9w338nb
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.178:35125
distributed.worker - INFO -       Start worker at: tcp://172.16.194.184:46207
distributed.worker - INFO -          Listening to: tcp://172.16.194.184:46207
distributed.worker - INFO -              nanny at:       172.16.194.184:45711
distributed.worker - INFO -              bokeh at:        172.16.194.184:8789
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-g0jjm93z
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.184:46207
orker - INFO -              nanny at:       172.16.194.178:38675
distributed.worker - INFO -              bokeh at:       172.16.194.178:45033
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-u2_1pu5v
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.178:35568
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 849, in callback
    result_list.append(f.result())
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 155, in _start
    response = yield self.instantiate()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 223, in instantiate
    self.process.start()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 363, in start
    self._wait_until_started())
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 471, in _wait_until_started
    assert msg == 'started', msg
AssertionError: {'address': 'tcp://172.16.194.178:37068', 'dir': '/home/b.weinstein/dask-worker-space/worker-5ipv17oi'}
distributed.dask_worker - INFO - End worker
distributed.nanny - INFO - Failed to start worker process.  Restarting
/envs/pangeo/bin/dask-worker", line 6, in <module>
    sys.exit(distributed.cli.dask_worker.go())
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 252, in go
    main()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 243, in main
    loop.run_sync(run)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/ioloop.py", line 582, in run_sync
    return future_cell[0].result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 236, in run
    yield [n._start(addr) for n in nannies]
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 849, in callback
    result_list.append(f.result())
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 155, in _start
    response = yield self.instantiate()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 223, in instantiate
    self.process.start()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 363, in start
    self._wait_until_started())
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/home/b.weinstein/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/nanny.py", line 471, in _wait_until_started
    assert msg == 'started', msg
AssertionError: {'address': 'tcp://172.16.194.178:37661', 'dir': '/home/b.weinstein/dask-worker-space/worker-_b3fhcyu'}
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-25, started daemon)>
...
distributed.worker - INFO -          Listening to: tcp://172.16.194.184:43288
distributed.worker - INFO -              nanny at:       172.16.194.184:43216
distributed.worker - INFO -              bokeh at:        172.16.194.184:8789
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-mo6fnfvo
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.184:43288
distributed.nanny - INFO - Failed to start worker process.  Restarting
...
:36157
distributed.worker - INFO -          Listening to: tcp://172.16.194.184:36157
distributed.worker - INFO -              nanny at:       172.16.194.184:42948
distributed.worker - INFO -              bokeh at:        172.16.194.184:8789
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-h0jj6gi3
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.184:36157
distributed.nanny - INFO - Failed to start worker process.  Restarting
...
distributed.worker - INFO -          Listening to: tcp://172.16.194.184:46197
distributed.worker - INFO -              nanny at:       172.16.194.184:38837
distributed.worker - INFO -              bokeh at:        172.16.194.184:8789
distributed.worker - INFO - Waiting to connect to:  tcp://172.16.194.66:35459
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    7.00 GB
distributed.worker - INFO -       Local Directory: /home/b.weinstein/dask-worker-space/worker-lft5bgwk
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Stopping worker at tcp://172.16.194.184:46197
...
distributed.nanny - WARNING - Worker process 27618 was killed by unknown signal
...
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-392, 
...
distributed.nanny - WARNING - Worker process still alive after 159 seconds, killing
distributed.nanny - WARNING - Worker process 13695 was killed by unknown signal
distributed.nanny - WARNING - Worker process still alive after 159 seconds, killing
...
distributed.nanny - WARNING - Worker process 13712 was killed by unknown signal
...
distributed.nanny - WARNING - Worker process still alive after 159 seconds, killing
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-303, started daemon)>
(pangeo) [b.weinstein@c30b-s1 ~]$ 

stop_workers() not working with SLURMCluster

stop workers is not working for me on a SLURM cluster. Looking at the SLURM status I can still see the jobs running, and they are still available in the scheduler after running stop_workers, even after a few minutes.

In [1]: from dask_jobqueue import SLURMCluster

In [2]: cluster = SLURMCluster(processes=1, threads=1, memory='7GB', walltime='00:20:00')

In [3]: from dask.distributed import Client

In [4]: client  = Client(cluster)

In [5]: workers = cluster.start_workers(2)

In [6]: 

In [6]: cluster.stop_workers(workers)

In [7]: client.scheduler_info()['workers']
Out[7]: 
{'tcp://172.16.193.150:34830': {'name': 'dask-worker-3',
  'memory_limit': 7000000000,
  'host': '172.16.193.150',
  'resources': {},
  'ncores': 1,
  'services': {'nanny': 45411, 'bokeh': 8789},
  'local_directory': '/home/shawntaylor/projects/dask-jobqueue/dask-worker-space/worker-fcck4f5r',
  'pid': 31568,
  'cpu': 2.0,
  'memory': 67280896,
  'time': 1524318570.0637333,
  'read_bytes': 2777.9680047420147,
  'write_bytes': 2062.4913531609777,
  'num_fds': 22,
  'executing': 0,
  'in_memory': 0,
  'ready': 0,
  'in_flight': 0},
 'tcp://172.16.193.150:41714': {'name': 'dask-worker-2',
  'memory_limit': 7000000000,
  'host': '172.16.193.150',
  'resources': {},
  'ncores': 1,
  'services': {'nanny': 43241, 'bokeh': 42670},
  'local_directory': '/home/shawntaylor/projects/dask-jobqueue/dask-worker-space/worker-857lfd0u',
  'pid': 31570,
  'cpu': 0.0,
  'memory': 67092480,
  'time': 1524318569.739144,
  'read_bytes': 2542.885851061595,
  'write_bytes': 2066.345038028005,
  'num_fds': 22,
  'executing': 0,
  'in_memory': 0,
  'ready': 0,
  'in_flight': 0}}

In [8]: 

I'm pretty sure it's because stop_workers is calling self.cancel_command (set to scancel for SLURM) and expecting to use the slurm job id, which is saved here:

In [30]: cluster.jobs
Out[30]: {4: 'Submitted batch job 19566158', 5: 'Submitted batch job 19566159'}

and using the entire string with scancel instead of just the job id. This gets set here

job = out.decode().split('.')[0].strip()

That's probably made for another schedulers job submission output format? I can fix this for SLURM but I don't know how I would generalize that to fit in the package. If someone can point me in the right direction I can give it a try though and submit a PR.

Change naming conventions for cores and memory

Currently cluster objects in this project take in keywords like the following:

cluster = FooCluster(threads=4, processes=8, memory="16GB")

Where threads means threads-per-process and memory means memory-per-process.

However, I suspect that changing the meaning of these terms to the following might be more intuitive for users and administrators:

cluster = FooCluster(cores=32, processes=8, memory="128GB")

This would be a breaking change, but given how small and well connected the current user base is this seems doable.

Not setting `project` should not raise

One suggestion I have is not to raise an exception unless there is exceptional behavior. In the case of not having a project set, not all systems require users to set a project, so we should not enforce that by raising IMO. This would also apply to #6

SLURMCluster - stoping workers

Hi guys,
I am trying to stop workers within my SLURMCluster. As I understand to stop all workers I can pass to the method stop_workers dictionary of workers, i.e. jobs. Then the SLURM command scancel <job_id> is executed and finally dictionary jobs is cleared.
However it is not working as expected because value of mentioned dictionary is an output from sbatch commands and in the end jobs looks like this:

In [45]: cluster.jobs
Out[45]: 
{2: u'Submitted batch job 5343903',
 3: u'Submitted batch job 5343904',
 4: u'Submitted batch job 5343905',
 5: u'Submitted batch job 5343906'}

So when I call stop_workers with jobs as an argument it deletes each key from dictionary but it does not actually stops jobs (because generated command looks like

 scancel Submitted batch job 5344215 Submitted batch job 5344216 Submitted batch job 5344216 Submitted batch job 5344218

instead of

 scancel 5344215 5344216 5344216 5344218

Am I missing something?

Difficulty running tests locally

Has anyone run into this before? Something must be incorrect in my environment. Recommendations welcome.

mrocklin@carbon:~/workspace/dask-jobqueue$ source activate test-jobqueue
mrocklin@carbon:~/workspace/dask-jobqueue$ source ci/sge.sh 
mrocklin@carbon:~/workspace/dask-jobqueue$ jobqueue_before_install 
+ jobqueue_before_install
+ docker version
Client:
 Version:      18.03.1-ce
 API version:  1.37
 Go version:   go1.9.5
 Git commit:   9ee9f40
 Built:        Thu Apr 26 07:17:20 2018
 OS/Arch:      linux/amd64
 Experimental: false
 Orchestrator: swarm

Server:
 Engine:
  Version:      18.03.1-ce
  API version:  1.37 (minimum version 1.12)
  Go version:   go1.9.5
  Git commit:   9ee9f40
  Built:        Thu Apr 26 07:15:30 2018
  OS/Arch:      linux/amd64
  Experimental: false
+ docker-compose version
docker-compose version 1.21.2, build a133471
docker-py version: 3.3.0
CPython version: 3.6.4
OpenSSL version: OpenSSL 1.0.2o  27 Mar 2018
+ cd ./ci/sge
+ ./start-sge.sh
Building master
Step 1/20 : FROM ubuntu:14.04
 ---> b1719e1db756
Step 2/20 : ENV LANG C.UTF-8
 ---> Using cache
 ---> 3c2c536d97b5
Step 3/20 : RUN apt-get update && apt-get install curl bzip2 git gcc -y --fix-missing
 ---> Using cache
 ---> 90136a8b0571
Step 4/20 : RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh &&     bash miniconda.sh -f -b -p /opt/anaconda &&     /opt/anaconda/bin/conda clean -tipy &&     rm -f miniconda.sh
 ---> Using cache
 ---> ba549fbf6dd8
Step 5/20 : ENV PATH /opt/anaconda/bin:$PATH
 ---> Using cache
 ---> 58d8773e6e19
Step 6/20 : RUN conda install -n root conda=4.4.11 && conda clean -tipy
 ---> Using cache
 ---> a810a990c07a
Step 7/20 : RUN conda install -c conda-forge dask distributed blas pytest mock ipython pip psutil && conda clean -tipy
 ---> Using cache
 ---> f445166bd4a1
Step 8/20 : RUN pip install --no-cache-dir drmaa
 ---> Using cache
 ---> 12d25180f30e
Step 9/20 : RUN pip install --no-cache-dir git+https://github.com/dask/dask.git --upgrade --no-deps
 ---> Using cache
 ---> 621b6a4a14c6
Step 10/20 : RUN pip install --no-cache-dir git+https://github.com/dask/distributed.git --upgrade --no-deps
 ---> Using cache
 ---> 17ca58e59d18
Step 11/20 : COPY ./*.sh /
 ---> Using cache
 ---> a42e848de456
Step 12/20 : COPY ./*.txt /
 ---> Using cache
 ---> b93a0604a4ee
Step 13/20 : RUN bash ./setup-master.sh
 ---> Running in 3520e6b17575
E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/p/python2.7/libpython2.7-minimal_2.7.6-8ubuntu0.3_amd64.deb  404  Not Found [IP: 91.189.88.162 80]

E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/p/python2.7/python2.7-minimal_2.7.6-8ubuntu0.3_amd64.deb  404  Not Found [IP: 91.189.88.162 80]

E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/p/python2.7/libpython2.7-stdlib_2.7.6-8ubuntu0.3_amd64.deb  404  Not Found [IP: 91.189.88.162 80]

E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/p/python2.7/python2.7_2.7.6-8ubuntu0.3_amd64.deb  404  Not Found [IP: 91.189.88.162 80]

E: Unable to fetch some archives, maybe run apt-get update or try with --fix-missing?
sudo: unknown user: sgeadmin
sudo: unable to initialize policy plugin
gridengine-master: unrecognized service
postfix: unrecognized service
update-rc.d: /etc/init.d/postfix: file does not exist
ERROR: Service 'master' failed to build: The command '/bin/sh -c bash ./setup-master.sh' returned a non-zero code: 1
Error response from daemon: Container d34f83539977d1775472625c21cade3bc8cf1f29d160de255cea3cad95384a7d is not running
Waiting for SGE slots to become available
Error response from daemon: Container d34f83539977d1775472625c21cade3bc8cf1f29d160de255cea3cad95384a7d is not running
Waiting for SGE slots to become available

Stopping worker and stopping whole jobqueue cluster

The question is what is the right way to do this?

Currently, when I want to stop my JobQueue cluster to restart a fresh one (with new parameters for instance), I use

cluster.scheduler.close()

For stopping workers, as the scale method is not currently working, I use something like

cluster.stop_wokers(cluster.jobs)

(I don't remember the exact syntax, and not eager to look for it).

Are these the right things to do?

Should we expose some more explicit methods, for example:

cluster.stop()

and

cluster.stop_all_workers()

Dask client connects to PBS workers, then rapidly loses them

The title is intentionally analogous to #20 as I have the feeling the explanation for the observed behavior is similar.

I'm on a PBS cluster whose nodes are made of 2 cpus with 14 cores each.

I was initially calling:

cluster = PBSCluster(queue='mpi_1', local_directory=local_dir, interface='ib0', walltime='24:00:00',
                     threads=4, processes=7, memory='10GB', resource_spec='select=1:ncpus=28:mem=100g', 
                     death_timeout=100)

This led to the creation of workers but they died after creation.

The following choice seems to fix the issue:

threads=14, processes=2, memory='50GB', 

Here is a link that describes dask workers:
http://distributed.readthedocs.io/en/latest/worker.html
this may be useful to readers having similar issues

Note that the link between cluster architecture and options that can be passed to PBSCluster is still not entirely clear to me.

So my issue seems to be fixed, but I wanted to put this experience visible to people that may encounter similar issues.

Documentation feedback

Here are a few high level thoughts on the current documentation:

Looking at the main example on the main page I'm curious if it is realistic:

from dask_jobqueue import PBSCluster

cluster = PBSCluster(processes=6, threads=4, memory="16GB")
cluster.start_workers(10)

from dask.distributed import Client
client = Client(cluster)

Should we include project, queue, resource specs, and other keywords that might both be necessary for realistic use and also recognizable to users of that kind of system? Similarly I think it would be very useful to include a few real-world examples in the example deployments documentation. I suspect that this was the original intent of that page (nice idea!). Perhaps we can socialize this on the pangeo issue tracker and ask people to submit PRs for their clusters?

History

I recommend that we remove the history section from the main page

Description of how it works

My experience trying to explain these projects to users of HPC systems is that most of them are familiar with job scripts. I wonder if we might include a "How does this work?" section that shows the job script that we generate, and explain that we submit this several times.

Thoughts?

Release

I've been recommending this project to a few people recently and they've appreciated it. It would be nice to make it easier to install. Are there any blockers to putting this on PyPI?

Reusing dask-jobqueue code for launching Apache Spark cluster

Just an idea I had, and feel that it was better to ask here before doing anything.
I believe that the (great) work done here could be applied to Spark as well as Dask. This simple interface with Job Scheduler may be adapted for launching Spark cluster.

Question is, how do you feel about that? Is there some kind of license or property constraint?

failure to autoscale unless workers are already present

I am testing the PBSCluster along with autoscaling. It seems that I am unable to get the cluster to launch any workers without explicitly starting at least one worker. I would expect that this configuration would scale from 0 to 10 (180 processes) without further interaction/configuration.

    cluster = PBSCluster(queue='default',
                         walltime='01:00:00',
                         project='MyAccount',
                         resource_spec='1:ncpus=36:mpiprocs=36:mem=109GB',
                         interface='ib0',
                         threads=4,
                         processes=18)
    client = Client(cluster)
    cluster.adapt(minimum=0, maximum=10)

@mrocklin - this may actually be a problem with the dask adaptive cluster but I wanted to discuss here to see if I am missing something obvious specific to PBS.

Learning from ipyparallel

ipyparallel (formerly part of the Jupyter Notebook) provides similar functionality to dask distributed and dask-jobqueue in that it allows users to startup a cluster and submit work to it. The model of the ipyparallel is a bit different from that of dask distributed, but that doesn't really concern us here.

What is interesting is ipyparallel has a trove of knowledge regarding starting jobs on various common HPC Schedulers. This knowledge is largely baked into one file. For HPC Schedulers already in dask-jobqueue, it's worth comparing notes to ipyparallel and see what can be learned on this front. As to the schedulers not present in dask-jobqueue, it's worth taking a look at ipyparallel's implementations and seeing what can be gleaned from it and how it might be used here. It's probably also worth learning how things have been refactored out in ipyparallel to see if there are any useful strategies for modeling HPC Schedulers generally.

From what environment should I call SLURMCluster? Parsing Issue in sbatch?

I am running on a university cluster with a SLURM submission scheduler. I installed conda and sourced the pangeo environment without error.

I git cloned this repo, opened python and tried

(pangeo) [b.weinstein@gator4 dask-jobqueue]$ python
Python 3.6.4 | packaged by conda-forge | (default, Dec 23 2017, 16:31:06) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_jobqueue import SLURMCluster
>>> 
>>> cluster = SLURMCluster(project="ewhite")
>>> cluster.start_workers(10)
sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

sbatch: error: Invalid argument: ewhite

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Where ewhite is the group tag (see below).

In previous submission scripts, I would typically start with something like

#!/bin/bash
#SBATCH --job-name=NEON_LIDAR   # Job name
#SBATCH --mail-type=END               
#SBATCH --mail-user=#######@gmail.com
#SBATCH --account=ewhite
#SBATCH --qos=ewhite-b

#SBATCH --ntasks=1                 # Number of MPI ranks
#SBATCH --cpus-per-task=1            # Number of cores per MPI rank
#SBATCH --mem=4000
#SBATCH --time=00:59:00       #Time limit hrs:min:sec
#SBATCH --output=NEON_LIDAR.out   # Standard output and error log
#SBATCH --error=NEON_LIDAR.err

which works without a problem.

Attempts

I changed

#SBATCH -A %(project)s

to

#SBATCH --account %(project)s

to match previous submission scripts. No change.

So I tried removing the line, which yielded

>>> cluster.start_workers(10)
sbatch: error: Invalid argument: 00:30:00

sbatch: error: Invalid argument: 00:30:00

sbatch: error: Invalid argument: 00:30:00

...

This is an error in the walltime argument. This makes me believe that it doesn't have to do with the argument value, but how all the arguments are parsed and passed to bash. The most likely explanation is that i'm calling this script from an incorrect location, and the variable expansion or quotes are not being parsed in the way I anticipate.

not working with python 2

Having installed dask_jobqueue with:
pip install git+https://github.com/dask/dask-jobqueue.git#egg=0.1.0

I get the following error upon import:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/rodgersleejg/anaconda3/envs/dask_py2/lib/python2.7/site-packages/dask_jobqueue/__init__.py", line 3, in <module>
    from .pbs import PBSCluster
  File "/Users/rodgersleejg/anaconda3/envs/dask_py2/lib/python2.7/site-packages/dask_jobqueue/pbs.py", line 11, in <module>
    class PBSCluster(JobQueueCluster):
  File "/Users/rodgersleejg/anaconda3/envs/dask_py2/lib/python2.7/site-packages/docrep/__init__.py", line 376, in replace
    func.__doc__, indent=indent, stacklevel=4)
AttributeError: attribute '__doc__' of 'type' objects is not writable

PBSCluster is broken: 'PBSCluster' object has no attribute 'name'

Probably my mistake in one of the commit on #10:
It seems that name attribute is not initialized in core.py, leading to:

In [6]: cluster = PBSCluster(threads=6, job_extra=['-m ae'], queue='regular')
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-6-88770ff01410> in <module>()
----> 1 cluster = PBSCluster(threads=6, job_extra=['-m ae'], queue='regular')

~/miniconda3/envs/dask_dev/lib/python3.6/site-packages/dask_jobqueue-0.1.0-py3.6.egg/dask_jobqueue/pbs.py in __init__(self, queue, project, resource_spec, walltime, job_extra, **kwargs)
     69 
     70         #PBS header build
---> 71         if self.name is not None:
     72             header_lines = ['#PBS -N %s' % self.name]
     73         if queue is not None:

AttributeError: 'PBSCluster' object has no attribute 'name'

And obviously, self.name is never initialized.

A PR is incoming for fixing this and other issues.
We should set up basic unit tests :)!

Add LSF

It might be valuable to extend this repository with a solution for LSF. My hope is that this is relatively easy for someone with modest LSF experience. Looking at the current solutions for PBS or SLURM might be helpful (they're about 100 lines, mostly docstrings)

Document steps to release dask-jobqueue

I have just made the 0.1.1 release of dask-jobqueue. These are the steps I took:

  • checkout dask/dask-jobqueeue master, for me this was: git checkout upstream/master
  • create a tag: git tag -a 0.1.1 -m '0.1.1'
  • push the tag to github: git push --tags upstream
  • build the wheels/dist for pypi: python setup.py bdist_wheel sdist
  • upload the release to pypi: twine upload dist/dask*0.1.1*

@jakirkham has suggested wrapping these tasks into a rever workflow: #36 (comment). I'm open to going that route but it is not a high priority for me at the moment.

Perhaps someone else wants to step in there. There may be other steps we want in this workflow. I view this as the bare minimum. I think this issue is the place where we should discuss those.

CI for Schedulers

Linking to @jhamman's comment, one of the challenges here is setting up CI for Dask. There has been some already towards using SGE (particularly at dask-drmaa) for testing. Configuring other schedulers (e.g. SLURM, PBS, LSF, etc.) is possible, but can be a bit tricky. Docker helps with this, but it requires a fair bit of configuration to get right.

One resource that might be useful are these Docker images, which include SLURM and TORQUE (similar to PBS). That said, I'm not sure what license applies to these. Also they may not work out of the box (as was my experience with using their Grid Engine some time back). That said, they should at least provide useful insights on how to get working Docker images for these different cases. For simplicity, would propose building and hosting images for respective schedulers so that CI can just download them.

Problem accessing the dashboard when the cluster frontend has an internal and an external IP

Funnily enough I helped solve the problem in #71 but I now realise that the dashboard is not accessible in our SGE cluster and the ssh tunneling work-around I proposed does not cut it.

Even more fun, when I use dask-drmaa, I can access the dashboard fine (a ssh tunnel is needed of course).

Here is my current understanding:

  • I run the scheduler on the cluster frontend called sequoia
  • the cluster frontend has a external IP, which is used for example when I ssh from my local machine to the frontend:
# from my local machine
$ ping sequoia
PING sequoia.paris.inria.fr (128.93.90.8) 56(84) bytes of data.
  • the cluster frontend has an internal IP that is used when I am on the cluster:
# from sequoia i.e. the cluster frontend
$ ping sequoia
PING sequoia.cm.cluster (10.141.255.254) 56(84) bytes of data.
  • the scheduler is using the internal IP address which means that even with ssh tunneling, the dashboard is not going to be reachable. Here is an excerpt of client._repr_html_() for completeness:
Scheduler: </b>tcp://10.141.255.254:57591\n
Dashboard: </b><a href=\'http://10.141.255.254:45961/status\' target=\'_blank\'>http://10.141.255.254:45961/status</a>

In some way it feels like some kind of variation of dask/dask-drmaa#75:

  • in my case (using dask-jobqueue), the internal IP is used by the scheduler so the dashboard is not accessible from the outside.
  • in their case (using dask-drmaa), the external IP was used by the scheduler but this was a problem because the workers can only access the internal IP of the frontend. cc @maxnoe in case I misunderstood something. In particular I would be very interested to see client._repr_html_() on their setup.

Inputs more than welcome:

  • is my cluster setup quite unusual? A bit of a simplification, but changing our cluster setup is not a very likely solution.
  • do we need to have the scheduler use the internal IP and the dashboard the external IP (if that is even is)?
  • other things I may have missed ?

I'd be more than happy to provide more info if that can help!

passing options to submit and cancel commands

Wouldn't it be useful to be able to pass options to submit and cancel commands ?

For example, I would like to stop receiving job deletions emails on a PBS cluster and one way to doing this is to pass -Wsuppress_email=-1 option to qdel

If yes, what would be a good way to do this?

Add Grid Engine

It might be valuable to extend this repository with a solution for Grid Engine. My hope is that this is relatively easy for someone with modest grid engine experience. Looking at the current solutions for PBS or SLURM might be helpful (they're about 100 lines, mostly docstrings)

Release

I'm inclined to do a release to get something out that works with the new Dask release. Any objections?

In particular, should this wait on #63 ? cc @jhamman

Assumption made about location of dask-worker

In

dirname = os.path.dirname(sys.executable)

the assumption is made that the dask-worker script is in the same place as the python executable. This is not true in our case since dask is available through an environment module (so the availability of the script is handled via the $PATH envvar).

Would it be possible to add a check before

self._command_template = os.path.join(
to verify that this default actually exists and, if not, use the location found in $PATH (otherwise die gracefully)? Something like

if not os.path.isfile(os.path.join(dirname, 'dask-worker'):
    if os.path.isfile(os.path.abspath('dask-worker'):
        dirname = os.path.join(os.path.abspath('dask-worker'), '..')
    else:
        # Trigger a graceful death

Implement a cluster status method, to know if workers are really running

Currently, we only know that we have submitted some jobs to a cluster scheduler. We don't know if these jobs are running or queued, or in any other state.

What do you think of implementing a kind of status method?

In the PBS case for example, it would issue a qstat call, and get the PBS scheduler status of every jobs handled by the Dask cluster.

Not sure if this is really needed, as we are able with the use of Dask Client to know the real size of the cluster (and maybe by some other means).

Maybe this issue is just about documenting how to retrieve the information about a cluster state, either by job scheduler (eg. PBS) API, either using Dask API.

Pooling Docker test images

As dask-drmaa and dask-jobqueue are both making use of Docker images for testing against job schedulers, wondering if we can pool these Docker files somewhere. Also might be good to build these in advance and host them on Docker Hub or Quay so that we can just pull them down for testing on CIs instead of rebuilding them from scratch each time.

Why job_mem and job_cpu

If possible I'd like to reduce the number of parameters and naively these seem a bit redundant. However it looks like whoever implemented them set up sane defaults, so presumably there is some reasoning behind them. Under what situations are these important?

SLURMCluster - number of tasks requirement

Hi,
I've got a question/problem - what is the reason of hard-coding the number of tasks to 1?
This is problematic in some way for me because in my scenario I want to run multiple-node jobs and it is impossible with current generation of job_header.
Example:
I have a cluster of nodes with 24 CPUs per each and initialize my SLURMCluster like that

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(name='dask_test',
                       queue='qwe123',
                       memory='1GB',
                       processes=48, # 2 times 24 since I want double-node job
                       threads=2, # combined with number of processes gives 96 cores
                       interface='ib0',
                       walltime='00:10:00',
                       job_cpu=1, # 1 CPU per each task
                       job_extra=['--nodes=2', '--ntasks-per-node=24']) 

Such declaration gives me following job header:

In [22]: cluster.job_header
Out[22]: '#SBATCH -J dask_test\n#SBATCH -e dask_test.err\n#SBATCH -o dask_test.out\n#SBATCH -p qwe123\n#SBATCH -n 1\n#SBATCH --cpus-per-task=1\n#SBATCH --mem=45G\n#SBATCH -t 00:10:00\n#SBATCH --nodes=2\n#SBATCH --ntasks-per-node=24'

And because of -n argument it won't create double-node jobs. So I've got to remove that from job header manually.

Maybe the number of tasks should be also an argument for cluster initialization and if it will be None the -n sbatch argument would not appear in job header? What I mean is a more flexible way of job_header creation.

Hello, world!

@jhamman @jedwards4b I've moved the job-queuing contents of the pangeo repository here. Some questions:

  1. Are you comfortable losing commit history. I've pointed to the origin commit in the first commit of this repo. I've also mentioned you both in the history section of the README (which would eventually go to a docpage?)
  2. How would you like to license this? I'm inclined to use the same license file used in Dask: https://github.com/dask/dask/blob/master/LICENSE.txt

@jakirkham (and anyone else actively using dask-drmaa) you should probably know about this parallel effort. I may start thinking about pulling a little bit of the shared logic between these systems into dask.distributed.

also cc @rabernat

Python 2: AttributeError: attribute '__doc__' of 'type' objects is not writable

Maybe related to #31 (comment).

conda create -n tmp27 dask distributed pytest docrep python=2 ipython -c conda-forge -y
source activate tmp27
ipython -c 'import dask_jobqueue'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-1-8ca2c7c93177> in <module>()
----> 1 import dask_jobqueue

/home/lesteve/dev/dask-jobqueue/dask_jobqueue/__init__.py in <module>()
      1 # flake8: noqa
      2 from .core import JobQueueCluster
----> 3 from .pbs import PBSCluster
      4 from .slurm import SLURMCluster
      5 from .sge import SGECluster

/home/lesteve/dev/dask-jobqueue/dask_jobqueue/pbs.py in <module>()
      9 
     10 @docstrings.with_indent(4)
---> 11 class PBSCluster(JobQueueCluster):
     12     """ Launch Dask on a PBS cluster
     13 

/home/lesteve/miniconda3/envs/tmp27/lib/python2.7/site-packages/docrep/__init__.pyc in replace(func)
    374                 func = func.im_func
    375             func.__doc__ = func.__doc__ and self.with_indents(
--> 376                 func.__doc__, indent=indent, stacklevel=4)
    377             return func
    378         return replace

AttributeError: attribute '__doc__' of 'type' objects is not writable

I haven't looked into it in more details, I would expect a Python 2 related bug in docrep.

Unable to view the status dashboard

In today's experiments with Dask + dask-jobqueue, I found that I could not load the Bokeh dashboard that @mrocklin keeps showing me ๐Ÿ˜„, which has kept me wondering what exactly is the progress on my simple, embarrassingly parallel task of "loading ~900+ matlab .mat matrices into memory".

The URL provided by the client is: http://172.16.23.102:8787/status.

I'm able to ping the IP address in there:

$ ping 172.16.23.102

However, I'm unable to access the page in my browser; I get a timeout error.

In terms of network settings, I'm on my work VPN.

Is there something that's blocking access that I'm missing?

SLURM test failures

We're getting this test failure in the CI logs for a few unrelated PRs.

__________________________________ test_basic __________________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7fb0da9740f0>
    @pytest.mark.env("slurm")  # noqa: F811
    def test_basic(loop):
        with SLURMCluster(walltime='00:02:00', threads=2, processes=1, memory='4GB',
                          job_extra=['-D /'], loop=loop) as cluster:
            with Client(cluster) as client:
                workers = cluster.start_workers(2)
                future = client.submit(lambda x: x + 1, 10)
                assert future.result(60) == 11
                assert cluster.jobs
    
                info = client.scheduler_info()
                w = list(info['workers'].values())[0]
                assert w['memory_limit'] == 4e9
                assert w['ncores'] == 2
    
                cluster.stop_workers(workers)
    
                start = time()
                while len(client.scheduler_info()['workers']) > 0:
                    sleep(0.100)
>                   assert time() < start + 10
E                   assert 1529802632.3741777 < (1529802622.322609 + 10)
E                    +  where 1529802632.3741777 = time()
dask_jobqueue/tests/test_slurm.py:104: AssertionError
------------------------------ Captured log call -------------------------------
core.py                    210 ERROR    sbatch: error: Batch job submission failed: Unable to contact slurm controller (connect failure)
core.py                    210 ERROR    scancel: error: Invalid job id

flake8 ignore configuration

I was wondering whether there is a good motivation for the flake8 ignore settings and whether people would be open to use the flake8 defaults instead, or maybe a smaller set of ignores instead.

I am aware this is a bit of a bikeshedding issue, but I think that this is better to discuss this when the project is young than later down the line.

I ran flake8 with the default settings and there are eight flake8 violations which are super easy to fix (if you are curious the winner is E265 block comment should start with '# ' which to me is not very controversial).

I had a quick look and from what I can tell, the flake8 ignore settings of some projects in the dask organization projects are copied from the dask/dask one. I found this was introduced in dask/dask#1464 and it does not look like there is a strong motivated reason behind it. Some projects have drifted a bit off like dask-ml and dask-kubernetes.

PBS resources spec and Dask resources keywords link

I'm also wondering if we should have a tighter link between PBS resources spec and the dask processes/worker/memory options.

Basically, given select=x:ncpus=y:mem=z resource_spec from PBS:

  • x should always be one, as we launch one dask-worker per node,
  • y should be equal to dask nthreads * nprocesses, if I'm not mistaken,
  • z should be equal to dask nprocesses * memory-limit, again correct me if I'm wrong.

Should we implement this behaviour in the code? This would mean that we don't need the resource_spec keyword arg anymore, but we can just build it using dask worker options.

Release?

#86 changed behavior significantly. Should we release again? Should we wait for #63 ?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.