
dask-yarn's Issues

KeyError: 'ncores'

I was unable to copy and paste the stacktrace but here's a screenshot:

[Screenshot: Screen Shot 2019-07-02 at 8 36 40 PM]

Many issues on zeppelin

I am using AWS EMR with 1 master and 2 worker nodes (r5.xlarge). Hadoop, Spark, and Zeppelin are all set up, and I am using Zeppelin for notebook processing. My bootstrap script for EMR:

#!/usr/bin/env bash
set -x -e

#  move /usr/local to /mnt/usr-moved/local; else run out of space on /
sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/


#sudo bash -c 'yum update -y && yum upgrade && sudo yum groupinstall -y 'Development Tools''
# The group name must be quoted as a single argument; nested single quotes would split it
sudo yum group install -y "Development Tools"
sudo yum install -y htop

# Install anaconda
wget https://repo.continuum.io/archive/Anaconda3-5.3.0-Linux-x86_64.sh -O $HOME/anaconda.sh && /bin/bash ~/anaconda.sh -b -p $HOME/anaconda
echo -e '\nexport PATH=$HOME/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc


conda config --set always_yes yes --set changeps1 no
conda config -f --add channels conda-forge
conda config -f --add channels defaults
conda update conda
conda update --all


# cleanup
rm ~/anaconda.sh

echo bootstrap_conda.sh completed. PATH now: $PATH
export PYSPARK_PYTHON="$HOME/anaconda/bin/python3.7"

# setup git
sudo yum install -y git

# install additional third party libraries
echo "Installing base packages"
conda install \
-c defaults \
-c conda-forge \
-c anaconda \
-y \
-q \
"dask-yarn>=0.4.1" \
pyarrow \
s3fs \
nomkl \
conda-pack \
tensorflow \
grpcio \
protobuf \

yes | pip install fancyimpute
yes | pip install -U pandasql

if grep isMaster /mnt/var/lib/info/instance.json | grep true;
then
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz

echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
distributed:
  dashboard:
    link: "/proxy/{port}/status"
yarn:
  environment: /home/hadoop/environment.tar.gz
  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
EOT
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc

fi

I followed your AWS EMR guide to set up my environment, but I have run into several problems.

First, a question: I am using Anaconda instead of Miniconda, which produces a 2 GiB conda environment archive. Is that okay?

  1. When I try to set up a cluster using local mode like:
import dask
import dask.distributed
from dask_yarn import YarnCluster
from dask.distributed import Client
from sklearn.externals import joblib

cluster = YarnCluster(deploy_mode='local')

it gives me:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-8bae7fd69e7b> in <module>
      1 # Create a cluster in local deploy mode, to have access to the dashboard
----> 2 cluster = YarnCluster(deploy_mode='local')
      3 # cluster = YarnCluster.from_specification('/home/hadoop/.config/dask/config.yaml')
      4 # cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz', worker_vcores=4, worker_memory='4 GiB')
      5 

/home/hadoop/anaconda/lib/python3.7/site-packages/dask_yarn/core.py in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
    291                                    queue=queue,
    292                                    tags=tags,
--> 293                                    user=user)
    294 
    295         self._start_cluster(spec, skein_client)

/home/hadoop/anaconda/lib/python3.7/site-packages/dask_yarn/core.py in _make_specification(**kwargs)
    116                "\n"
    117                "See http://yarn.dask.org/environments.html for more information.")
--> 118         raise ValueError(msg)
    119 
    120     n_workers = lookup(kwargs, 'n_workers', 'yarn.worker.count')

ValueError: You must provide a path to a Python environment for the workers.
This may be one of the following:
- A conda environment archived with conda-pack
- A virtual environment archived with venv-pack
- A path to a conda environment, specified as conda://...
- A path to a virtual environment, specified as venv://...
- A path to a python binary to use, specified as python://...

See http://yarn.dask.org/environments.html for more information.

I then tried:

cluster = YarnCluster.from_specification('/home/hadoop/.config/dask/config.yaml')

which gives me:

ValueError: Unknown extra keys for ApplicationSpec:
- distributed
- yarn

my config.yaml:

distributed:
  dashboard:
    link: "/proxy/{port}/status"
yarn:
  environment: /home/hadoop/environment.tar.gz
  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
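
A note on the two failures above, as a hedged sketch rather than dask-yarn's actual code. The traceback shows `_make_specification` resolving each setting with `lookup(kwargs, 'n_workers', 'yarn.worker.count')`, which suggests an explicit keyword argument is tried first and a dotted key in the Dask configuration second. `from_specification`, by contrast, rejects the Dask config file with "Unknown extra keys for ApplicationSpec", so it appears to expect a skein application specification instead. The stand-in `lookup` below mimics that precedence. The keys `yarn.worker.vcores` and `yarn.worker.memory` are assumed by analogy with `yarn.worker.count` and should be checked against the dask-yarn configuration reference.

```python
def get_dotted(config, key, default=None):
    """Walk a nested dict using a dotted key such as 'yarn.worker.count'."""
    node = config
    for part in key.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


def lookup(kwargs, kwarg_name, config_key, config):
    """Prefer an explicit keyword argument, else fall back to the config."""
    value = kwargs.get(kwarg_name)
    if value is None:
        value = get_dotted(config, config_key)
    return value


# Mirrors the config.yaml above, plus hypothetical worker settings.
config = {
    "yarn": {
        "environment": "/home/hadoop/environment.tar.gz",
        "worker": {"vcores": 4, "memory": "4 GiB", "count": 2},
    }
}

# No keyword argument given: the value comes from the config.
print(lookup({}, "worker_vcores", "yarn.worker.vcores", config))
# Keyword argument given: it overrides the config.
print(lookup({"worker_vcores": 2}, "worker_vcores", "yarn.worker.vcores", config))
```

Under this reading, worker settings placed under `yarn.worker` in config.yaml would be picked up by a plain `YarnCluster()` call, with no need for `from_specification`.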

When I run:

cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz', worker_vcores=4, worker_memory='4 GiB')

it just hangs: the call keeps running and never returns.

Eventually, I can do:

cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz')
cluster.scale(10)

This works, but then I cannot configure worker memory, vcores, etc.

  2. When I try to reach the dask dashboard by printing:
print(cluster.dashboard_link)

it gives me None.

  3. Even if I continue despite these problems, when I run a grid search like:

with joblib.parallel_backend('dask'):
   # do grid search

When I connect to the worker nodes over ssh and check resource utilization, they are idle. Only the master node, with 2 cores, is doing any work.

hdfs configuration as step on EMR

First, thank you for this awesome library. It's been a game changer for me to be able to run dask clusters on EMR.

I'm using your EMR bootstrap script to set up the cluster. I have a bunch of large zip files (~2 GB) on s3 that I'm pulling smaller individual csv files out of and processing into a dask dataframe. I'm copying the data from s3 into hdfs before processing.

I can open a notebook as advertised and open a hdfs connection as follows with no problem.

import pyarrow as pa
fs = pa.hdfs.connect()

I'm able to do the same on the workers from within the notebook. So far this works great.

Where I run into a problem is that I want to run this unattended as a script in an EMR step instead of in the notebook. When I do so, the same line produces the following error:

pyarrow.lib.ArrowIOError: HDFS connection failed

It seems like something is getting configured for the jupyter notebook server that isn't for the standalone job step, I'm just having a hard time pinning it down. Any ideas?
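
One difference worth ruling out, offered as a guess rather than a diagnosis: the bootstrap script exports `ARROW_LIBHDFS_DIR` from `~/.bashrc`, which an interactive shell or notebook server typically inherits but a non-interactive EMR step may not. pyarrow's libhdfs driver also relies on `JAVA_HOME`, `HADOOP_HOME` and `CLASSPATH`. The sketch below (the helper name `missing_hdfs_env` is made up for illustration) reports which of these are unset, so it can be run at the top of the step script before `pa.hdfs.connect()`.

```python
import os

# Variables the pyarrow libhdfs driver consults; ARROW_LIBHDFS_DIR is the one
# the bootstrap script sets via ~/.bashrc.
HDFS_ENV_VARS = ("JAVA_HOME", "HADOOP_HOME", "CLASSPATH", "ARROW_LIBHDFS_DIR")


def missing_hdfs_env(env=None):
    """Return the HDFS-related variables that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in HDFS_ENV_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_hdfs_env()
    if missing:
        print("Unset in this process:", ", ".join(missing))
    else:
        print("All HDFS-related variables are set")
```

Running the same check in the notebook and in the step, then comparing the two outputs, should show whether the environments differ.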

MapR non-standard Hadoop security not supported

Hi guys,

I am sorry if this is a basic or repeated question. We have been trying to follow the example to bring up dask on yarn and keep getting the error "Kerberos ticket not found, please kinit and restart", even though the user starting the cluster does have a valid ticket.

Is there a way to point to the ticket location explicitly when the cluster starts? We have a MapR Hadoop cluster and want to run Dask on Yarn on it. If anybody has worked with this combination, any pointers would be highly appreciated.
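
On the question of pointing at a ticket location: with a standard Kerberos client, the credential cache is selected by the `KRB5CCNAME` environment variable, and `klist -s` exits with status 0 only when that cache can be read and holds a non-expired ticket. The sketch below checks both from Python before starting the cluster. It assumes a stock Kerberos client. Whether skein or MapR's own ticket mechanism honours the same cache is not something this example can confirm.

```python
import os
import shutil
import subprocess


def ticket_cache(env=None):
    """Return the credential cache named by KRB5CCNAME, or None if unset."""
    env = os.environ if env is None else env
    return env.get("KRB5CCNAME") or None


def has_valid_ticket():
    """Run `klist -s`; True means a readable, non-expired ticket was found.

    Returns None when the klist binary is not installed.
    """
    klist = shutil.which("klist")
    if klist is None:
        return None
    return subprocess.call([klist, "-s"]) == 0


if __name__ == "__main__":
    print("KRB5CCNAME:", ticket_cache())
    print("valid ticket:", has_valid_ticket())
```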

Thank you!
Andre

Error attached.
log-daskyarn.txt

client.upload_file() fails unless we use time.sleep

Hi,

I am having issues with client.upload_file() when using a YarnCluster on a Spark EMR in AWS (one worker node). I've managed to find a workaround hack of using time.sleep() before creating a client from the cluster.

Without this sleep the upload_file() function appears to work but sys.path on the worker is not updated to include the zipfile I uploaded. This causes a ModuleNotFoundError exception in the function passed to client.submit().

I've created a minimal gist that shows this issue, at least on the EMR Spark cluster I'm using on AWS. I'm running this command on the master node using Python 3.6 (EMR 5.23.0): https://gist.github.com/crleblanc/ff4b7860a8a0401d283f7077c0d8953c. Setting WAIT_SECS to 0 causes this script to fail. Setting this to a large value like 10 seconds causes this example to pass as expected.

I have a bootstrap script that installs all of my dependencies but we're using upload_file() for code that changes often. This tends to change on every new instance of the cluster.

Hopefully this gist gives you an idea of where to investigate. Thanks for your work on such an excellent project!
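
A fixed `time.sleep()` can be replaced by polling for the condition the sleep is presumably waiting on. The assumption here, not confirmed in this report, is that `upload_file()` only reaches workers already registered with the scheduler, so a worker that joins later never receives the zip file. `wait_until` is a hypothetical helper. With a real client the predicate could be something like `lambda: len(client.scheduler_info()['workers']) >= 1`.

```python
import time


def wait_until(predicate, timeout=30.0, interval=0.5):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in for "has a worker registered yet?": becomes true on the third poll.
polls = {"count": 0}


def worker_registered():
    polls["count"] += 1
    return polls["count"] >= 3


print(wait_until(worker_registered, timeout=5.0, interval=0.01))
```

More recent releases of distributed also provide `Client.wait_for_workers`, which may serve the same purpose without a custom helper.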

"Stream is closed" error when creating dask client in Python 2

I'm trying to test a dask-yarn installation on my Hadoop cluster. I started with a simple example:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz', worker_vcores=1, 
                      worker_memory='4GB', 
                      n_workers=4)

The YARN application starts successfully, and I have 1 scheduler and 4 workers. However, the following error appears in the scheduler's container log:

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://zzz:34495
distributed.scheduler - INFO -       bokeh at:                    :45834
distributed.scheduler - INFO - Register tcp://zzz:38718
distributed.scheduler - INFO - Starting worker compute stream, tcp://zzz:38718
distributed.core - INFO - Starting established connection
Future exception was never retrieved
future: <Future finished exception=KeyError('op',)>
Traceback (most recent call last):
  File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/distributed/core.py", line 313, in handle_comm
    op = msg.pop('op')
KeyError: 'op'

(zzz is just the scheduler's host IP.) The output log in the python shell is:

18/10/18 12:08:18 INFO skein.Daemon: Submitting application...
18/10/18 12:08:18 INFO impl.YarnClientImpl: Submitted application application_1537451980488_0012
18/10/18 12:08:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/10/18 12:08:36 INFO skein.Daemon: Daemon shut down

When I then create the dask client:

client = Client(cluster)

I receive the "Stream is closed" error:

/home/user/anaconda2/lib/python2.7/site-packages/distributed/comm/tcp.pyc in convert_stream_closed_error(obj, exc)
    124         raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
    125     else:
--> 126         raise CommClosedError("in %s: %s" % (obj, exc))
    127 
    128 

CommClosedError: in <closed TCP>: Stream is closed

Interestingly, that <Future finished exception=KeyError('op',)> appears again in the scheduler log every time I'm trying to create the client.

The environment:

  • Hadoop 2.8.1
  • Anaconda environment (pip freeze):
asn1crypto==0.24.0
bokeh==0.13.0
certifi==2018.10.15
cffi==1.11.5
chardet==3.0.4
Click==7.0
cloudpickle==0.6.1
conda-pack==0.2.0
cryptography==2.3.1
cryptography-vectors==2.3.1
cytoolz==0.9.0.1
dask==0.19.4
dask-yarn==0.3.1
distributed==1.23.3
future==0.16.0
grpcio==1.14.1
heapdict==1.0.0
idna==2.7
Jinja2==2.10
locket==0.2.0
MarkupSafe==1.0
mkl-fft==1.0.6
mkl-random==1.0.1
msgpack==0.5.6
numpy==1.15.0
packaging==18.0
pandas==0.23.4
partd==0.3.9
patsy==0.5.0
protobuf==3.6.1
psutil==5.4.7
pyarrow==0.11.0
pycparser==2.19
pyOpenSSL==18.0.0
pyparsing==2.2.2
PySocks==1.6.8
python-dateutil==2.7.3
pytz==2018.5
PyYAML==3.13
requests==2.19.1
scikit-learn==0.19.1
scipy==1.1.0
six==1.11.0
skein==0.2.0
sortedcontainers==2.0.5
statsmodels==0.9.0
tblib==1.3.2
toolz==0.9.0
tornado==5.1.1
tqdm==4.26.0
tsfresh==0.11.0
urllib3==1.23
zict==0.1.3

Jupyter notebook widget can't scale cluster

I've got a standard deployment of Dask 2.0 on an AWS EMR cluster with the original bootstrap shell script.
I can connect to Jupyter Notebook and run the example code:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="3GiB",
                      deploy_mode='local')
client = Client(cluster)
cluster

But when trying to scale up the cluster through the widget I get an error:

tornado.application - ERROR - Exception in callback <function YarnCluster._widget.<locals>.update at 0x7fc78597c378>
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.6/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 649, in update
    status.value = self._widget_status()
  File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 585, in _widget_status
    cores = sum(w['ncores'] for w in workers.values())
  File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 585, in <genexpr>
    cores = sum(w['ncores'] for w in workers.values())
KeyError: 'ncores'

App log contains the following warning:

/mnt/yarn/usercache/hadoop/appcache/application_.../container_.../environment/lib/python3.6/site-packages/distributed/nanny.py:102: UserWarning: the ncores= parameter has moved to nthreads=

The problem disappeared after I changed the line 585 to:
cores = sum(w['nthreads'] for w in workers.values()) if workers.values() else 0
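
The one-line change above can be made tolerant of both key names, which may help when the client and the packaged worker environment run different versions of distributed. This is a sketch of the idea, not the patch that landed in dask-yarn, and `total_cores` is an illustrative name.

```python
def total_cores(workers):
    """Sum per-worker thread counts from scheduler worker-info dicts.

    Accepts the newer 'nthreads' key and falls back to the older 'ncores'.
    """
    return sum(
        info.get("nthreads", info.get("ncores", 0)) for info in workers.values()
    )


workers = {
    "tcp://10.0.0.1:40000": {"nthreads": 2},  # newer distributed
    "tcp://10.0.0.2:40000": {"ncores": 2},    # older distributed
}
print(total_cores(workers))  # 4
print(total_cores({}))       # 0
```

An empty mapping sums to 0 on its own, so the extra `if workers.values() else 0` guard is not needed.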

Environment details:

dask 2.0.0
dask-core 2.0.0
dask-yarn 0.6.1
notebook 5.7.8
tornado 6.0.3
distributed 2.0.1
bokeh 1.2.0
ipykernel 5.1.1
ipython 7.6.0
ipython_genutils 0.2.0
ipywidgets 7.4.2

Dask Yarn - FileNotFoundError: Failed to resolve .skein.{crt,pem} in 'LOCAL_DIRS'

I am trying to run my application using dask-yarn, but while creating the yarn cluster I get this error:

Traceback (most recent call last):
  File "", line 1, in
  File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/core.py", line 291, in __init__
    self._start_cluster(spec, skein_client)
  File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/core.py", line 367, in _start_cluster
    scheduler_address = app.kv.wait('dask.scheduler').decode()
  File "/home/user/project/sync/lib/python3.5/site-packages/skein/kv.py", line 655, in wait
    event = event_queue.get()
  File "/home/user/project/sync/lib/python3.5/site-packages/skein/kv.py", line 281, in get
    raise out
skein.exceptions.ConnectionError: Unable to connect to application

On checking the yarn logs, I found that some of the worker nodes are throwing FileNotFoundError for .skein.{crt,pem}:

LogContents:
/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/config.py:13: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  defaults = yaml.load(f)
Traceback (most recent call last):
  File "/home/user/project/sync/bin/dask-yarn", line 10, in <module>
    sys.exit(main())
  File "/home/user/projectn/sync/lib/python3.5/site-packages/dask_yarn/cli.py", line 407, in main
    func(**kwargs)
  File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/cli.py", line 288, in scheduler
    app_client = skein.ApplicationClient.from_current()
  File "/home/user/project/sync/lib/python3.5/site-packages/skein/core.py", line 792, in from_current
    security=Security.from_default())
  File "/home/user/project/sync/lib/python3.5/site-packages/skein/model.py", line 344, in from_default
    "Failed to resolve .skein.{crt,pem} in 'LOCAL_DIRS'")
FileNotFoundError: Failed to resolve .skein.{crt,pem} in 'LOCAL_DIRS'

Also, the failure is intermittent: sometimes my dask job works just fine and sometimes it throws this error. Is this a known issue, or am I doing something wrong?

CLI commands are broken for Python < 2.7.6

Hello,

  • I have a Python virtual environment (with dask installed) on an edge node of a Hadoop cluster
  • I created an archive of the environment, pyspark2.tar.gz, in the /home/rchaudro directory on the local machine.

By running the following code:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='/home/rchaudro/pyspark2.tar.gz')

I have the following error:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-2-16f7e6a860db> in <module>()
      2 from dask.distributed import Client
      3 
----> 4 cluster = YarnCluster(environment='pyspark2.tar.gz')
      5 # Connect to the cluster
      6 client = Client(cluster)

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
    293                                    user=user)
    294 
--> 295         self._start_cluster(spec, skein_client)
    296 
    297     @cached_property

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _start_cluster(self, spec, skein_client)
    337                              "'dask.worker' service")
    338 
--> 339         skein_client = _get_skein_client(skein_client)
    340 
    341         if 'dask.scheduler' not in spec.services:

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _get_skein_client(skein_client, security)
     44         with warnings.catch_warnings():
     45             warnings.simplefilter('ignore')
---> 46             return skein.Client(security=security)
     47     return skein_client
     48 

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in __init__(self, address, security, keytab, principal, log, log_level, java_options)
    351                                           log=log,
    352                                           log_level=log_level,
--> 353                                           java_options=java_options)
    354         else:
    355             proc = None

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in _start_driver(security, set_global, keytab, principal, log, log_level, java_options)
    250                                 stderr=outfil,
    251                                 env=env,
--> 252                                 **popen_kwargs)
    253 
    254         while proc.poll() is None:

/usr/lib64/python2.7/subprocess.pyc in __init__(self, args, bufsize, executable, stdin, stdout, stderr, preexec_fn, close_fds, shell, cwd, env, universal_newlines, startupinfo, creationflags)
    709                                 p2cread, p2cwrite,
    710                                 c2pread, c2pwrite,
--> 711                                 errread, errwrite)
    712         except Exception:
    713             # Preserve original exception in case os.close raises.

/usr/lib64/python2.7/subprocess.pyc in _execute_child(self, args, executable, preexec_fn, close_fds, cwd, env, universal_newlines, startupinfo, creationflags, shell, to_close, p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
   1325                         raise
   1326                 child_exception = pickle.loads(data)
-> 1327                 raise child_exception
   1328 
   1329 

OSError: [Errno 2] No such file or directory
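
`OSError: [Errno 2]` raised from `subprocess.Popen` means the program being launched could not be found. The traceback passes through skein's `_start_driver`, and skein's driver is a Java process, so one plausible cause (an assumption, since the command itself is not shown) is that `java` is not on the `PATH` of the notebook kernel. A quick check to run from the same kernel is sketched below. It uses `shutil.which`, which needs Python 3.3 or later. On Python 2.7, `distutils.spawn.find_executable` is the usual equivalent.

```python
import os
import shutil


def find_program(name):
    """Return the full path of `name` if it is on the search path, else None."""
    return shutil.which(name)


if __name__ == "__main__":
    print("java found at:", find_program("java"))
    print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
```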

"Module not found" error using groupby-apply in dask-yarn on EMR

I'm having problems getting dask groupby-apply working on EMR. I am seeing error messages in the worker logs which indicate a problem deserializing the function being applied. I have a reproducible example, simplified as far as possible, which I am running on a small cluster (master: m4.large, workers: 2x m4.large).

  1. I create the cluster using a modified version of the dask-yarn example bootstrap script. The script installs miniconda and other dependencies on the master node and all worker nodes. It also creates a python source file (dask_test.py) containing a simple function on all nodes, and adds the file to the PYTHONPATH. This is my bootstrap script: bootstrap-dask-test.txt (I have changed the extension so I can upload it)

  2. Running the following code using a LocalCluster in a notebook on the master works as expected:

import dask
from distributed import Client
from dask_test import test_func 

client = Client(n_workers=2, threads_per_worker=1, memory_limit='3 GiB')

df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()

print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')

  3. The following code in a notebook using a YarnCluster fails:

import dask
from distributed import Client
from dask_yarn import YarnCluster
from dask_test import test_func 

env='python:///home/hadoop/miniconda/bin/python3.6'
cluster = YarnCluster(environment=env, deploy_mode='local', n_workers=4, worker_vcores=1, worker_memory='3 GiB')
client = Client(cluster)

df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()

print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')

This is the worker log containing the serialization error message:
dask.worker.log

I have also tried using a packaged conda environment, and using Client.upload_file, with no luck. Any advice would be greatly appreciated.
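
To narrow this down, it may help to ask each worker directly whether it can import the module, since a `PYTHONPATH` exported in a bootstrap script is not necessarily visible inside YARN containers (an assumption about this setup, not a confirmed cause). The helpers below are plain Python with illustrative names. On a live cluster they could be sent to every worker with `client.run(can_import, 'dask_test')`, which returns one result per worker address.

```python
import importlib.util
import sys


def can_import(module_name):
    """Report whether a top-level module is importable in this process."""
    return importlib.util.find_spec(module_name) is not None


def import_report(module_name):
    """Bundle the import check with the interpreter and search path in use."""
    return {
        "importable": can_import(module_name),
        "executable": sys.executable,
        "sys_path": list(sys.path),
    }


print(can_import("json"))
print(can_import("dask_test_module_that_does_not_exist"))
```

If the workers report False, passing the directory to the workers through the `worker_env` argument of `YarnCluster` (for example as `PYTHONPATH`) is one thing to try.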

AWS EMR yarn-cluster seems to ignore worker_vcore param

Sorry for the bare-bones issue. I'd be happy to flesh it out, but I'm not sure where to start providing data. I also suspect this might be a skein bug, but I'm not sure; happy to point it in their direction if that is the case.

When creating a YarnCluster and specifying worker_vcores, the scheduler starts up fine, but no workers instantiate (either via a scale or by specifying initial n_workers). The application logs look like there's a mismatch between what containers are being requested and what skein wants, which points to a problem on their end, but I'm not too sure and figured I'd start here. Any ideas for additional things to look for?

Specifying worker_memory works fine, but each container always ends up with only 1 core regardless of how much memory you give it, so you end up with situations where there are 10 containers with 100 GB of RAM each, using only 1 core. Pains me inside :)

Thanks,
Michael

19/01/15 19:04:25 INFO skein.ApplicationMaster: Application specification successfully loaded
19/01/15 19:04:25 INFO skein.ApplicationMaster: Running as user hadoop
19/01/15 19:04:25 INFO client.RMProxy: Connecting to ResourceManager at ip-10-201-102-148.ginkgobioworks.com/10.201.102.148:8030
19/01/15 19:04:25 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
19/01/15 19:04:25 INFO skein.ApplicationMaster: gRPC server started at ip-10-201-102-157.ginkgobioworks.com:42873
19/01/15 19:04:25 INFO skein.ApplicationMaster: WebUI server started at ip-10-201-102-157.ginkgobioworks.com:32841
19/01/15 19:04:25 INFO client.RMProxy: Connecting to ResourceManager at ip-10-201-102-148.ginkgobioworks.com/10.201.102.148:8032
19/01/15 19:04:25 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_3
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_4
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_5
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_6
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_7
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_8
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_9
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_10
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_11
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_12
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_13
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_14
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_15
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_16
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_17
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_18
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_19
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_20
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_21
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_22
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_23
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_24
19/01/15 19:04:27 INFO impl.AMRMClientImpl: Received new token for : ip-10-201-102-157.ginkgobioworks.com:8041
19/01/15 19:04:27 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 1, releasing container_1547573448325_0009_01_000002
19/01/15 19:04:28 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 2, releasing container_1547573448325_0009_01_000003
19/01/15 19:04:29 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 3, releasing container_1547573448325_0009_01_000004
19/01/15 19:04:30 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 4, releasing container_1547573448325_0009_01_000005
19/01/15 19:04:31 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 5, releasing container_1547573448325_0009_01_000006
19/01/15 19:04:32 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 6, releasing container_1547573448325_0009_01_000007
19/01/15 19:04:33 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 7, releasing container_1547573448325_0009_01_000008
19/01/15 19:04:34 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 8, releasing container_1547573448325_0009_01_000009
19/01/15 19:04:35 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 9, releasing container_1547573448325_0009_01_000010
19/01/15 19:04:36 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 10, releasing container_1547573448325_0009_01_000011
19/01/15 19:04:37 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 11, releasing container_1547573448325_0009_01_000012
19/01/15 19:04:38 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 12, releasing container_1547573448325_0009_01_000013
19/01/15 19:04:39 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 13, releasing container_1547573448325_0009_01_000014
19/01/15 19:04:40 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 14, releasing container_1547573448325_0009_01_000015
19/01/15 19:04:41 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 15, releasing container_1547573448325_0009_01_000016
19/01/15 19:04:42 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 16, releasing container_1547573448325_0009_01_000017
19/01/15 19:04:43 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 17, releasing container_1547573448325_0009_01_000018
19/01/15 19:04:44 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 18, releasing container_1547573448325_0009_01_000019
19/01/15 19:04:45 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 19, releasing container_1547573448325_0009_01_000020
19/01/15 19:04:46 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 20, releasing container_1547573448325_0009_01_000021
19/01/15 19:04:47 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 21, releasing container_1547573448325_0009_01_000022
19/01/15 19:04:48 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 22, releasing container_1547573448325_0009_01_000023
19/01/15 19:04:49 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 23, releasing container_1547573448325_0009_01_000024
19/01/15 19:04:50 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 24, releasing container_1547573448325_0009_01_000025
19/01/15 19:04:51 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 25, releasing container_1547573448325_0009_01_000026
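
The warnings above show YARN handing back containers of `<memory:2048, vCores:1>`, which skein cannot match to the `dask.worker` request. One way this can happen is a scheduler configured with YARN's `DefaultResourceCalculator`, which considers only memory when sizing containers. Whether that applies to this cluster is an assumption. The sketch below extracts the allocated resources from such log lines so they can be compared with what was requested. The `requested` values are placeholders, since the original call is not shown.

```python
import re

RESOURCE_RE = re.compile(r"<memory:(\d+), vCores:(\d+)>")


def parse_allocated(line):
    """Return (memory_mb, vcores) from a skein log line, or None if absent."""
    match = RESOURCE_RE.search(line)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


line = (
    "19/01/15 19:04:27 WARN skein.ApplicationMaster: No matching service found "
    "for resource <memory:2048, vCores:1>, priority 1, releasing "
    "container_1547573448325_0009_01_000002"
)

requested = (2048, 4)  # placeholder: memory in MiB, worker_vcores
allocated = parse_allocated(line)
print("allocated:", allocated)
print("matches request:", allocated == requested)
```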

Schedulers and workers not using personal config

As far as I can tell, the Dask workers and scheduler on Yarn nodes only look at /etc/dask/ for configuration. It would be nice if files in ~/.config/dask/ were also used. A better option would be to allow uploading the local Dask configuration to HDFS, so that no Dask config is needed on the Yarn cluster nodes.

I am trying to achieve user separation, and that seems possible only by specifying a different CA for each user. I have created a feature request for this purpose: dask/distributed#2347

Specify scheduler node

Is it possible to specify which node should be used for the yarn-scheduler? That would be really handy to integrate it into Jupyter using the nbproxy, since I'm not able to reach the port directly

Issue accessing AWS EMR Dask cluster Bokeh dashboard via Chrome

I am trying to access the Dask Bokeh dashboard on an AWS EMR cluster via Chrome, but nothing is shown when I click on the linked dashboard.

I set up the cluster using this workflow. When I click on the linked dashboard, I see nothing unless I click on "Info", where I can only find information on the workers.

I have tried using ! pip install tornado==5 from within Jupyter Notebook/Hub with no resolution.

What am I missing to see the entire Dask Bokeh dashboard?

Note: This is a requested cross-post from StackOverflow.

Allow forwarding CLI options

Some options on the distributed CLI should be settable via dask-yarn (e.g. --dashboard-address). There are a few ways we could accomplish this:

  • Update distributed so that all parameters are also part of the configuration. Users could then provide them via env (or via configuration #42).
  • Add an option to specify these parameters as part of the command

From jcrist/skein#176.

Add `YarnCluster.from_application_id`

Would support creating a cluster object from an existing application. The process would be:

  • Connect to the application using skein.Client.connect
  • Check if the application contains the service dask.worker (application is indeed a dask cluster)
  • Create a new YarnCluster object using the existing connection logic
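
The first two steps above can be sketched as follows. This is a minimal sketch, assuming the object passed in behaves like `skein.Client` (a `connect(app_id)` method) and that the connected application exposes `get_specification()` with a `services` mapping; the function name `connect_to_dask_application` is hypothetical.

```python
def connect_to_dask_application(skein_client, app_id):
    """Connect to a running application and check that it is a Dask cluster.

    `skein_client` is expected to behave like `skein.Client`. It is passed
    in so that this sketch has no hard dependency on skein.
    """
    app = skein_client.connect(app_id)
    spec = app.get_specification()
    # A dask-yarn application is recognized by its worker service.
    if "dask.worker" not in spec.services:
        raise ValueError("%s is not a dask-yarn application" % app_id)
    return app
```

A `YarnCluster.from_application_id` classmethod would then wrap the returned application object using the existing connection logic.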

Yarn Cluster

Hi everyone, I'm trying to execute a small piece of code on Yarn. Here is my whole code:

with YarnCluster(environment='environment.tar.gz',
                 worker_vcores=2,
                 worker_memory="8GiB") as cluster:

    cluster.scale(2)
    # Connect to the cluster
    client = Client(cluster)

    CSV_INPUT = 'hdfs:///Data/input/input.csv'
    CSV_OUTPUT = 'hdfs://Data/input/output.parquet'

    df = dd.read_csv(CSV_INPUT)
    dd.to_parquet(df, CSV_OUTPUT)

Unfortunately, I get this error:

distributed.scheduler.KilledWorker: ("('pandas_read_text-read-block-from-delayed-3f488119df76d5b2ba0e2e75ec2bc55b', 0)", <Worker 'tcp://myip:40417', memory: 0, processing: 1>)

I don't understand this error. Has anyone experienced this issue?
Thanks

dask-yarn does not work with joblib

Hey,

I tried to make dask-yarn work with joblib's parallel loops, but failed because there seems to be a problem with the YarnCluster reporting its cores.

Here is a small example:

from dask.distributed import Client
from dask_yarn import YarnCluster
from joblib import Parallel, delayed, parallel_backend

cluster = YarnCluster(...)

# Connect to the cluster
client = Client(cluster)

# This should give a list of available cores, but is empty
print(client.ncores())

with parallel_backend('dask'):
    # This is empty since no workers are assigned
    results = Parallel(verbose=100)(delayed(lambda x: x**2)(x) for x in range(10))

However, this doesn't seem to be a problem when using a map-gather construct to achieve the same result.

# This works fine
results = client.gather(client.map(lambda x: x**2, range(10)))

Add option to distribute files to workers

From #49 (comment).

I think this would look like:

  • A worker_files kwarg, which takes a mapping to forward to the files kwarg in the skein specification.
  • A worker-file and worker-archive option in the cli. We could mirror spark's api here and have the localized name be optionally specified with a # separator (see https://spark.apache.org/docs/latest/running-on-yarn.html#important-notes). I'm not a huge fan of this api, but it is used by other tools and that might be argument enough to use it.
dask-yarn submit \
  --worker-file /local/file/path \
  --worker-file /other/local/file/path#name-in-container
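
The `#` separator could be parsed along these lines. This is a sketch only: `parse_worker_file` and `worker_files` are hypothetical helper names, and the resulting mapping of container name to local path is assumed to be what the proposed `worker_files` kwarg would accept.

```python
import os


def parse_worker_file(option):
    """Split a `--worker-file` value into (name_in_container, local_path).

    "path#name" localizes `path` under `name`; without a "#" the file
    keeps its base name.
    """
    path, sep, name = option.partition("#")
    if not path:
        raise ValueError("missing path in %r" % option)
    if sep and not name:
        raise ValueError("empty name after '#' in %r" % option)
    return (name or os.path.basename(path)), path


def worker_files(options):
    """Build a {name_in_container: local_path} mapping from CLI values."""
    return dict(parse_worker_file(option) for option in options)


print(worker_files(["/local/file/path",
                    "/other/local/file/path#name-in-container"]))
```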

worker_restarts does not get passed properly in core.py from __init__ to _make_specification

spec = _make_specification(environment=environment,
                           n_workers=n_workers,
                           worker_vcores=worker_vcores,
                           worker_memory=worker_memory,
                           worker_max_restarts=worker_max_restarts,
                           scheduler_vcores=scheduler_vcores,
                           scheduler_memory=scheduler_memory,
                           name=name,
                           queue=queue,
                           tags=tags)

Yet _make_specification is looking for

worker_restarts = either('worker_restarts', 'yarn.worker.restarts')
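
The effect of the mismatch can be reproduced in isolation. The sketch below is a simplified stand-in, not the actual dask-yarn code: `make_specification`, the inner `either`, and the `DEFAULTS` table are assumptions made for illustration, with a default of -1 meaning unlimited restarts.

```python
# Assumed configuration default; -1 means unlimited restarts.
DEFAULTS = {"yarn.worker.restarts": -1}


def make_specification(**kwargs):
    def either(kwarg_name, config_key):
        # Prefer an explicit keyword argument, else fall back to config.
        value = kwargs.get(kwarg_name)
        return DEFAULTS[config_key] if value is None else value

    return {"worker_restarts": either("worker_restarts", "yarn.worker.restarts")}


# The caller passes a keyword the lookup never reads, so the value is lost.
print(make_specification(worker_max_restarts=3))
# The caller passes the keyword the lookup expects.
print(make_specification(worker_restarts=3))
```

Because unknown keywords are accepted without complaint in this sketch, the wrong name fails silently, which matches the behavior described above.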

Understanding how to configure Skein and Dask-Yarn together in YAML

I am trying to sort out how the Skein YAML configuration (described here https://jcrist.github.io/skein/specification.html) matches with the Dask-Yarn YAML configuration (described here http://yarn.dask.org/en/latest/configuration.html).

Specifically, I see that Skein has a files configuration that lets you distribute files to the workers. It says to place this files section under the master or service section of the configuration. For the dask-yarn configuration, only scheduler and worker sections are shown in the example. Are the dask-yarn scheduler and worker being implicitly placed into a services section of a Skein configuration?

How does the dask-yarn YAML file match up to the Skein YAML file and where would I put the file attributes in my dask-yarn config file?
Thanks!

Installation issues

Not sure this is dask-yarn's fault, so feel free to close, but I wanted to share this failure when trying to install:

$ conda install dask-yarn -c conda-forge
 PackageNotFoundError: Package not found: '' Dependencies missing in current linux-64 channels:
   - dask-yarn -> grpcio >=1.14.0 -> c-ares >=1.15.0,<2.0a0 -> libgcc-ng >=7.3.0  
   - dask-yarn -> grpcio >=1.14.0 -> libstdcxx-ng >=7.3.0      

Add a CLI for submitting batch jobs

This would probably look like

$ dask-yarn submit myscript.py [options...]

It would submit a YARN job with an additional service running the user's script. This would allow dask-yarn to submit scripts to run fully on the cluster, mirroring spark-submit in cluster mode. The command could either exit immediately (leaving tracking of application success/failure to other tools), or log to the console.
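
The command-line surface could be sketched with argparse as below. Everything beyond `dask-yarn submit myscript.py` is an assumption: the `--name` and `--queue` options and the `--detach` flag (for exiting immediately instead of logging to the console) are hypothetical names chosen for illustration.

```python
import argparse


def build_parser():
    """Build a parser for a hypothetical `dask-yarn submit` subcommand."""
    parser = argparse.ArgumentParser(prog="dask-yarn")
    subcommands = parser.add_subparsers(dest="command")

    submit = subcommands.add_parser("submit", help="Submit a script to run on YARN")
    submit.add_argument("script", help="Path to the Python script to run")
    submit.add_argument("--name", default="dask", help="Application name")
    submit.add_argument("--queue", default="default", help="YARN queue to deploy to")
    submit.add_argument("--detach", action="store_true",
                        help="Exit right after submission instead of logging")
    return parser


args = build_parser().parse_args(["submit", "myscript.py", "--queue", "prod"])
print(args.command, args.script, args.queue, args.detach)
```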

dask-yarn not requesting the exact number of workers I specified

I'm running the following, very simple example, with dask-yarn.

cluster = YarnCluster(environment='venv:///home/florisvannee/my_envsd',
			worker_vcores=1,
			worker_memory="256MiB")
cluster.scale(1)
client = Client(cluster)

def inc(x):
    return x + 1

a = client.submit(inc, 20)
print(a)
print(a.result())

This never completes. If I change the call to scale with any number higher than 1, it does complete. Eg. with cluster.scale(2) I get the following output in the log:

19/04/19 14:07:05 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/19 14:07:06 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000002...
19/04/19 14:07:06 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1555675382310_0003_01_000002
19/04/19 14:07:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/19 14:07:12 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000003...
19/04/19 14:07:12 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1555675382310_0003_01_000003

Even though it looks like it is requesting worker_0, it never gets it. When I look in the overall resourcemanager log, I never see any request for worker_0; I only see requests for worker 1. Note that this is not because there is no work in this example (for more complex examples with more load I see exactly the same behavior: the first worker never gets properly requested). Also, when running with cluster.scale(1), it never completes because that first worker is just never started up.
Running scale() with any number higher than 2 always results in all workers being started except worker_0.

Any ideas what could be causing this?

Question: Could this be used to get a dask cluster running on AWS EMR?

Potentially naive question, as I just learned what YARN was at a meetup last night. I think Amazon's EMR service is built around it. With that in mind, could you use this package, or parts of it, to get a dask cluster up and running on EMR?

I know the recommended deployment is using kubernetes, but my company blocked AWS' kubernetes service (EKS) 🤦‍♂️.

Any tips/advice would be greatly appreciated.

dask-yarn ssl error

Hello, this is a copy of jcrist/skein#181

I am new to skein and dask in general. I am trying to create a hello world dask-yarn application and am running into SSL certificate errors. Does anyone know how to get around them?

Here's the code I am using:

 import skein
 c = skein.Client(log_level='debug', java_options="-Djava.io.tmpdir=/path/to/some/dir -Dcom.sun.net.ssl.checkRevocation=false")
 

and here's the error message:

19/07/08 09:25:06 DEBUG skein.Driver: Starting Skein version 0.7.4
19/07/08 09:25:06 DEBUG skein.Driver: Logging in using ticket cache
19/07/08 09:25:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/07/08 09:25:07 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/07/08 09:25:08 INFO client.AHSProxy: Connecting to Application History server at cap-compute02/172.27.57.54:10200
19/07/08 09:25:08 INFO skein.Driver: Driver started, listening on 37170
19/07/08 09:25:08 DEBUG skein.Driver: Reporting gRPC server port back to the launching process
E0708 09:25:08.618234364  161031 ssl_transport_security.cc:1238] Handshake failed with fatal error SSL_ERROR_SSL: error:1000007d:SSL routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED.
19/07/08 09:25:08 DEBUG skein.Driver: Starting process disconnected, shutting down
19/07/08 09:25:08 DEBUG skein.Driver: Driver shut down
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/software/virtual_env/aesop_20190707_env/lib/python3.7/site-packages/skein/core.py", line 366, in __init__
    self._call('ping', proto.Empty())
  File "/software/virtual_env/aesop_20190707_env/lib/python3.7/site-packages/skein/core.py", line 289, in _call
    raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to driver
 

Unable to connect to application

Hi guys,

I am facing this error. What am I missing?

ConnectionError: Unable to connect to application
ConnectionError Traceback (most recent call last)
in engine
----> 1 mycluster=YarnCluster(environment='compre3.tar.gz')

/home/cdsw/.local/lib/python3.6/site-packages/dask_yarn/core.py in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
293 user=user)
294
--> 295 self._start_cluster(spec, skein_client)
296
297 @cached_property

/home/cdsw/.local/lib/python3.6/site-packages/dask_yarn/core.py in _start_cluster(self, spec, skein_client)
373 app = skein_client.submit_and_connect(spec)
374 try:
--> 375 scheduler_address = app.kv.wait('dask.scheduler').decode()
376 dashboard_address = app.kv.get('dask.dashboard')
377 if dashboard_address is not None:

/home/cdsw/.local/lib/python3.6/site-packages/skein/kv.py in wait(self, key, return_owner)
653 return res
654
--> 655 event = event_queue.get()
656
657 return event.result if return_owner else event.result.value

/home/cdsw/.local/lib/python3.6/site-packages/skein/kv.py in get(self, block, timeout)
279 if isinstance(out, Exception):
280 self._exception = out
--> 281 raise out
282 return out
283

ConnectionError: Unable to connect to application

Modify the dask dashboard to work through the yarn proxy

The dask dashboard relies on modern web features like websockets, which are incompatible with the yarn proxy. To get around this we can either:

  • Modify the dashboard to optionally use ajax polling, as seen in this example
  • Create a new, simpler dashboard to use with yarn deployments. The dashboard is just a scheduler plugin, so it wouldn't necessarily have to live in the distributed repo (although it might).

Configuration file

How do we feel about the following configuration file:

yarn:
  skein-specification: null  # A full specification or path to a specification
                             # Overwrites the following configuration if given

  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  environment: null          # Path to conda packed environment
  tags: []                   # List of strings to tag applications
  scheduler:                 # Specifications of scheduler container
    cores: 1
    memory: 2GiB
  worker:                    # Specifications of worker containers
    cores: 1
    memory: 2GiB
    instances: 0
    restarts: -1             # Allowed number of restarts, -1 for unlimited

A couple open questions:

  1. Should we adhere to Dask or Yarn conventions for names and memory, like cores vs vcores and 2GiB vs 2048?
  2. What should we include or not include here? For example, should we also include files and such? At what point are we recreating the skein spec?
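
On the memory question, one option is to accept both spellings and normalize them. The sketch below assumes that a bare number means MiB (the Yarn-style form) and that only the MiB, GiB, and TiB suffixes are supported; the helper name `memory_to_mib` is hypothetical.

```python
import re

# Multipliers relative to one MiB.
_UNITS = {"MiB": 1, "GiB": 1024, "TiB": 1024 ** 2}


def memory_to_mib(value):
    """Normalize a memory setting to an integer number of MiB.

    Accepts an int (already MiB) or a string such as "2GiB", "256 MiB",
    or "2048" (no unit, interpreted as MiB).
    """
    if isinstance(value, int):
        return value
    match = re.fullmatch(r"\s*(\d+)\s*(MiB|GiB|TiB)?\s*", value)
    if match is None:
        raise ValueError("cannot parse memory value: %r" % value)
    number, unit = match.groups()
    return int(number) * _UNITS[unit or "MiB"]


print(memory_to_mib("2GiB"), memory_to_mib(2048))
```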

Adaptivity

So how do we handle adaptive loads? There are a few actors here, living in different places:

  1. The dask client, which infrequently changes some parameters to the adaptivity (like maximum number of workers)
  2. The skein client, which somewhat frequently needs to add or destroy workers
  3. The scheduler which changes state very frequently
  4. The adaptive object, which looks at some of the scheduler state and decides to ask the skein client (or other object satisfying the Cluster interface) to scale up or down appropriately.

So it's convenient if the Adaptive and Scheduler objects are in the same thread. The Dask Client and the Scheduler are likely on different machines. Should the Skein Client be with the Dask Client or the Scheduler?

Skein Client with Dask Client

If the Skein Client should be with the Dask Client then we should probably make a RemoteCluster object that lives with the Scheduler, but just streams back signals to us.

class RemoteCluster(Cluster):
    def __init__(self, scheduler, comm):
        self.scheduler = scheduler
        self.comm = comm
    def scale_up(self, n):
        self.comm.send({'op': 'scale-up', 'n': n})
    def scale_down(self, workers):
        self.comm.send({'op': 'scale-down', 'workers': workers})

We would then, from the Dask client, ask the scheduler to set up a RemoteCluster, set up a local Adaptive object that uses it, and have that pair send us the information to pass on to the Skein Client.
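
The receiving side of those messages, next to the Dask client, could be sketched as follows. The function `handle_scale_message` and the `RecordingCluster` stand-in are hypothetical; a real receiver would call into the object that talks to Skein instead of recording calls.

```python
def handle_scale_message(cluster, msg):
    """Apply one message sent by RemoteCluster to a local cluster object."""
    op = msg["op"]
    if op == "scale-up":
        cluster.scale_up(msg["n"])
    elif op == "scale-down":
        cluster.scale_down(msg["workers"])
    else:
        raise ValueError("unknown op: %r" % op)


class RecordingCluster:
    """Stand-in for the Skein-backed cluster; it only records calls."""

    def __init__(self):
        self.calls = []

    def scale_up(self, n):
        self.calls.append(("scale_up", n))

    def scale_down(self, workers):
        self.calls.append(("scale_down", list(workers)))


cluster = RecordingCluster()
handle_scale_message(cluster, {"op": "scale-up", "n": 4})
handle_scale_message(cluster, {"op": "scale-down", "workers": ["tcp://10.0.0.5:40417"]})
print(cluster.calls)
```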

Skein Client with Scheduler

Alternatively we could add the Skein Client to the process running the Scheduler, empowering it to start and stop its own workers. It would start up its own YarnCluster object, presumably providing it its own application id. It would then receive infrequent messages from the client to change its adaptive settings (this infrastructure is already built).

This requires less work on our part, but does mean that a Yarn container gets elevated access over its own application. I'm not sure if this is common or not.

broken compatibility with distributed 2.2.0

dask-yarn doesn't seem to be compatible with distributed 2.2.0.
The reason seems to be this change:

which then breaks this piece of code here:
https://github.com/dask/dask-yarn/blob/master/dask_yarn/cli.py#L316

so that whenever the scheduler is started, it crashes with:

TypeError: start() takes exactly 1 positional argument (2 given)

Sorry, I cannot give the full call stack right now, but I can provide it if needed. Anyway, I think the problem is clear.

Dask-yarn container not getting environment variable configured in yarn.yaml

I have this minimal yarn.yaml file. I'm using dask-yarn 0.5.0 and Skein 0.5.1. I'm running on an EMR cluster.

yarn:
  name: dask # Application name
  queue: default             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  specification:
    name: dask
    queue: default
    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          dask-yarn services scheduler
      dask.worker:
        instances: 1
        resources:
          memory: 8GiB
          vcores: 1
        env:
          'AAAAAAA': aaaaaaaaaaaaaaaa
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          dask-yarn services worker

I am trying to set environment variables in my containers. I have been able to do this successfully using the YarnCluster(worker_env=WORKER_ENV) syntax. However, I want to be able to use a yarn.yaml file so I can change the cluster configuration during testing without having to change my code, and including any kwargs in the constructor prevents this file from being used. I'm running this basic program and finding that the environment variables are not being set as I expect.

import os
from dask_yarn import YarnCluster
from dask.distributed import Client

def get_env(varname):
    import os
    return os.environ.get(varname)

cluster = YarnCluster()
client = Client(cluster)
a = client.submit(get_env, 'AAAAAAA')
print(a.result())

This returns None since the variable isn't being set.

Trouble reading with KMS

The Hadoop cluster I use just added additional security with two KMS servers to encrypt the data. From the logs it looks like the KMS is unhappy with my kerberos credentials. I do have a valid kerberos ticket (the job would not have launched otherwise but I double checked).

I am getting a new error while reading from parquet:

19/03/01 12:16:45 INFO skein.Driver: Driver started, listening on 46336
19/03/01 12:16:46 INFO hdfs.DFSClient: Created token for jlord: HDFS_DELEGATION_TOKEN [email protected], renewer=yarn, realUser=, issueDate=1551464206344, maxDate=1552069006344, sequenceNumber=9382869, masterKeyId=510 on ha-hdfs:nameservice1
19/03/01 12:16:47 INFO security.TokenCache: Got dt for hdfs://nameservice1; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:nameservice1, Ident: (token for jlord: HDFS_DELEGATION_TOKEN [email protected], renewer=yarn, realUser=, issueDate=1551464206344, maxDate=1552069006344, sequenceNumber=9382869, masterKeyId=510)
19/03/01 12:16:47 INFO security.TokenCache: Got dt for hdfs://nameservice1; Kind: kms-dt, Service: 10.195.102.124:16000, Ident: (kms-dt owner=jlord, renewer=yarn, realUser=, issueDate=1551464207669, maxDate=1552069007669, sequenceNumber=115244, masterKeyId=35)
19/03/01 12:16:47 INFO skein.Driver: Uploading application resources to hdfs://nameservice1/user/jlord/.skein/application_1551067831607_22507
19/03/01 12:17:03 INFO skein.Driver: Submitting application...
19/03/01 12:17:03 INFO impl.YarnClientImpl: Submitted application application_1551067831607_22507
19/03/01 12:17:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "read_parquet.py", line 27, in <module>
    df = dataframe.read_parquet('hdfs:///user/jlord/test_lots_of_parquet')
  File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 1155, in read_parquet
    categories=categories, index=index, infer_divisions=infer_divisions)
  File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 674, in _read_pyarrow
    filters=filters)
  File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/pyarrow/parquet.py", line 890, in __init__
    path_or_paths, self.fs, metadata_nthreads=metadata_nthreads)
  File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/pyarrow/parquet.py", line 1065, in _make_manifest
    .format(path))
OSError: Passed non-file path: /user/jlord/test_lots_of_parquet
19/03/01 12:17:54 INFO skein.Driver: Driver shut down

The logs:

Container: container_e10_1551067831607_22507_01_000001 on all1.allstate.com_8041
=====================================================================================
LogType:application.master.log
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:3334
Log Contents:
19/03/01 12:17:09 INFO skein.ApplicationMaster: Running as user jlord
19/03/01 12:17:09 INFO skein.ApplicationMaster: Application specification successfully loaded
19/03/01 12:17:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/01 12:17:11 INFO skein.ApplicationMaster: gRPC server started at all1.allstate.com:41971
19/03/01 12:17:11 INFO skein.ApplicationMaster: WebUI server started at all1.allstate.com:34551
19/03/01 12:17:11 INFO skein.ApplicationMaster: Registering application with resource manager
19/03/01 12:17:12 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/03/01 12:17:12 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/03/01 12:17:12 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/03/01 12:17:13 INFO skein.ApplicationMaster: Starting container_e10_1551067831607_22507_01_000002...
19/03/01 12:17:13 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e10_1551067831607_22507_01_000002
19/03/01 12:17:52 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 10 instances, a delta of 10.
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_3
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_4
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_5
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_6
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_7
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_8
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_9
19/03/01 12:17:54 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
19/03/01 12:17:54 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
19/03/01 12:17:54 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
19/03/01 12:17:54 WARN security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 WARN security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 INFO skein.ApplicationMaster: Deleted application directory hdfs://nameservice1/user/jlord/.skein/application_1551067831607_22507
19/03/01 12:17:54 INFO skein.ApplicationMaster: WebUI server shut down
19/03/01 12:17:54 INFO skein.ApplicationMaster: gRPC server shut down

LogType:container-localizer-syslog
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:2442
Log Contents:
2019-03-01 12:17:06,120 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:06,122 WARN [ContainerLocalizer Downloader] org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:06,122 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:07,687 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:07,688 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
2019-03-01 12:17:08,483 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:08,485 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!



Container: container_e10_1551067831607_22507_01_000002 on all1753.allstate.com_8041
=====================================================================================
LogType:container-localizer-syslog
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:2442
Log Contents:
2019-03-01 12:17:17,341 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:17,347 WARN [ContainerLocalizer Downloader] org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:17,348 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:19,085 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:19,088 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
2019-03-01 12:17:51,257 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:51,257 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!

LogType:dask.scheduler.log
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:633
Log Contents:
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://10.195.102.137:38407
distributed.scheduler - INFO -       bokeh at:                    :36105
distributed.scheduler - INFO - Receive client connection: Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.scheduler - INFO - Remove client Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.scheduler - INFO - Close client connection: Client-543b1970-3c4e-11e9-9f37-246e9682cec8

Cluster reports "pending connection" for possibly too long?

In [13]: cluster.application_client.containers()
Out[13]: [Container<service_name='dask.scheduler', instance=0, state=RUNNING>]

In [14]: cluster
Out[14]: YarnCluster<"pending connection">

In [15]: cluster.application_client.kv
Out[15]: <skein.core.KeyValueStore at 0x7fe67d797518>

In [16]: dict(cluster.application_client.kv)
Out[16]: 
{'dask.scheduler': 'tcp://172.22.0.3:37279',
 'dask.dashboard': 'http://172.22.0.3:46003'}

In [17]: from dask.distributed import Client

In [18]: client = Client(cluster)
18/07/01 13:04:50 INFO skein.Daemon: Starting process disconnected, shutting down
18/07/01 13:04:50 INFO skein.Daemon: Shutting down gRPC server
18/07/01 13:04:50 INFO skein.Daemon: gRPC server shut down

In [19]: client
Out[19]: <Client: scheduler='tcp://172.22.0.3:37279' processes=0 cores=0>

In [20]: cluster
Out[20]: YarnCluster<'tcp://172.22.0.3:37279'>

Make config dir in containers configurable

dask-yarn currently sets the DASK_CONFIG directory to the fixed value .config in the service setup, but that directory might not always be writable. Would be nice to make that overridable.

I'm actually not so sure why that directory needs to be configured at all. Are users expected to provide persistent configuration there across container runs? If not, then why not set it to something like /tmp?
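
An overridable location with a temporary-directory fallback could look like this. It is only a sketch: the `DASK_YARN_CONFIG_DIR` variable and the `pick_config_dir` helper are hypothetical, and the returned path is what the service setup would then export as DASK_CONFIG.

```python
import os
import tempfile


def pick_config_dir(environ, default=".config"):
    """Choose a writable directory to use as DASK_CONFIG in a container."""
    # Hypothetical override; falls back to the current fixed value.
    candidate = environ.get("DASK_YARN_CONFIG_DIR", default)
    if os.path.isdir(candidate):
        writable = os.access(candidate, os.W_OK)
    else:
        # The directory would have to be created, so check its parent.
        parent = os.path.dirname(os.path.abspath(candidate))
        writable = os.access(parent, os.W_OK)
    if writable:
        return candidate
    # Fall back to a fresh private directory under the system temp dir.
    return tempfile.mkdtemp(prefix="dask-config-")


print(pick_config_dir(os.environ))
```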

[question] Dask yarn strategy for available resource utilization on EMR

I am working on using dask yarn on EMR using ephemeral clusters. As such, I would like to use the cluster resources at their maximum.

However, I am struggling to get dask to use more than 50% of the cores. I understand that, as with any yarn app, I can specify the resources my app wants. However, this logic doesn't really make sense in the context of single-app, ephemeral tasks. Since there is only one app running on a cluster designed for it, it should use all of the available resources.

Is there any way to create a yarn cluster such that it uses all of the available resources? I apologize if this is already documented, but I haven't been able to find it in the docs.
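
One possible cause of idle cores is a worker size that does not divide evenly into what each node offers. The arithmetic below is a sketch with made-up node figures (4 vcores and 24576 MiB available to Yarn per node), not actual EMR numbers; it also ignores the containers needed for the application master and the scheduler, and any rounding Yarn applies to requests.

```python
def workers_per_node(node_memory_mib, node_vcores, worker_memory_mib, worker_vcores):
    """How many worker containers of the given size fit on one node."""
    by_memory = node_memory_mib // worker_memory_mib
    by_cores = node_vcores // worker_vcores
    return min(by_memory, by_cores)


def idle_vcores(node_vcores, worker_vcores, n_workers_on_node):
    """Cores left unused on a node after placing the workers."""
    return node_vcores - worker_vcores * n_workers_on_node


# Memory-heavy workers: only one fits per node, so half the cores sit idle.
fit = workers_per_node(24576, 4, worker_memory_mib=16384, worker_vcores=2)
print(fit, idle_vcores(4, 2, fit))

# Smaller workers: two fit per node and no cores are left over.
fit = workers_per_node(24576, 4, worker_memory_mib=8192, worker_vcores=2)
print(fit, idle_vcores(4, 2, fit))
```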

Update to work with distributed >= 0.23.0

Hi
I'm running with the following versions

dask_yarn = 0.3.1
dask = 0.19.3
distributed = 1.23.3
skein = 0.1.1

I'm successfully running the following code:

from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='testDask.tar.gz', worker_vcores=4, worker_memory='4GB', n_workers=16, name='task1')
client = Client(cluster)
client.get_versions(check=True)

{'scheduler': {'host': (('python', '3.6.6.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '4.4.73-5-default'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'POSIX'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'None.None')),
'packages': {'required': (('dask', '0.19.3'),
('distributed', '1.23.3'),
('msgpack', '0.5.6'),
('cloudpickle', '0.5.3'),
('tornado', '5.1'),
('toolz', '0.9.0')),
'optional': (('numpy', '1.15.0'),
('pandas', '0.23.4'),
('bokeh', '0.13.0'),
('lz4', None),
('dask_ml', None),
('blosc', None))}},
'workers': {},
'client': {'host': [('python', '3.6.5.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '4.4.73-5-default'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'None'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'required': [('dask', '0.19.3'),
('distributed', '1.23.3'),
('msgpack', '0.5.6'),
('cloudpickle', '0.5.3'),
('tornado', '5.1'),
('toolz', '0.9.0')],
'optional': [('numpy', '1.15.0'),
('pandas', '0.23.4'),
('bokeh', '0.13.0'),
('lz4', None),
('dask_ml', None),
('blosc', None)]}}}

but when running:
cluster.scale(8)

I'm getting the following error

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-11-6e6c3e46d9a8> in <module>()
----> 1 cluster.scale(8)
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in scale(self, n)
    315 
    316             if n_to_delete:
--> 317                 to_close = self._select_workers_to_close(n_to_delete)
    318                 self.scale_down(to_close)
    319 
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in _select_workers_to_close(self, n)
    285         worker_info = client.scheduler_info()['workers']
    286         # Sort workers by memory used
--> 287         workers = sorted((v['memory'], k) for k, v in worker_info.items())
    288         # Return just the ips
    289         return [w[1] for w in workers[:n]]
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in <genexpr>(.0)
    285         worker_info = client.scheduler_info()['workers']
    286         # Sort workers by memory used
--> 287         workers = sorted((v['memory'], k) for k, v in worker_info.items())
    288         # Return just the ips
    289         return [w[1] for w in workers[:n]]
KeyError: 'memory'

any ideas?
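
The traceback shows the worker selection indexing v['memory'] directly, so any worker entry without that key raises. A defensive version of that selection might look like the sketch below. It is not the dask-yarn implementation; the function name is borrowed from the traceback, and the idea that the figure may sit under a 'metrics' key in some distributed releases is an assumption.

```python
def select_workers_to_close(worker_info, n):
    """Return the addresses of the n workers using the least memory.

    worker_info is the mapping found under scheduler_info()['workers'].
    Looking for the memory figure in more than one place is an assumption
    about how the layout differs between distributed releases.
    """
    def memory_used(info):
        if "memory" in info:                 # layout the traceback expects
            return info["memory"]
        metrics = info.get("metrics") or {}  # assumed alternative layout
        return metrics.get("memory", 0)      # unknown usage sorts first

    ranked = sorted(worker_info.items(), key=lambda item: memory_used(item[1]))
    return [address for address, _ in ranked[:n]]


if __name__ == "__main__":
    # Hypothetical worker entries in both layouts.
    info = {
        "tcp://10.0.0.1:4000": {"metrics": {"memory": 300}},
        "tcp://10.0.0.2:4000": {"memory": 100},
    }
    print(select_workers_to_close(info, 1))
```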

Drop Python 2.7 support after next release

I plan to issue a new release in the next week, which will be the last release to support Python 2.7. Future releases will be Python 3 only. This will ease the maintenance burden, simplify our code, and allow us to use more modern concurrency features.

See also jcrist/skein#180.

Dask on Yarn ( HDP 2.6 )

18/09/25 15:24:57 INFO skein.Daemon: Removing callbacks for application_1537170416259_7296
18/09/25 15:25:36 INFO impl.YarnClientImpl: Killed application application_1537170416259_7296
Traceback (most recent call last):
  File "main.py", line 10, in <module>
    cluster = YarnCluster()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 179, in __init__
    self._start_cluster(spec, skein_client)
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 204, in _start_cluster
    scheduler_address = app.kv.wait('dask.scheduler').decode()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 655, in wait
    event = event_queue.get()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 281, in get
    raise out
skein.exceptions.ConnectionError: Unable to connect to application
18/09/25 15:25:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/09/25 15:25:36 INFO skein.Daemon: Shutting down gRPC server
18/09/25 15:25:36 INFO skein.Daemon: gRPC server shut down

my main.py

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd
import dask

print('dask.config.config =>', dask.config.config)
cluster = YarnCluster()
client = Client(cluster)
version = client.get_versions(check=True)
print('version', version)
df = dd.read_csv('hdfs:///data/my.csv')
print(df)

the coda.yml

---
name: myenv
channels:
  - defaults
  - conda-forge
dependencies:
  - conda-pack
  - python=3.6
  - dask
  - dask-yarn
  - gcsfs
  - pyarrow
  - hdfs3
  - libhdfs3

the config/dask/yarn.yaml

yarn:
#   specification: null        # A full Skein specification or path to a
#                              # specification yaml file. Overrides the following
#                              # configuration if given

   name: dask                 # Application name
   queue: defult             # Yarn queue to deploy to
   environment: environment.tar.gz          # Path to conda packed environment
#   tags: []                   # List of strings to tag applications

   scheduler:                 # Specifications of scheduler container
     vcores: 1
     memory: 2GiB

   worker:                   # Specifications of worker containers
     vcores: 1
     memory: 2GiB
     count: 2                # Number of workers to start on initialization
     restarts: -1            # Allowed number of restarts, -1 for unlimited
     env: {}                 # A map of environment variables to set on the worker

the only logs that I have are poor

18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.worker:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_0:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_1:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.scheduler:
18/09/25 15:25:13 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0:
18/09/25 15:25:14 INFO skein.ApplicationMaster: watch called:
18/09/25 15:25:14 INFO skein.ApplicationMaster: Created Watcher. id=0, start=dask.scheduler, end=dask.scheduler, type=PUT:
18/09/25 15:25:15 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:15 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1:
18/09/25 15:25:15 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:18 INFO skein.ApplicationMaster: FAILED: dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: RESTARTING: adding new container to replace dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2:
18/09/25 15:25:20 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO skein.ApplicationMaster: RUNNING: dask.worker_2 on container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:52 INFO skein.ApplicationMaster: Shutting down gRPC server:
18/09/25 15:25:52 INFO skein.ApplicationMaster: gRPC server shut down:

Could you give me some insight into how to launch dask on YARN, and how to enable debug mode?

Thank you very much in advance for your support.
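
The ApplicationMaster log above only records that dask.worker_1 failed; the reason is usually in the container's own stdout and stderr. On clusters with log aggregation enabled, those can typically be fetched with the yarn logs command once the application has finished. As a small sketch, the helper below (the name yarn_logs_command is made up for illustration) pulls the application ID out of a log line and builds that command:

```python
import re

# YARN application IDs look like application_<cluster timestamp>_<sequence>.
APP_ID = re.compile(r"application_\d+_\d+")


def yarn_logs_command(log_text):
    """Build a `yarn logs` command for the first application ID found.

    Returns None when the text contains no application ID.
    """
    match = APP_ID.search(log_text)
    if match is None:
        return None
    return ["yarn", "logs", "-applicationId", match.group(0)]


if __name__ == "__main__":
    line = ("18/09/25 15:25:36 INFO impl.YarnClientImpl: "
            "Killed application application_1537170416259_7296")
    print(" ".join(yarn_logs_command(line)))
```

The returned list can be handed to subprocess.run on a machine where the yarn CLI is available.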

skein.driver shut down

Here is my submit:
dask-yarn submit --environment dask_venv.tar.gz --worker-count 8 --worker-vcores 2 --worker-memory 4GiB --scheduler-vcores 2 --scheduler-memory 4GiB test_submit.py

Here is my test code:

from sklearn.ensemble import GradientBoostingClassifier
import sklearn.datasets
import dask_ml.datasets
from dask_ml.wrappers import ParallelPostFit
import time
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster.from_current()
client=Client(cluster)

cluster.scale(10)

X,y = sklearn.datasets.make_classification(n_samples = 1000, random_state=0)

clf = ParallelPostFit(estimator=GradientBoostingClassifier())
clf.fit(X,y)

X_big, _ = dask_ml.datasets.make_classification(n_samples = 2000000, chunks=200000, random_state=0)

t1 = time.time()
clf.predict(X_big).compute()
print("Time dask:{}".format(time.time()-t1))
dask_time = time.time() - t1

clf_local = GradientBoostingClassifier()
clf_local.fit(X,y)

t2 = time.time()
clf_local.predict(X_big)
print("Time sklearn:{}".format(time.time()-t2))
sklearn_time = time.time() - t2

timings = [dask_time, sklearn_time]

with open('test.txt','w') as f:
    for item in timings:
        f.write("%s\n" % item)

Here is my error:

19/05/29 02:46:07 INFO client.RMProxy: Connecting to ResourceManager at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:8032
19/05/29 02:46:07 INFO client.AHSProxy: Connecting to Application History server at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:10200
19/05/29 02:46:08 INFO skein.Driver: Driver started, listening on 35151
19/05/29 02:46:08 INFO skein.Driver: Uploading application resources to hdfs://gce-europe-west2-a-d-dataproc-sql-debug6-m/user/d5054581/.skein/application_1557133139144_0344
19/05/29 02:46:09 INFO skein.Driver: Submitting application...
19/05/29 02:46:10 INFO impl.YarnClientImpl: Submitted application application_1557133139144_0344
application_1557133139144_0344
19/05/29 02:46:10 INFO skein.Driver: Driver shut down

Here is the dask-yarn status:

19/05/29 03:04:23 INFO skein.Driver: Driver started, listening on 39537
19/05/29 03:04:24 INFO skein.Driver: Driver shut down
APPLICATION_ID                    NAME    STATE       STATUS    CONTAINERS    VCORES    MEMORY    RUNTIME
application_1557133139144_0343    dask    FINISHED    FAILED    0             0         0         17s

And I cannot find the application logs:

19/05/29 03:02:35 INFO client.RMProxy: Connecting to ResourceManager at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:8032
19/05/29 03:02:35 INFO client.AHSProxy: Connecting to Application History server at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:10200
File /yarn-logs/d5054581/logs/application_1557133139144_0343 does not exist.
Can not find any log file matching the pattern: [ALL] for the application: application_1557133139144_0343
Can not find the logs for the application: application_1557133139144_0343 with the appOwner: d5054581

Could you help to check the reason?
Many Thanks

Unable to load libhdfs, can anyone help to sort it out, thank you very much.

I am trying to use pyarrow to access a file on HDFS and cannot get it working. The code is below. Thank you very much in advance.

[rxie@cedgedev03 code]$ python
Python 2.7.12 |Anaconda 4.2.0 (64-bit)| (default, Jul 2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org

import pyarrow
import os
os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_121"
from pyarrow import hdfs
fs = hdfs.connect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/pyarrow/hdfs.py", line 183, in connect
extra_conf=extra_conf)
File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/pyarrow/hdfs.py", line 37, in __init__
self._connect(host, port, user, kerb_ticket, driver, extra_conf)
File "pyarrow/io-hdfs.pxi", line 89, in pyarrow.lib.HadoopFileSystem._connect
File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unable to load libhdfs
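
This error generally means pyarrow could not locate libhdfs.so. pyarrow looks for it under $HADOOP_HOME/lib/native unless ARROW_LIBHDFS_DIR points somewhere else. A minimal sketch for finding the library and setting that variable before connecting is below; the candidate directories are guesses for illustration, not paths taken from this system, and find_libhdfs_dir is a hypothetical helper.

```python
import os


def find_libhdfs_dir(candidates):
    """Return the first directory in candidates that contains libhdfs.so."""
    for directory in candidates:
        if os.path.isfile(os.path.join(directory, "libhdfs.so")):
            return directory
    return None


if __name__ == "__main__":
    # Hypothetical locations; adjust them to the local Hadoop install.
    hadoop_home = os.environ.get("HADOOP_HOME", "/usr/lib/hadoop")
    guesses = [
        os.path.join(hadoop_home, "lib", "native"),
        "/opt/cloudera/parcels/CDH/lib64",
    ]
    found = find_libhdfs_dir(guesses)
    if found is None:
        print("libhdfs.so not found in", guesses)
    else:
        # Must be set before hdfs.connect() is called.
        os.environ["ARROW_LIBHDFS_DIR"] = found
        print("ARROW_LIBHDFS_DIR =", found)
```

Setting JAVA_HOME alone, as in the session above, does not tell pyarrow where the native library lives.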

reading csv from hdfs with pyarrow

Hi,
We are trying out dask_yarn version 0.3.0 (with dask 0.18.2).
Because of conflicts between boost-cpp versions, I'm running pyarrow version 0.10.0.
We are trying to read a CSV from HDFS; however, we get an error when running dd.read_csv('hdfs:///path/to/file.csv'), since it tries to use hdfs3.
From the documentation, it seems there is an option to use pyarrow instead.

What is the correct syntax/configuration to do so?

Thx for your time and great work.

Sorry, I guess this belongs in the dask repo and not dask_yarn.
I will post it there.

Read parquet error

Error trying to read a parquet file. Looks like the useful info is the first few lines, but I can't parse it. Any direction would help.

2019-02-19 09:46:43.006328, p43349, th140597036791616, ERROR Failed to invoke RPC call "getFsStats" on server "namenode:8020": 
RpcChannel.cpp: 483: HdfsRpcException: Failed to invoke RPC call "getFsStats" on server "namenode:8020"
Caused by
RpcChannel.cpp: 931: HdfsRpcServerException: org.apache.hadoop.security.authorize.AuthorizationException: User: jlord is not allowed to impersonate 1�H�M�dH3
             %(

2019-02-19 09:46:43.006434, p43349, th140597036791616, INFO Retry idempotent RPC call "getFsStats" on server "namenode:8020"
Traceback (most recent call last):
  File "simple.py", line 19, in <module>
    df = dataframe.read_parquet('hdfs:///user/hive/warehouse/dra_sanity_tests.db/test_excavator')
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 1145, in read_parquet
    storage_options=storage_options
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/core.py", line 354, in get_fs_token_paths
    fs, fs_token = get_fs(protocol, options)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/core.py", line 513, in get_fs
    fs = cls(**storage_options)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/hdfs3.py", line 15, in __init__
    self.fs = hdfs3.HDFileSystem(**kwargs)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/hdfs3/core.py", line 76, in __init__
    self.connect()
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/hdfs3/core.py", line 141, in connect
    raise ConnectionError('Connection Failed: {}'.format(msg))
ConnectionError: Connection Failed: HdfsRpcException: Failed to invoke RPC call "getFsStats" on server "namenode:8020"	Caused by: HdfsRpcServerException: org.apache.hadoop.security.authorize.AuthorizationException: User: jlord is not allowed to impersonate 1�H�M�dH3
                                                           %(

Amazon EMR bootstrap failing for workers

I'm following the instructions for deploying on EMR and using the bootstrap-dask script from this repo with no modifications. I'm able to successfully connect to the Jupyter notebook server and create a YarnCluster. However, when I try to add workers to the cluster, they never start successfully. Looking at the Skein logs, it appears to be trying to start containers, but they all fail with the following message:

Traceback (most recent call last):
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1561654916149_0001/container_1561654916149_0001_01_000002/environment/bin/dask-yarn", line 7, in <module>
    from dask_yarn.cli import main
  File "/mnt3/yarn/usercache/hadoop/appcache/application_1561654916149_0001/container_1561654916149_0001_01_000002/environment/lib/python3.6/site-packages/dask_yarn/cli.py", line 17, in <module>
    from distributed.cli.utils import install_signal_handlers, uri_from_host_port
ImportError: cannot import name 'uri_from_host_port'

Here are the results of a pip freeze:

asn1crypto==0.24.0
attrs==19.1.0
backcall==0.1.0
bleach==3.1.0
bokeh==1.2.0
boto3==1.9.162
botocore==1.12.163
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
Click==7.0
cloudpickle==1.1.1
conda==4.7.5
conda-pack==0.3.1
conda-package-handling==1.3.10
cryptography==2.7
cytoolz==0.9.0.1
dask==2.0.0
dask-yarn==0.6.0
decorator==4.4.0
defusedxml==0.6.0
distributed==2.0.1
docutils==0.14
entrypoints==0.3
grpcio==1.16.1
heapdict==1.0.0
idna==2.8
ipykernel==5.1.1
ipython==7.5.0
ipython-genutils==0.2.0
ipywidgets==7.4.2
jedi==0.13.3
Jinja2==2.10.1
jmespath==0.9.4
jsonschema==3.0.1
jupyter-client==5.2.4
jupyter-core==4.4.0
libarchive-c==2.8
locket==0.2.0
MarkupSafe==1.1.1
mistune==0.8.4
msgpack==0.6.1
nbconvert==5.5.0
nbformat==4.4.0
nbserverproxy==0.8.8
notebook==5.7.8
numpy==1.16.4
olefile==0.46
packaging==19.0
pandas==0.24.2
pandocfilters==1.4.2
parso==0.4.0
partd==0.3.10
pexpect==4.7.0
pickleshare==0.7.5
Pillow==6.0.0
prometheus-client==0.6.0
prompt-toolkit==2.0.9
protobuf==3.8.0
psutil==5.6.2
ptyprocess==0.6.0
pyarrow==0.11.1
pycosat==0.6.3
pycparser==2.19
Pygments==2.4.2
pyOpenSSL==19.0.0
pyparsing==2.4.0
pyrsistent==0.14.11
PySocks==1.7.0
python-dateutil==2.8.0
pytz==2019.1
PyYAML==5.1
pyzmq==18.0.0
requests==2.22.0
ruamel-yaml==0.15.46
s3fs==0.2.1
s3transfer==0.2.0
Send2Trash==1.5.0
six==1.12.0
skein==0.7.3
sortedcontainers==2.1.0
tblib==1.4.0
terminado==0.8.2
testpath==0.4.2
toolz==0.9.0
tornado==6.0.2
tqdm==4.32.1
traitlets==4.3.2
urllib3==1.24.2
wcwidth==0.1.7
webencodings==0.5.1
widgetsnbextension==3.4.2
zict==0.1.4
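
The ImportError says dask_yarn/cli.py imports a name that the installed distributed does not provide, which points to a version mismatch between dask-yarn and distributed inside the packed environment. Which combinations are compatible is not stated here, so the sketch below only helps pull the relevant pins out of a long pip freeze listing; pinned_versions is a hypothetical helper.

```python
def pinned_versions(freeze_text, names):
    """Map each requested package name to its version in `pip freeze` output.

    Names are compared case-insensitively; packages that are not listed
    are left out of the result.
    """
    wanted = {name.lower() for name in names}
    found = {}
    for line in freeze_text.splitlines():
        name, sep, version = line.strip().partition("==")
        if sep and name.lower() in wanted:
            found[name.lower()] = version
    return found


if __name__ == "__main__":
    sample = "dask==2.0.0\ndask-yarn==0.6.0\ndistributed==2.0.1\nskein==0.7.3\n"
    print(pinned_versions(sample, ["dask", "distributed", "dask-yarn"]))
```

Running the same check against the environment that was packed for the workers shows whether the containers are using the versions listed here.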

Failed to load java libraries

I am trying to run dask-yarn 0.5.1 on a corporate yarn cluster and get the following error:

java.lang.IllegalArgumentException: Failed to load any of the given libraries: [netty_tcnative_linux_x86_64, netty_tcnative_linux_x86_64_fedora, netty_tcnative_x86_64, netty_tcnative]

(full error message below)

Another co-worker stumbled upon the same error independently. I am wondering whether these packages are supposed to be downloaded and can't be, because our servers cannot connect to Maven except through a mirror. I have tried to set up my mirror, but I am such a Maven novice that it may not be working. Alternatively, the Java environment on the server could be configured in an odd way. Any direction on where to look would be helpful.

	at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:104)
	at com.anaconda.skein.shaded.io.netty.handler.ssl.OpenSsl.loadTcNative(OpenSsl.java:440)
	at com.anaconda.skein.shaded.io.netty.handler.ssl.OpenSsl.<clinit>(OpenSsl.java:97)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:244)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
	at com.anaconda.skein.Driver.startServer(Driver.java:119)
	at com.anaconda.skein.Driver.run(Driver.java:278)
	at com.anaconda.skein.Driver.main(Driver.java:168)
	Suppressed: java.lang.UnsatisfiedLinkError: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: failed to map segment from shared object: Operation not permitted
		at java.lang.ClassLoader$NativeLibrary.load(Native Method)
		at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
		at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
		at java.lang.Runtime.load0(Runtime.java:809)
		at java.lang.System.load(System.java:1086)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:36)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:215)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
		... 8 more
		Suppressed: java.lang.UnsatisfiedLinkError: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: failed to map segment from shared object: Operation not permitted
			at java.lang.ClassLoader$NativeLibrary.load(Native Method)
			at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
			at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
			at java.lang.Runtime.load0(Runtime.java:809)
			at java.lang.System.load(System.java:1086)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:36)
			at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
			at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
			at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
			at java.lang.reflect.Method.invoke(Method.java:498)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
			at java.security.AccessController.doPrivileged(Native Method)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
			... 10 more
		Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64 in java.library.path
			at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
			at java.lang.Runtime.loadLibrary0(Runtime.java:870)
			at java.lang.System.loadLibrary(System.java:1122)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
			... 9 more
			Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64 in java.library.path
				at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
				at java.lang.Runtime.loadLibrary0(Runtime.java:870)
				at java.lang.System.loadLibrary(System.java:1122)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
				at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
				at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
				at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
				at java.lang.reflect.Method.invoke(Method.java:498)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
				at java.security.AccessController.doPrivileged(Native Method)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
				... 10 more
	Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
		... 8 more
	Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora.so
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
		... 9 more
		Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora in java.library.path
			at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
			at java.lang.Runtime.loadLibrary0(Runtime.java:870)
			at java.lang.System.loadLibrary(System.java:1122)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
			... 9 more
			Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora in java.library.path
				at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
				at java.lang.Runtime.loadLibrary0(Runtime.java:870)
				at java.lang.System.loadLibrary(System.java:1122)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
				at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
				at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
				at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
				at java.lang.reflect.Method.invoke(Method.java:498)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
				at java.security.AccessController.doPrivileged(Native Method)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
				... 10 more
	Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative_x86_64
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
		... 8 more
	Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative_x86_64.so
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
		... 9 more
		Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_x86_64 in java.library.path
			at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
			at java.lang.Runtime.loadLibrary0(Runtime.java:870)
			at java.lang.System.loadLibrary(System.java:1122)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
			... 9 more
			Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_x86_64 in java.library.path
				at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
				at java.lang.Runtime.loadLibrary0(Runtime.java:870)
				at java.lang.System.loadLibrary(System.java:1122)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
				at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
				at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
				at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
				at java.lang.reflect.Method.invoke(Method.java:498)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
				at java.security.AccessController.doPrivileged(Native Method)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
				... 10 more
	Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
		... 8 more
	Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative.so
		at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
		... 9 more
		Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative in java.library.path
			at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
			at java.lang.Runtime.loadLibrary0(Runtime.java:870)
			at java.lang.System.loadLibrary(System.java:1122)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
			at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
			... 9 more
			Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative in java.library.path
				at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
				at java.lang.Runtime.loadLibrary0(Runtime.java:870)
				at java.lang.System.loadLibrary(System.java:1122)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
				at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
				at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
				at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
				at java.lang.reflect.Method.invoke(Method.java:498)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
				at java.security.AccessController.doPrivileged(Native Method)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
				at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
				... 10 more

Feb 15, 2019 4:08:39 PM com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts defaultSslProvider
INFO: Conscrypt not found (this may be normal)
Feb 15, 2019 4:08:39 PM com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts defaultSslProvider
INFO: Jetty ALPN unavailable (this may be normal)
java.lang.ClassNotFoundException: com/anaconda/skein/shaded/org/eclipse/jetty/alpn/ALPN
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.anaconda.skein.shaded.io.grpc.netty.JettyTlsUtil.isJettyAlpnConfigured(JettyTlsUtil.java:64)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.findJdkProvider(GrpcSslContexts.java:266)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:248)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
	at com.anaconda.skein.Driver.startServer(Driver.java:119)
	at com.anaconda.skein.Driver.run(Driver.java:278)
	at com.anaconda.skein.Driver.main(Driver.java:168)

19/02/15 16:08:39 ERROR skein.Driver: Error running Driver
java.lang.IllegalStateException: Could not find TLS ALPN provider; no working netty-tcnative, Conscrypt, or Jetty NPN/ALPN available
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:258)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
	at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
	at com.anaconda.skein.Driver.startServer(Driver.java:119)
	at com.anaconda.skein.Driver.run(Driver.java:278)
	at com.anaconda.skein.Driver.main(Driver.java:168)
Traceback (most recent call last):
  File "simple.py", line 8, in <module>
    worker_memory="8GiB")
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 291, in __init__
    self._start_cluster(spec, skein_client)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 335, in _start_cluster
    skein_client = _get_skein_client(skein_client)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 44, in _get_skein_client
    return skein.Client(security=security)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 334, in __init__
    java_options=java_options)
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 247, in _start_driver
    raise DriverError("Failed to start java process")
skein.exceptions.DriverError: Failed to start java process
Exception ignored in: <function Client.__del__ at 0x7f420d5c5c80>
Traceback (most recent call last):
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 472, in __del__
    self.close()
  File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 461, in close
    if self._proc is not None:
AttributeError: _proc
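
The trailing `AttributeError: _proc` appears to be a side effect of the first failure rather than a separate problem: `_start_driver` raised inside `__init__`, so `_proc` was presumably never assigned, and `__del__` later called `close()` on the half-initialised object. A minimal sketch of that pattern and one defensive way to write `close()`. The classes `FakeClient` and `FakeDriverError` are hypothetical stand-ins, not skein's actual code, and the use of `__slots__` is an assumption:

```python
class FakeDriverError(Exception):
    """Hypothetical stand-in for skein.exceptions.DriverError."""


class FakeClient(object):
    # Hypothetical stand-in; this is not skein's Client implementation.
    __slots__ = ("_proc",)

    def __init__(self, fail=True):
        if fail:
            # Mirrors the failure path: raise before _proc is ever assigned.
            raise FakeDriverError("Failed to start java process")
        self._proc = object()

    def close(self):
        # getattr with a default tolerates an instance whose __init__ failed.
        proc = getattr(self, "_proc", None)
        if proc is not None:
            self._proc = None

    def __del__(self):
        # Runs even when __init__ raised, so close() must not assume _proc exists.
        self.close()
```

With this shape, only the original `DriverError` would be reported; the "Exception ignored in" block would not appear.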

Different behaviour when using dask-yarn interactively vs. non-interactively

dask_poc.py

#!/usr/bin/env python

from __future__ import print_function

import logging
import os
import sys
import time

import dask_yarn
from dask.distributed import Client

logger = logging.getLogger(__name__)

VIRTUALENV_TARBALL = os.path.expanduser("~/venv-dask-upstream.tar.gz")
WORKER_ENV = {"HADOOP_CONF_DIR": "/data/app/spark-yarn/hadoop-conf", "JAVA_HOME": "/usr/lib/jvm/java"}


logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s")

# Dask has issues with proxy settings, so drop them if present
for proxy_var in ['http_proxy', 'https_proxy']:
    os.environ.pop(proxy_var, None)

logger.info("Initialising YarnCluster")
cluster_start_time = time.time()
cluster = dask_yarn.YarnCluster(
    VIRTUALENV_TARBALL,
    worker_vcores=1,
    worker_memory="3GB",
    worker_env=WORKER_ENV,
    n_workers=1,
    scheduler_memory="1GB")
logger.info("Initialising YarnCluster: done in %.3f", time.time() - cluster_start_time)

logger.info("Initialising Client")
client_start_time = time.time()
dask_client = Client(cluster)
logger.info("Initialising Client: done in %.3f", time.time() - client_start_time)

logger.info("Checking versions")
versions_start_time = time.time()
_versions = dask_client.get_versions(check=True)
logger.info("Checking versions: done in %.3f", time.time() - versions_start_time)

logger.info("Creating futures")
futures = dask_client.map(lambda x: x ** 2, range(4))

logger.info("Gathering futures")
print("Result: %r" % (dask_client.gather(futures),))

Non-interactive execution

Everything works as expected:

(venv-dask-upstream)george@george:~$ python dask_poc.py
[...]
Result: [0, 1, 4, 9]

Interactive execution

(venv-dask-upstream)george@george:~$ python
>>> from dask_poc import *
[...]
2018-09-17 11:02:50,823 INFO dask_poc: Initialising Client
<loops forever in https://github.com/dask/distributed/blob/1.23.1/distributed/utils.py#L274-L275>

Environment:

  • CentOS 7.4.1708
  • Linux 3.10.0-693.2.2.el7.x86_64
  • Python 2.7.5
  • Hadoop 3.1.2-SNAPSHOT (compiled from source)
  • pip freeze:
asn1crypto==0.24.0
backports-abc==0.5
backports.shutil-get-terminal-size==1.0.0
backports.weakref==1.0.post1
cffi==1.11.5
click==6.7
cloudpickle==0.5.6
cryptography==2.3.1
dask==0.19.1
dask-yarn==0.3.1
decorator==4.3.0
distributed==1.23.1
enum34==1.1.6
futures==3.2.0
grpcio==1.15.0
HeapDict==1.0.0
idna==2.7
ipaddress==1.0.22
ipdb==0.11
ipython==5.8.0
ipython-genutils==0.2.0
msgpack==0.5.6
pathlib2==2.3.2
pexpect==4.6.0
pickleshare==0.7.4
prompt-toolkit==1.0.15
protobuf==3.6.1
psutil==5.4.7
ptyprocess==0.6.0
pycparser==2.18
Pygments==2.2.0
PyYAML==3.13
scandir==1.9.0
simplegeneric==0.8.1
singledispatch==3.4.0.3
six==1.11.0
skein==0.1.1
sortedcontainers==2.0.5
tblib==1.3.2
toolz==0.9.0
tornado==5.1
traitlets==4.3.2
venv-pack==0.2.0
wcwidth==0.1.7
zict==0.1.3
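
One possible explanation, offered as an assumption and not confirmed anywhere in this report: on Python 2, `from dask_poc import *` holds the interpreter's global import lock while the module body runs, so any import attempted by the background event-loop thread would block while the main thread waits on it. If that is what happens, keeping cluster start-up out of module scope should avoid it. A sketch, where `square_all` and `main` are hypothetical names and the `YarnCluster` arguments are copied from the script above:

```python
import os


def square_all(client, values):
    """Map x ** 2 over values on the given client and gather the results."""
    futures = client.map(lambda x: x ** 2, values)
    return client.gather(futures)


def main():
    # Import lazily so that importing this module starts no threads or clusters.
    try:
        import dask_yarn
        from dask.distributed import Client
    except ImportError:
        print("dask-yarn or distributed is not installed; nothing to run")
        return

    cluster = dask_yarn.YarnCluster(
        os.path.expanduser("~/venv-dask-upstream.tar.gz"),
        worker_vcores=1,
        worker_memory="3GB",
        worker_env={"HADOOP_CONF_DIR": "/data/app/spark-yarn/hadoop-conf",
                    "JAVA_HOME": "/usr/lib/jvm/java"},
        n_workers=1,
        scheduler_memory="1GB")
    client = Client(cluster)
    print("Result: %r" % (square_all(client, range(4)),))


if __name__ == "__main__":
    # Runs only for `python dask_poc.py`, not for `from dask_poc import *`.
    main()
```

With this layout `python dask_poc.py` still runs the job, while an interactive session would use `import dask_poc` followed by `dask_poc.main()`, so the import has finished before the cluster starts.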
