dask / dask-yarn
Deploy dask on YARN clusters
Home Page: http://yarn.dask.org
License: BSD 3-Clause "New" or "Revised" License
I am using AWS EMR with 1 master and 2 worker nodes (r5.xlarge). Hadoop, Spark, and Zeppelin are all set up. I am using Zeppelin for notebook processing. My bootstrap script for EMR:
#!/usr/bin/env bash
set -x -e
# move /usr/local to /mnt/usr-moved/local; else run out of space on /
sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/
#sudo bash -c "yum update -y && yum upgrade -y && yum groupinstall -y 'Development Tools'"
sudo bash -c "yum groupinstall -y 'Development Tools'"
sudo yum install -y htop
# Install anaconda
wget https://repo.continuum.io/archive/Anaconda3-5.3.0-Linux-x86_64.sh -O $HOME/anaconda.sh && /bin/bash ~/anaconda.sh -b -p $HOME/anaconda
echo -e '\nexport PATH=$HOME/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
conda config --set always_yes yes --set changeps1 no
conda config -f --add channels conda-forge
conda config -f --add channels defaults
conda update conda
conda update --all
# cleanup
rm ~/anaconda.sh
echo bootstrap_conda.sh completed. PATH now: $PATH
export PYSPARK_PYTHON="$HOME/anaconda/bin/python3.7"
# setup git
sudo yum install -y git
# install additional third party libraries
echo "Installing base packages"
conda install \
-c defaults \
-c conda-forge \
-c anaconda \
-y \
-q \
"dask-yarn>=0.4.1" \
pyarrow \
s3fs \
nomkl \
conda-pack \
tensorflow \
grpcio \
protobuf
yes | pip install fancyimpute
yes | pip install -U pandasql
if grep isMaster /mnt/var/lib/info/instance.json | grep true;
then
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
distributed:
  dashboard:
    link: "/proxy/{port}/status"

yarn:
  environment: /home/hadoop/environment.tar.gz
  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
EOT
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc
fi
I have followed your AWS EMR guide to set up my environment; however, there are several problems.
First, a question: I am using Anaconda instead of Miniconda, and it produces a 2 GiB conda environment archive. Is that okay?
import dask
import dask.distributed
from dask_yarn import YarnCluster
from dask.distributed import Client
from sklearn.externals import joblib
cluster = YarnCluster(deploy_mode='local')
It gives me:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-8bae7fd69e7b> in <module>
1 # Create a cluster in local deploy mode, to have access to the dashboard
----> 2 cluster = YarnCluster(deploy_mode='local')
3 # cluster = YarnCluster.from_specification('/home/hadoop/.config/dask/config.yaml')
4 # cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz', worker_vcores=4, worker_memory='4 GiB')
5
/home/hadoop/anaconda/lib/python3.7/site-packages/dask_yarn/core.py in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
291 queue=queue,
292 tags=tags,
--> 293 user=user)
294
295 self._start_cluster(spec, skein_client)
/home/hadoop/anaconda/lib/python3.7/site-packages/dask_yarn/core.py in _make_specification(**kwargs)
116 "\n"
117 "See http://yarn.dask.org/environments.html for more information.")
--> 118 raise ValueError(msg)
119
120 n_workers = lookup(kwargs, 'n_workers', 'yarn.worker.count')
ValueError: You must provide a path to a Python environment for the workers.
This may be one of the following:
- A conda environment archived with conda-pack
- A virtual environment archived with venv-pack
- A path to a conda environment, specified as conda://...
- A path to a virtual environment, specified as venv://...
- A path to a python binary to use, specified as python://...
See http://yarn.dask.org/environments.html for more information.
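For reference, the error message suggests that deploy_mode='local' still works once an environment is supplied; a minimal sketch, reusing the conda-pack archive produced by the bootstrap script above:
from dask_yarn import YarnCluster
from dask.distributed import Client

# Local deploy mode plus an explicit packaged environment for the workers.
cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz',
                      deploy_mode='local')
client = Client(cluster)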
After that, I tried:
cluster = YarnCluster.from_specification('/home/hadoop/.config/dask/config.yaml')
which gives me:
ValueError: Unknown extra keys for ApplicationSpec:
- distributed
- yarn
my config.yaml:
distributed:
  dashboard:
    link: "/proxy/{port}/status"

yarn:
  environment: /home/hadoop/environment.tar.gz
  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
When I do:
cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz', worker_vcores=4, worker_memory='4 GiB')
It just keeps waiting and running, and never finishes.
Eventually, I can do:
cluster = YarnCluster(environment='/home/hadoop/environment.tar.gz')
cluster.scale(10)
It works okay, but I cannot configure worker memory, vcores, etc.
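(As an aside, the same resource values can apparently also be supplied through Dask's configuration system instead of constructor kwargs; a sketch, where the yarn.worker.* key names are inferred from the 'yarn.worker.count' lookup visible in the traceback above and should be treated as an assumption:)
import dask

# Assumed dask-yarn config keys, mirroring the 'yarn.worker.count'
# lookup seen in _make_specification above.
dask.config.set({'yarn': {'worker': {'vcores': 4,
                                     'memory': '4GiB',
                                     'count': 4}}})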
print(cluster.dashboard_link)
It returns None.
Finally, even if I press on despite these problems, when I run a grid search like:
with joblib.parallel_backend('dask'):
# do grid search
When I connect to the worker nodes with SSH and check resource utilization, they are idle; only the master node, with 2 cores, is executing.
First, thank you for this awesome library. It's been a game changer for me to be able to run Dask clusters on EMR.
I'm using your EMR bootstrap script to set up the cluster. I have a bunch of large zip files (~2 GB) on S3 that I'm pulling smaller individual CSV files out of and processing into a Dask dataframe. I'm copying the data from S3 into HDFS before processing.
I can open a notebook as advertised and open an HDFS connection as follows with no problem.
import pyarrow as pa
fs = pa.hdfs.connect()
I'm able to do the same on the workers from within the notebook. So far this works great.
Where I run into a problem is that I want to run this as a script unattended, as an EMR step, instead of in the notebook. However, when I do so it produces the following error on the same line:
pyarrow.lib.ArrowIOError: HDFS connection failed
It seems like something is getting configured for the Jupyter notebook server that isn't for the standalone job step; I'm just having a hard time pinning it down. Any ideas?
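One difference worth checking: the bootstrap script only exports ARROW_LIBHDFS_DIR in ~/.bashrc, which interactive notebook sessions pick up but a non-interactive EMR step generally does not. A sketch of setting it in-process before connecting, using the path from the bootstrap script:
import os
import pyarrow as pa

# EMR steps don't source ~/.bashrc, so set the libhdfs path explicitly.
os.environ.setdefault('ARROW_LIBHDFS_DIR', '/usr/lib/hadoop/lib/native')
fs = pa.hdfs.connect()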
Hi guys,
I am sorry if this is a dumb/duplicate question. We have been trying to follow the example to bring up Dask on YARN and keep getting the error "Kerberos ticket not found, please kinit and restart", even though the user starting the cluster does have a valid ticket.
Is there anywhere I could specifically point to the ticket location at cluster startup? We have a Hadoop cluster and want to use Dask on YARN. If anybody has tried this combination (MapR Hadoop cluster / Dask on YARN) and could give us any pointers, it would be highly appreciated.
Thank you!
Andre
Error attached.
log-daskyarn.txt
Hi,
I am having issues with client.upload_file() when using a YarnCluster on a Spark EMR cluster in AWS (one worker node). I've managed to find a workaround hack of using time.sleep() before creating a client from the cluster.
Without this sleep the upload_file() function appears to work but sys.path on the worker is not updated to include the zipfile I uploaded. This causes a ModuleNotFoundError exception in the function passed to client.submit().
I've created a minimal gist that shows this issue, at least on the EMR Spark cluster I'm using on AWS. I'm running this command on the master node using Python 3.6 (EMR 5.23.0): https://gist.github.com/crleblanc/ff4b7860a8a0401d283f7077c0d8953c. Setting WAIT_SECS to 0 causes this script to fail. Setting this to a large value like 10 seconds causes this example to pass as expected.
I have a bootstrap script that installs all of my dependencies but we're using upload_file() for code that changes often. This tends to change on every new instance of the cluster.
Hopefully this gist gives you an idea of where to investigate. Thanks for your work on such an excellent project!
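As a possibly less fragile workaround than a fixed sleep, recent versions of distributed expose Client.wait_for_workers, which blocks until a given number of workers have registered; a sketch, with a hypothetical archive name:
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz')
cluster.scale(1)
client = Client(cluster)

# Block until at least one worker has registered with the scheduler,
# instead of sleeping for an arbitrary number of seconds.
client.wait_for_workers(1)
client.upload_file('my_module.zip')  # hypothetical archive name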
I'm trying to test my dask-yarn installation on my Hadoop cluster. I started with a simple example:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=1,
                      worker_memory='4GB',
                      n_workers=4)
The YARN application starts successfully, and I have 1 scheduler and 4 workers. One thing, though: I have the following error in the scheduler's container log:
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://zzz:34495
distributed.scheduler - INFO - bokeh at: :45834
distributed.scheduler - INFO - Register tcp://zzz:38718
distributed.scheduler - INFO - Starting worker compute stream, tcp://zzz:38718
distributed.core - INFO - Starting established connection
Future exception was never retrieved
future: <Future finished exception=KeyError('op',)>
Traceback (most recent call last):
File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/distributed/core.py", line 313, in handle_comm
op = msg.pop('op')
KeyError: 'op'
(zzz is just the scheduler's host IP); and here is the output log in the Python shell:
18/10/18 12:08:18 INFO skein.Daemon: Submitting application...
18/10/18 12:08:18 INFO impl.YarnClientImpl: Submitted application application_1537451980488_0012
18/10/18 12:08:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/10/18 12:08:36 INFO skein.Daemon: Daemon shut down
Once I get to the Dask client:
client = Client(cluster)
I receive a stream-closed error:
/home/user/anaconda2/lib/python2.7/site-packages/distributed/comm/tcp.pyc in convert_stream_closed_error(obj, exc)
124 raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
125 else:
--> 126 raise CommClosedError("in %s: %s" % (obj, exc))
127
128
CommClosedError: in <closed TCP>: Stream is closed
Interestingly, that <Future finished exception=KeyError('op',)> appears again in the scheduler log every time I try to create the client.
The environment (pip freeze):
asn1crypto==0.24.0
bokeh==0.13.0
certifi==2018.10.15
cffi==1.11.5
chardet==3.0.4
Click==7.0
cloudpickle==0.6.1
conda-pack==0.2.0
cryptography==2.3.1
cryptography-vectors==2.3.1
cytoolz==0.9.0.1
dask==0.19.4
dask-yarn==0.3.1
distributed==1.23.3
future==0.16.0
grpcio==1.14.1
heapdict==1.0.0
idna==2.7
Jinja2==2.10
locket==0.2.0
MarkupSafe==1.0
mkl-fft==1.0.6
mkl-random==1.0.1
msgpack==0.5.6
numpy==1.15.0
packaging==18.0
pandas==0.23.4
partd==0.3.9
patsy==0.5.0
protobuf==3.6.1
psutil==5.4.7
pyarrow==0.11.0
pycparser==2.19
pyOpenSSL==18.0.0
pyparsing==2.2.2
PySocks==1.6.8
python-dateutil==2.7.3
pytz==2018.5
PyYAML==3.13
requests==2.19.1
scikit-learn==0.19.1
scipy==1.1.0
six==1.11.0
skein==0.2.0
sortedcontainers==2.0.5
statsmodels==0.9.0
tblib==1.3.2
toolz==0.9.0
tornado==5.1.1
tqdm==4.26.0
tsfresh==0.11.0
urllib3==1.23
zict==0.1.3
I've got a standard deployment of Dask 2.0 on an AWS EMR cluster with the original bootstrap shell script.
I can connect to Jupyter Notebook and run the example code:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="3GiB",
                      deploy_mode='local')
client = Client(cluster)
cluster
But when trying to scale up the cluster through the widget, I get an error:
tornado.application - ERROR - Exception in callback <function YarnCluster._widget.<locals>.update at 0x7fc78597c378>
Traceback (most recent call last):
File "/home/hadoop/miniconda/lib/python3.6/site-packages/tornado/ioloop.py", line 907, in _run
return self.callback()
File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 649, in update
status.value = self._widget_status()
File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 585, in _widget_status
cores = sum(w['ncores'] for w in workers.values())
File "/home/hadoop/miniconda/lib/python3.6/site-packages/dask_yarn/core.py", line 585, in <genexpr>
cores = sum(w['ncores'] for w in workers.values())
KeyError: 'ncores'
App log contains the following warning:
/mnt/yarn/usercache/hadoop/appcache/application_.../container_.../environment/lib/python3.6/site-packages/distributed/nanny.py:102: UserWarning: the ncores= parameter has moved to nthreads=
The problem disappeared after I changed line 585 to:
cores = sum(w['nthreads'] for w in workers.values()) if workers.values() else 0
Environment details:
dask 2.0.0
dask-core 2.0.0
dask-yarn 0.6.1
notebook 5.7.8
tornado 6.0.3
distributed 2.0.1
bokeh 1.2.0
ipykernel 5.1.1
ipython 7.6.0
ipython_genutils 0.2.0
ipywidgets 7.4.2
I am trying to run my application using dask-yarn, but while creating the YARN cluster I am getting an error:
Traceback (most recent call last):
File "", line 1, in
File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/core.py", line 291, in init
self._start_cluster(spec, skein_client)
File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/core.py", line 367, in _start_cluster
scheduler_address = app.kv.wait('dask.scheduler').decode()
File "/home/user/project/sync/lib/python3.5/site-packages/skein/kv.py", line 655, in wait
event = event_queue.get()
File "/home/user/project/sync/lib/python3.5/site-packages/skein/kv.py", line 281, in get
raise out
skein.exceptions.ConnectionError: Unable to connect to application
On checking the YARN logs I found that some of the worker nodes are throwing FileNotFoundError for .skein.{crt,pem}:
LogContents:
/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/config.py:13: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
defaults = yaml.load(f)
Traceback (most recent call last):
File "/home/user/project/sync/bin/dask-yarn", line 10, in
sys.exit(main())
File "/home/user/projectn/sync/lib/python3.5/site-packages/dask_yarn/cli.py", line 407, in main
func(**kwargs)
File "/home/user/project/sync/lib/python3.5/site-packages/dask_yarn/cli.py", line 288, in scheduler
app_client = skein.ApplicationClient.from_current()
File "/home/user/project/sync/lib/python3.5/site-packages/skein/core.py", line 792, in from_current
security=Security.from_default())
File "/home/user/project/sync/lib/python3.5/site-packages/skein/model.py", line 344, in from_default
"Failed to resolve .skein.{crt,pem} in 'LOCAL_DIRS'")
FileNotFoundError: Failed to resolve .skein.{crt,pem} in 'LOCAL_DIRS'
Also, sometimes my Dask job works just fine and sometimes it throws this error. Is this a known issue, or am I doing something wrong?
Hello,
By running the following code:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='/home/rchaudro/pyspark2.tar.gz')
I get the following error:
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-2-16f7e6a860db> in <module>()
2 from dask.distributed import Client
3
----> 4 cluster = YarnCluster(environment='pyspark2.tar.gz')
5 # Connect to the cluster
6 client = Client(cluster)
/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
293 user=user)
294
--> 295 self._start_cluster(spec, skein_client)
296
297 @cached_property
/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _start_cluster(self, spec, skein_client)
337 "'dask.worker' service")
338
--> 339 skein_client = _get_skein_client(skein_client)
340
341 if 'dask.scheduler' not in spec.services:
/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _get_skein_client(skein_client, security)
44 with warnings.catch_warnings():
45 warnings.simplefilter('ignore')
---> 46 return skein.Client(security=security)
47 return skein_client
48
/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in __init__(self, address, security, keytab, principal, log, log_level, java_options)
351 log=log,
352 log_level=log_level,
--> 353 java_options=java_options)
354 else:
355 proc = None
/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in _start_driver(security, set_global, keytab, principal, log, log_level, java_options)
250 stderr=outfil,
251 env=env,
--> 252 **popen_kwargs)
253
254 while proc.poll() is None:
/usr/lib64/python2.7/subprocess.pyc in __init__(self, args, bufsize, executable, stdin, stdout, stderr, preexec_fn, close_fds, shell, cwd, env, universal_newlines, startupinfo, creationflags)
709 p2cread, p2cwrite,
710 c2pread, c2pwrite,
--> 711 errread, errwrite)
712 except Exception:
713 # Preserve original exception in case os.close raises.
/usr/lib64/python2.7/subprocess.pyc in _execute_child(self, args, executable, preexec_fn, close_fds, cwd, env, universal_newlines, startupinfo, creationflags, shell, to_close, p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
1325 raise
1326 child_exception = pickle.loads(data)
-> 1327 raise child_exception
1328
1329
OSError: [Errno 2] No such file or directory
I'm having problems getting Dask groupby-apply working on EMR; I am seeing error messages in the worker logs which indicate a problem with deserializing the function being applied. I have a reproducible example that I have simplified as far as possible, which I am running on a small cluster (master: m4.large, workers: 2x m4.large).
I create the cluster using a modified version of the dask-yarn example bootstrap script. The script installs Miniconda and other dependencies on the master node and all worker nodes. It also creates a Python source file (dask_test.py) containing a simple function on all nodes, and adds the file to the PYTHONPATH. This is my bootstrap script: bootstrap-dask-test.txt (I have changed the extension so I can upload it)
Running the following code using a LocalCluster in a notebook on the master works as expected:
import dask
from distributed import Client
from dask_test import test_func
client = Client(n_workers=2, threads_per_worker=1, memory_limit='3 GiB')
df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()
print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')
However, the equivalent code run against a YarnCluster fails:
import dask
from distributed import Client
from dask_yarn import YarnCluster
from dask_test import test_func
env='python:///home/hadoop/miniconda/bin/python3.6'
cluster = YarnCluster(environment=env, deploy_mode='local', n_workers=4, worker_vcores=1, worker_memory='3 GiB')
client = Client(cluster)
df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()
print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')
This is the worker log containing the serialization error message:
dask.worker.log
I have also tried using a packaged conda environment, and using Client.upload_file, with no luck. Any advice would be greatly appreciated.
Is there support for Kerberos-enabled clusters?
Sorry for the bare-bones issue; I'd be happy to flesh it out more, but I'm not sure where to start providing data. I also suspect this might be a skein bug, but I'm not sure; happy to point it in their direction if that is the case.
When creating a YarnCluster and specifying worker_vcores, the scheduler starts up fine, but no workers instantiate (either via a scale or by specifying an initial n_workers). The application logs look like there's a mismatch between what containers are being requested and what skein wants, which points to a problem on their end, but I'm not too sure and figured I'd start here. Any ideas for additional things to look for?
Specifying worker_memory works fine, but the containers always end up with only 1 core each regardless of how much memory you give them, so you end up with situations where there are 10 containers with 100 GB of RAM each only using 1 core. Pains me inside :)
Thanks,
Michael
19/01/15 19:04:25 INFO skein.ApplicationMaster: Application specification successfully loaded
19/01/15 19:04:25 INFO skein.ApplicationMaster: Running as user hadoop
19/01/15 19:04:25 INFO client.RMProxy: Connecting to ResourceManager at ip-10-201-102-148.ginkgobioworks.com/10.201.102.148:8030
19/01/15 19:04:25 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
19/01/15 19:04:25 INFO skein.ApplicationMaster: gRPC server started at ip-10-201-102-157.ginkgobioworks.com:42873
19/01/15 19:04:25 INFO skein.ApplicationMaster: WebUI server started at ip-10-201-102-157.ginkgobioworks.com:32841
19/01/15 19:04:25 INFO client.RMProxy: Connecting to ResourceManager at ip-10-201-102-148.ginkgobioworks.com/10.201.102.148:8032
19/01/15 19:04:25 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_3
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_4
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_5
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_6
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_7
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_8
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_9
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_10
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_11
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_12
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_13
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_14
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_15
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_16
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_17
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_18
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_19
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_20
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_21
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_22
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_23
19/01/15 19:04:25 INFO skein.ApplicationMaster: REQUESTED: dask.worker_24
19/01/15 19:04:27 INFO impl.AMRMClientImpl: Received new token for : ip-10-201-102-157.ginkgobioworks.com:8041
19/01/15 19:04:27 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 1, releasing container_1547573448325_0009_01_000002
19/01/15 19:04:28 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 2, releasing container_1547573448325_0009_01_000003
19/01/15 19:04:29 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 3, releasing container_1547573448325_0009_01_000004
19/01/15 19:04:30 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 4, releasing container_1547573448325_0009_01_000005
19/01/15 19:04:31 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 5, releasing container_1547573448325_0009_01_000006
19/01/15 19:04:32 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 6, releasing container_1547573448325_0009_01_000007
19/01/15 19:04:33 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 7, releasing container_1547573448325_0009_01_000008
19/01/15 19:04:34 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 8, releasing container_1547573448325_0009_01_000009
19/01/15 19:04:35 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 9, releasing container_1547573448325_0009_01_000010
19/01/15 19:04:36 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 10, releasing container_1547573448325_0009_01_000011
19/01/15 19:04:37 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 11, releasing container_1547573448325_0009_01_000012
19/01/15 19:04:38 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 12, releasing container_1547573448325_0009_01_000013
19/01/15 19:04:39 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 13, releasing container_1547573448325_0009_01_000014
19/01/15 19:04:40 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 14, releasing container_1547573448325_0009_01_000015
19/01/15 19:04:41 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 15, releasing container_1547573448325_0009_01_000016
19/01/15 19:04:42 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 16, releasing container_1547573448325_0009_01_000017
19/01/15 19:04:43 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 17, releasing container_1547573448325_0009_01_000018
19/01/15 19:04:44 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 18, releasing container_1547573448325_0009_01_000019
19/01/15 19:04:45 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 19, releasing container_1547573448325_0009_01_000020
19/01/15 19:04:46 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 20, releasing container_1547573448325_0009_01_000021
19/01/15 19:04:47 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 21, releasing container_1547573448325_0009_01_000022
19/01/15 19:04:48 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 22, releasing container_1547573448325_0009_01_000023
19/01/15 19:04:49 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 23, releasing container_1547573448325_0009_01_000024
19/01/15 19:04:50 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 24, releasing container_1547573448325_0009_01_000025
19/01/15 19:04:51 WARN skein.ApplicationMaster: No matching service found for resource <memory:2048, vCores:1>, priority 25, releasing container_1547573448325_0009_01_000026
As far as I can tell, the Dask workers and scheduler on YARN nodes only look at /etc/dask/ for configuration. It would be nice if files in ~/.config/dask/ were also used. An even better option would be to allow uploading the local Dask configuration to HDFS, so no Dask config is needed on the YARN cluster nodes.
I am trying to achieve user separation, and that seems only possible by specifying a different CA for each user. I have created a feature request for this purpose: dask/distributed#2347
Is it possible to specify which node should be used for the YARN scheduler? That would be really handy for integrating it into Jupyter using nbproxy, since I'm not able to reach the port directly.
I am trying to access the Dask Bokeh dashboard on an AWS EMR cluster via Chrome, but nothing is shown when I click on the linked dashboard.
I set up the cluster using this workflow. When I click on the linked dashboard, I see nothing unless I click on "Info", where I can only find information on the workers.
I have tried running ! pip install tornado==5 from within Jupyter Notebook/Hub, with no resolution.
What am I missing to see the entire Dask Bokeh dashboard?
Note: This is a requested x-post from StackOverflow.
Some options on the distributed CLI should be settable via dask-yarn (e.g. --dashboard-address). There are a few ways we could accomplish this:
- via env (or via configuration, #42).
From jcrist/skein#176.
Would support creating a cluster object from an existing application. The process would be:
- connect to the application with skein.Client.connect
- check for a dask.worker service (confirming the application is indeed a dask cluster)
- create a YarnCluster object using the existing connection logic
object using the existing connection logicDo we have a pragmatic recommendation today for users that don't have direct access to an edge node?
Same as conda-pack, but for virtual environments: https://jcrist.github.io/venv-pack/. I've tested it, and it works as a drop-in for deploying Dask: you can specify the path to the packaged virtualenv just as you would for a conda environment.
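A minimal sketch of that workflow, assuming venv-pack's Python API mirrors conda-pack's and that the virtualenv lives at ./my_env:
import venv_pack
from dask_yarn import YarnCluster

# Package the virtual environment into a relocatable archive...
venv_pack.pack(prefix='my_env', output='environment.tar.gz')

# ...then point dask-yarn at it exactly as you would a conda-pack archive.
cluster = YarnCluster(environment='environment.tar.gz')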
Hi everyone, I'm trying to execute a small piece of code on YARN. Here is my whole code:
import dask.dataframe as dd
from dask_yarn import YarnCluster
from dask.distributed import Client

with YarnCluster(environment='environment.tar.gz',
                 worker_vcores=2,
                 worker_memory="8GiB") as cluster:
    cluster.scale(2)
    # Connect to the cluster
    client = Client(cluster)
    CSV_INPUT = 'hdfs:///Data/input/input.csv'
    CSV_OUTPUT = 'hdfs:///Data/input/output.parquet'
    df = dd.read_csv(CSV_INPUT)
    dd.to_parquet(df, CSV_OUTPUT)
Unfortunately I get this error:
distributed.scheduler.KilledWorker: ("('pandas_read_text-read-block-from-delayed-3f488119df76d5b2ba0e2e75ec2bc55b', 0)", <Worker 'tcp://myip:40417', memory: 0, processing: 1>)
The error is not understandable to me; has anyone experienced this issue?
Thanks
Hey,
I tried to make dask-yarn work with joblib's parallel loops, but failed, because there seems to be a problem with the YarnCluster reporting its cores.
Here a small example:
from joblib import Parallel, delayed, parallel_backend
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(...)
# Connect to the cluster
client = Client(cluster)
# This should give a dict of available cores per worker, but is empty
print(client.ncores())
with parallel_backend('dask'):
    # This is empty since no workers are assigned
    results = Parallel(verbose=100)(delayed(lambda x: x**2)(x) for x in range(10))
However, I found that this doesn't seem to be a problem when using a map-gather construct to achieve the same thing.
# This works fine
results = client.gather(client.map(lambda x: x**2, range(10)))
From #49 (comment).
I think this would look like:
- A worker_files kwarg, which takes a mapping to forward to the files kwarg in the skein specification (sketched in Python below).
- A worker-file and worker-archive option in the CLI. We could mirror Spark's API here and have the localized name be optionally specified with a # separator (see https://spark.apache.org/docs/latest/running-on-yarn.html#important-notes). I'm not a huge fan of this API, but it is used by other tools, and that might be argument enough to use it. For example:
dask-yarn submit \
  --worker-file /local/file/path \
  --worker-file /other/local/file/path#name-in-container
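A sketch of the proposed kwarg from Python; this API is hypothetical and not yet implemented:
from dask_yarn import YarnCluster

# Hypothetical 'worker_files' kwarg: a mapping from name-in-container to
# local path, forwarded to the 'files' field of the skein specification.
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_files={'name-in-container': '/other/local/file/path'})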
spec = _make_specification(environment=environment,
                           n_workers=n_workers,
                           worker_vcores=worker_vcores,
                           worker_memory=worker_memory,
                           worker_max_restarts=worker_max_restarts,
                           scheduler_vcores=scheduler_vcores,
                           scheduler_memory=scheduler_memory,
                           name=name,
                           queue=queue,
                           tags=tags)
Yet _make_specification is looking for:
worker_restarts = either('worker_restarts', 'yarn.worker.restarts')
I am trying to sort out how the skein YAML configuration (described here: https://jcrist.github.io/skein/specification.html) matches up with the dask-yarn YAML configuration (described here: http://yarn.dask.org/en/latest/configuration.html).
Specifically, I see that skein has a files configuration to let you distribute files to the workers. It says to place this files section under the master or service section of the configuration. For the dask-yarn configuration, there are only scheduler and worker sections shown in the example. Are the dask-yarn scheduler and worker sections being implicitly placed into a services section of a skein configuration?
How does the dask-yarn YAML file match up to the skein YAML file, and where would I put the files attributes in my dask-yarn config file?
Thanks!
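For what it's worth, a sketch of where a files section might sit if you drop down to a full skein specification under yarn.specification, mirroring the spec layout shown elsewhere on this page; treat the exact keys as an assumption rather than a confirmed answer:
yarn:
  specification:
    services:
      dask.worker:
        resources:
          memory: 2GiB
          vcores: 1
        files:
          dask_test.py: /local/path/dask_test.py  # destination: source
        script: |
          dask-yarn services worker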
Not sure this is dask-yarn's fault, so feel free to close, but I wanted to share this failure when trying to install:
$ conda install dask-yarn -c conda-forge
PackageNotFoundError: Package not found: '' Dependencies missing in current linux-64 channels:
- dask-yarn -> grpcio >=1.14.0 -> c-ares >=1.15.0,<2.0a0 -> libgcc-ng >=7.3.0
- dask-yarn -> grpcio >=1.14.0 -> libstdcxx-ng >=7.3.0
This would probably look like:
$ dask-yarn submit myscript.py [options...]
It would submit a YARN job with an additional service running the user's script. This would allow dask-yarn to submit scripts to run fully on the cluster, mirroring spark-submit in cluster mode. The command could either exit immediately (leaving tracking of application success/failure to other tools), or log to the console.
I'm running the following, very simple example, with dask-yarn.
cluster = YarnCluster(environment='venv:///home/florisvannee/my_envsd',
                      worker_vcores=1,
                      worker_memory="256MiB")
cluster.scale(1)
client = Client(cluster)
def inc(x):
    return x + 1
a = client.submit(inc, 20)
print(a)
print(a.result())
This never completes. If I change the call to scale with any number higher than 1, it does complete. E.g. with cluster.scale(2) I get the following output in the log:
19/04/19 14:07:05 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/04/19 14:07:06 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000002...
19/04/19 14:07:06 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_1555675382310_0003_01_000002
19/04/19 14:07:07 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 2 instances, a delta of 2.
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/04/19 14:07:07 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/04/19 14:07:12 INFO skein.ApplicationMaster: Starting container_1555675382310_0003_01_000003...
19/04/19 14:07:12 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_1555675382310_0003_01_000003
Even though it looks like it is requesting worker_0, it never gets it. When I look in the overall ResourceManager log, I never see any request for worker_0; I only see requests for worker_1. Note that this is not because there is no work in this example (for more complex examples with more load I see exactly the same behavior: the first worker never gets properly requested). Also, when running with cluster.scale(1), it never completes because that first worker is just never started up.
Running scale() with any number higher than 2 always results in all workers being started except worker_0.
Any ideas what could be causing this?
Potentially naive question, as I just learned what YARN was at a meetup last night. I think Amazon's EMR service is built around it. With that in mind, could you use this package, or parts of it, to get a Dask cluster up and running on EMR?
I know the recommended deployment is using Kubernetes, but my company blocked AWS's Kubernetes service (EKS) 🤦♂️.
Any tips/advice would be greatly appreciated.
Hello, this is a copy from jcrist/skein#181.
I am new to skein and Dask in general. I am trying to create a hello-world dask-yarn application and am running into SSL certificate errors. Does anyone know how to get around them?
Here's the code I am using:
import skein
c = skein.Client(log_level='debug', java_options="-Djava.io.tmpdir=/path/to/some/dir -Dcom.sun.net.ssl.checkRevocation=false")
and here's the error message:
19/07/08 09:25:06 DEBUG skein.Driver: Starting Skein version 0.7.4
19/07/08 09:25:06 DEBUG skein.Driver: Logging in using ticket cache
19/07/08 09:25:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/07/08 09:25:07 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/07/08 09:25:08 INFO client.AHSProxy: Connecting to Application History server at cap-compute02/172.27.57.54:10200
19/07/08 09:25:08 INFO skein.Driver: Driver started, listening on 37170
19/07/08 09:25:08 DEBUG skein.Driver: Reporting gRPC server port back to the launching process
E0708 09:25:08.618234364 161031 ssl_transport_security.cc:1238] Handshake failed with fatal error SSL_ERROR_SSL: error:1000007d:SSL routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED.
19/07/08 09:25:08 DEBUG skein.Driver: Starting process disconnected, shutting down
19/07/08 09:25:08 DEBUG skein.Driver: Driver shut down
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/software/virtual_env/aesop_20190707_env/lib/python3.7/site-packages/skein/core.py", line 366, in __init__
self._call('ping', proto.Empty())
File "/software/virtual_env/aesop_20190707_env/lib/python3.7/site-packages/skein/core.py", line 289, in _call
raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to driver
Hi guys,
I am facing this error. What am I missing?
ConnectionError: Unable to connect to application
ConnectionError Traceback (most recent call last)
in engine
----> 1 mycluster=YarnCluster(environment='compre3.tar.gz')
/home/cdsw/.local/lib/python3.6/site-packages/dask_yarn/core.py in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
293 user=user)
294
--> 295 self._start_cluster(spec, skein_client)
296
297 @cached_property
/home/cdsw/.local/lib/python3.6/site-packages/dask_yarn/core.py in _start_cluster(self, spec, skein_client)
373 app = skein_client.submit_and_connect(spec)
374 try:
--> 375 scheduler_address = app.kv.wait('dask.scheduler').decode()
376 dashboard_address = app.kv.get('dask.dashboard')
377 if dashboard_address is not None:
/home/cdsw/.local/lib/python3.6/site-packages/skein/kv.py in wait(self, key, return_owner)
653 return res
654
--> 655 event = event_queue.get()
656
657 return event.result if return_owner else event.result.value
/home/cdsw/.local/lib/python3.6/site-packages/skein/kv.py in get(self, block, timeout)
279 if isinstance(out, Exception):
280 self._exception = out
--> 281 raise out
282 return out
283
ConnectionError: Unable to connect to application
The dask dashboard relies on modern web features like websockets, which are incompatible with the yarn proxy. To get around this we can either:
How do we feel about the following configuration file:
yarn:
  skein-specification: null  # A full specification or path to a specification.
                             # Overwrites the following configuration if given.
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  environment: null          # Path to conda packed environment
  tags: []                   # List of strings to tag applications
  scheduler:                 # Specifications of scheduler container
    cores: 1
    memory: 2GiB
  worker:                    # Specifications of worker containers
    cores: 1
    memory: 2GiB
    instances: 0
    restarts: -1             # Allowed number of restarts, -1 for unlimited
A couple of open questions:
- cores vs vcores
- 2GiB vs 2048
So how do we handle adaptive loads? There are a few actors here, living in different places:
So it's convenient if the Adaptive and Scheduler objects are in the same thread. The Dask Client and the Scheduler are likely on different machines. Should the Skein Client be with the Dask Client or with the Scheduler?
If the Skein Client should be with the Dask Client, then we should probably make a RemoteCluster object that lives with the Scheduler, but just streams back signals to us.
class RemoteCluster(Cluster):
    def __init__(self, scheduler, comm):
        self.scheduler = scheduler
        self.comm = comm

    def scale_up(self, n):
        self.comm.send({'op': 'scale-up', 'n': n})

    def scale_down(self, workers):
        self.comm.send({'op': 'scale-down', 'workers': workers})
We would then, from the Dask client, ask the scheduler to set up a RemoteCluster, set up a local Adaptive object that uses it, and have that pair send us the information to pass on to the Skein Client.
Alternatively, we could add the Skein Client to the process running the Scheduler, empowering it to start and stop its own workers. It would start up its own YarnCluster object, presumably providing it its own application id. It would then receive infrequent messages from the client to change its adaptive settings (this infrastructure is already built).
This requires less work on our part, but does mean that a YARN container gets elevated access over its own application. I'm not sure if this is common or not.
dask-yarn doesn't seem to be compatible with distributed 2.2.0.
The reason seems to be this change:
which then breaks this piece of code here:
https://github.com/dask/dask-yarn/blob/master/dask_yarn/cli.py#L316
so that whenever the scheduler is started, it crashes with:
TypeError: start() takes exactly 1 positional argument (2 given)
Sorry, I cannot give the full call stack right now, but can provide it if needed. Anyway, the problem is clear, I think.
I have this minimal yarn.yaml file. I'm using dask-yarn 0.5.0 and skein 0.5.1. I'm running on an EMR cluster.
yarn:
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  environment: python:///usr/bin/python
  tags: []                   # List of strings to tag applications
  specification:
    name: dask
    queue: default
    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          dask-yarn services scheduler
      dask.worker:
        instances: 1
        resources:
          memory: 8GiB
          vcores: 1
        env:
          'AAAAAAA': aaaaaaaaaaaaaaaa
        script: |
          # Script to start the worker. Alternatively dask-yarn worker might work if it's on your PATH
          dask-yarn services worker
I am trying to set environment variables in my containers. I have been able to successfully do this using the YarnCluster(worker_env=WORKER_ENV) syntax; however, I want to be able to use a yarn.yaml file so I can change the cluster configuration during testing without having to change my code, and including any kwargs in the constructor prevents this file being used. I'm running this basic program and finding that the environment variables are not being set as I expect.
import os
from dask_yarn import YarnCluster
from dask.distributed import Client
def get_env(varname):
    import os
    return os.environ.get(varname)
cluster = YarnCluster()
client = Client(cluster)
a = client.submit(get_env, 'AAAAAAA')
print(a.result())
This returns None, since the variable isn't being set.
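For contrast, a sketch of the constructor form the author reports working, with the value copied from the spec above:
from dask_yarn import YarnCluster

# Setting worker environment variables via the constructor, which works,
# unlike the yarn.yaml route above.
WORKER_ENV = {'AAAAAAA': 'aaaaaaaaaaaaaaaa'}
cluster = YarnCluster(worker_env=WORKER_ENV)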
The Hadoop cluster I use just added additional security, with two KMS servers to encrypt the data. From the logs it looks like the KMS is unhappy with my Kerberos credentials. I do have a valid Kerberos ticket (the job would not have launched otherwise, but I double-checked).
I am getting a new error while reading from parquet:
19/03/01 12:16:45 INFO skein.Driver: Driver started, listening on 46336
19/03/01 12:16:46 INFO hdfs.DFSClient: Created token for jlord: HDFS_DELEGATION_TOKEN [email protected], renewer=yarn, realUser=, issueDate=1551464206344, maxDate=1552069006344, sequenceNumber=9382869, masterKeyId=510 on ha-hdfs:nameservice1
19/03/01 12:16:47 INFO security.TokenCache: Got dt for hdfs://nameservice1; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:nameservice1, Ident: (token for jlord: HDFS_DELEGATION_TOKEN [email protected], renewer=yarn, realUser=, issueDate=1551464206344, maxDate=1552069006344, sequenceNumber=9382869, masterKeyId=510)
19/03/01 12:16:47 INFO security.TokenCache: Got dt for hdfs://nameservice1; Kind: kms-dt, Service: 10.195.102.124:16000, Ident: (kms-dt owner=jlord, renewer=yarn, realUser=, issueDate=1551464207669, maxDate=1552069007669, sequenceNumber=115244, masterKeyId=35)
19/03/01 12:16:47 INFO skein.Driver: Uploading application resources to hdfs://nameservice1/user/jlord/.skein/application_1551067831607_22507
19/03/01 12:17:03 INFO skein.Driver: Submitting application...
19/03/01 12:17:03 INFO impl.YarnClientImpl: Submitted application application_1551067831607_22507
19/03/01 12:17:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "read_parquet.py", line 27, in <module>
df = dataframe.read_parquet('hdfs:///user/jlord/test_lots_of_parquet')
File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 1155, in read_parquet
categories=categories, index=index, infer_divisions=infer_divisions)
File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 674, in _read_pyarrow
filters=filters)
File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/pyarrow/parquet.py", line 890, in __init__
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads)
File "/nas/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/pyarrow/parquet.py", line 1065, in _make_manifest
.format(path))
OSError: Passed non-file path: /user/jlord/test_lots_of_parquet
19/03/01 12:17:54 INFO skein.Driver: Driver shut down
The logs:
Container: container_e10_1551067831607_22507_01_000001 on all1.allstate.com_8041
=====================================================================================
LogType:application.master.log
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:3334
Log Contents:
19/03/01 12:17:09 INFO skein.ApplicationMaster: Running as user jlord
19/03/01 12:17:09 INFO skein.ApplicationMaster: Application specification successfully loaded
19/03/01 12:17:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/01 12:17:11 INFO skein.ApplicationMaster: gRPC server started at all1.allstate.com:41971
19/03/01 12:17:11 INFO skein.ApplicationMaster: WebUI server started at all1.allstate.com:34551
19/03/01 12:17:11 INFO skein.ApplicationMaster: Registering application with resource manager
19/03/01 12:17:12 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/03/01 12:17:12 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/03/01 12:17:12 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/03/01 12:17:13 INFO skein.ApplicationMaster: Starting container_e10_1551067831607_22507_01_000002...
19/03/01 12:17:13 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e10_1551067831607_22507_01_000002
19/03/01 12:17:52 INFO skein.ApplicationMaster: Scaling service 'dask.worker' to 10 instances, a delta of 10.
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_3
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_4
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_5
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_6
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_7
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_8
19/03/01 12:17:52 INFO skein.ApplicationMaster: REQUESTED: dask.worker_9
19/03/01 12:17:54 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
19/03/01 12:17:54 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
19/03/01 12:17:54 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
19/03/01 12:17:54 WARN security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 WARN security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
19/03/01 12:17:54 INFO skein.ApplicationMaster: Deleted application directory hdfs://nameservice1/user/jlord/.skein/application_1551067831607_22507
19/03/01 12:17:54 INFO skein.ApplicationMaster: WebUI server shut down
19/03/01 12:17:54 INFO skein.ApplicationMaster: gRPC server shut down
LogType:container-localizer-syslog
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:2442
Log Contents:
2019-03-01 12:17:06,120 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:06,122 WARN [ContainerLocalizer Downloader] org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:06,122 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:07,687 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:07,688 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
2019-03-01 12:17:08,483 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:08,485 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
Container: container_e10_1551067831607_22507_01_000002 on all1753.allstate.com_8041
=====================================================================================
LogType:container-localizer-syslog
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:2442
Log Contents:
2019-03-01 12:17:17,341 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:17,347 WARN [ContainerLocalizer Downloader] org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:17,348 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
2019-03-01 12:17:19,085 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:19,088 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
2019-03-01 12:17:51,257 WARN [ContainerLocalizer Downloader] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:jlord (auth:KERBEROS) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
2019-03-01 12:17:51,257 WARN [ContainerLocalizer Downloader] org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider: KMS provider at [https://all2.allstate.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
LogType:dask.scheduler.log
Log Upload Time:Fri Mar 01 12:17:55 -0600 2019
LogLength:633
Log Contents:
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.195.102.137:38407
distributed.scheduler - INFO - bokeh at: :36105
distributed.scheduler - INFO - Receive client connection: Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.scheduler - INFO - Remove client Client-543b1970-3c4e-11e9-9f37-246e9682cec8
distributed.scheduler - INFO - Close client connection: Client-543b1970-3c4e-11e9-9f37-246e9682cec8
In [13]: cluster.application_client.containers()
Out[13]: [Container<service_name='dask.scheduler', instance=0, state=RUNNING>]
In [14]: cluster
Out[14]: YarnCluster<"pending connection">
In [15]: cluster.application_client.kv
Out[15]: <skein.core.KeyValueStore at 0x7fe67d797518>
In [16]: dict(cluster.application_client.kv)
Out[16]:
{'dask.scheduler': 'tcp://172.22.0.3:37279',
'dask.dashboard': 'http://172.22.0.3:46003'}
In [17]: from dask.distributed import Client
In [18]: client = Client(cluster)
18/07/01 13:04:50 INFO skein.Daemon: Starting process disconnected, shutting down
18/07/01 13:04:50 INFO skein.Daemon: Shutting down gRPC server
18/07/01 13:04:50 INFO skein.Daemon: gRPC server shut down
In [19]: client
Out[19]: <Client: scheduler='tcp://172.22.0.3:37279' processes=0 cores=0>
In [20]: cluster
Out[20]: YarnCluster<'tcp://172.22.0.3:37279'>
dask-yarn currently sets the DASK_CONFIG directory to the fixed value .config in the service setup, but that directory might not always be writable. It would be nice to make that overridable.
I'm actually not so sure why that directory needs to be configured at all. Are users expected to provide persistent configuration there across container runs? If not, then why not set it to something like /tmp?
I am working on using dask-yarn on EMR with ephemeral clusters. As such, I would like to use the cluster resources to their maximum.
However, I am struggling to get Dask to use more than 50% of the cores. I understand that, as with any YARN app, I can specify the resources my app wants, but this logic doesn't really make sense in the context of single-app, ephemeral tasks. Since there is only one app running on a cluster designed for it, it should use all of the available resources.
Is there any way to create a YARN cluster such that it uses all of the available resources? I apologize if this is documented somewhere, but I haven't been able to find it in the docs.
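There is no built-in "use everything" switch that I know of, but a back-of-the-envelope sketch can get close if the node shape is known. All numbers below are hypothetical placeholders; substitute the YARN-allocatable vcores and memory of your instance type:

from dask_yarn import YarnCluster

NODES = 2               # worker nodes in the ephemeral cluster
VCORES_PER_NODE = 4     # yarn.nodemanager.resource.cpu-vcores
MEM_PER_NODE_GIB = 24   # yarn.nodemanager.resource.memory-mb / 1024

WORKER_VCORES = 2
WORKER_MEMORY_GIB = 8

# Workers that fit on one node, bounded by both vcores and memory.
workers_per_node = min(VCORES_PER_NODE // WORKER_VCORES,
                       MEM_PER_NODE_GIB // WORKER_MEMORY_GIB)

cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=WORKER_VCORES,
    worker_memory='%d GiB' % WORKER_MEMORY_GIB,
    n_workers=NODES * workers_per_node,
)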
Hi
I'm running with the following versions
dask_yarn = 0.3.1
dask = 0.19.3
distributed = 1.23.3
skein = 0.1.1
I'm successfully running the following code:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment = 'testDask.tar.gz', worker_vcores=4, worker_memory='4GB', n_workers=16, name='task1')
client = Client(cluster)
client.get_versions(check=True)
{'scheduler': {'host': (('python', '3.6.6.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '4.4.73-5-default'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'POSIX'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'None.None')),
'packages': {'required': (('dask', '0.19.3'),
('distributed', '1.23.3'),
('msgpack', '0.5.6'),
('cloudpickle', '0.5.3'),
('tornado', '5.1'),
('toolz', '0.9.0')),
'optional': (('numpy', '1.15.0'),
('pandas', '0.23.4'),
('bokeh', '0.13.0'),
('lz4', None),
('dask_ml', None),
('blosc', None))}},
'workers': {},
'client': {'host': [('python', '3.6.5.final.0'),
('python-bits', 64),
('OS', 'Linux'),
('OS-release', '4.4.73-5-default'),
('machine', 'x86_64'),
('processor', 'x86_64'),
('byteorder', 'little'),
('LC_ALL', 'None'),
('LANG', 'en_US.UTF-8'),
('LOCALE', 'en_US.UTF-8')],
'packages': {'required': [('dask', '0.19.3'),
('distributed', '1.23.3'),
('msgpack', '0.5.6'),
('cloudpickle', '0.5.3'),
('tornado', '5.1'),
('toolz', '0.9.0')],
'optional': [('numpy', '1.15.0'),
('pandas', '0.23.4'),
('bokeh', '0.13.0'),
('lz4', None),
('dask_ml', None),
('blosc', None)]}}}
but when running:
cluster.scale(8)
I'm getting the following error
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-11-6e6c3e46d9a8> in <module>()
----> 1 cluster.scale(8)
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in scale(self, n)
315
316 if n_to_delete:
--> 317 to_close = self._select_workers_to_close(n_to_delete)
318 self.scale_down(to_close)
319
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in _select_workers_to_close(self, n)
285 worker_info = client.scheduler_info()['workers']
286 # Sort workers by memory used
--> 287 workers = sorted((v['memory'], k) for k, v in worker_info.items())
288 # Return just the ips
289 return [w[1] for w in workers[:n]]
/opt/anaconda3/lib/python3.6/site-packages/dask_yarn/core.py in <genexpr>(.0)
285 worker_info = client.scheduler_info()['workers']
286 # Sort workers by memory used
--> 287 workers = sorted((v['memory'], k) for k, v in worker_info.items())
288 # Return just the ips
289 return [w[1] for w in workers[:n]]
KeyError: 'memory'
Any ideas?
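A quick diagnostic sketch, run against the same client, to see which keys the scheduler actually reports per worker; the generator expression in _select_workers_to_close assumes a top-level 'memory' key that this distributed release evidently does not provide:

# Print the keys available in each worker's info dict.
info = client.scheduler_info()
for addr, worker in info['workers'].items():
    print(addr, sorted(worker.keys()))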
Should we be willing to package up the user's current environment with conda pack if they don't specify anything?
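For reference, a minimal sketch of what automatic packaging could look like, using conda-pack's Python API to bundle the active environment (assuming conda-pack is importable where the cluster is launched):

import conda_pack

# Pack the currently active conda environment into a relocatable archive;
# dask-yarn could then ship the result as its `environment=` argument.
conda_pack.pack(output='environment.tar.gz')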
I plan to issue a new release in the next week, which will be the last release to support Python 2.7. Future releases will be Python 3 only. This will ease the maintenance burden, simplify our code, and allow us to use more modern concurrency features.
See also jcrist/skein#180.
18/09/25 15:24:57 INFO skein.Daemon: Removing callbacks for application_1537170416259_7296
18/09/25 15:25:36 INFO impl.YarnClientImpl: Killed application application_1537170416259_7296
Traceback (most recent call last):
File "main.py", line 10, in <module>
cluster = YarnCluster()
File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 179, in __init__
self._start_cluster(spec, skein_client)
File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 204, in _start_cluster
scheduler_address = app.kv.wait('dask.scheduler').decode()
File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 655, in wait
event = event_queue.get()
File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 281, in get
raise out
skein.exceptions.ConnectionError: Unable to connect to application
18/09/25 15:25:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/09/25 15:25:36 INFO skein.Daemon: Shutting down gRPC server
18/09/25 15:25:36 INFO skein.Daemon: gRPC server shut down
my main.py
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd
import dask
print('dask.config.config =>', dask.config.config)
cluster = YarnCluster()
client = Client(cluster)
version = client.get_versions(check=True)
print('version', version)
df = dd.read_csv('hdfs:///data/my.csv')
print(df)
the conda.yml
---
name: myenv
channels:
- defaults
- conda-forge
dependencies:
- conda-pack
- python=3.6
- dask
- dask-yarn
- gcsfs
- pyarrow
- hdfs3
- libhdfs3
the config/dask/yarn.yaml
yarn:
  # specification: null             # A full Skein specification or path to a
  #                                 # specification yaml file. Overrides the
  #                                 # following configuration if given
  name: dask                        # Application name
  queue: defult                     # Yarn queue to deploy to
  environment: environment.tar.gz   # Path to conda packed environment
  # tags: []                        # List of strings to tag applications
  scheduler:                        # Specifications of scheduler container
    vcores: 1
    memory: 2GiB
  worker:                           # Specifications of worker containers
    vcores: 1
    memory: 2GiB
    count: 2                        # Number of workers to start on initialization
    restarts: -1                    # Allowed number of restarts, -1 for unlimited
    env: {}                         # A map of environment variables to set on the worker
The only logs I have are sparse:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.worker:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_0:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_1:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.scheduler:
18/09/25 15:25:13 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0:
18/09/25 15:25:14 INFO skein.ApplicationMaster: watch called:
18/09/25 15:25:14 INFO skein.ApplicationMaster: Created Watcher. id=0, start=dask.scheduler, end=dask.scheduler, type=PUT:
18/09/25 15:25:15 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:15 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1:
18/09/25 15:25:15 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:18 INFO skein.ApplicationMaster: FAILED: dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: RESTARTING: adding new container to replace dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2:
18/09/25 15:25:20 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO skein.ApplicationMaster: RUNNING: dask.worker_2 on container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:52 INFO skein.ApplicationMaster: Shutting down gRPC server:
18/09/25 15:25:52 INFO skein.ApplicationMaster: gRPC server shut down:
Is it possible to give me some insight into how to launch Dask on YARN and how to configure debug mode?
Thank you very much in advance for your support.
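In the meantime, a small inspection sketch in the spirit of the session shown earlier in this document, using the skein application client that dask-yarn exposes to check container states and whether the scheduler ever registered its address (the environment path is a placeholder):

from dask_yarn import YarnCluster

cluster = YarnCluster(environment='environment.tar.gz')
# One entry per service container, with its current state
# (REQUESTED, RUNNING, FAILED, ...).
print(cluster.application_client.containers())
# The scheduler writes its address into the key-value store on startup;
# if 'dask.scheduler' never appears here, the scheduler never came up.
print(dict(cluster.application_client.kv))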
Here is my submit:
dask-yarn submit --environment dask_venv.tar.gz --worker-count 8 --worker-vcores 2 --worker-memory 4GiB --scheduler-vcores 2 --scheduler-memory 4GiB test_submit.py
Here is my test code:
from sklearn.ensemble import GradientBoostingClassifier
import sklearn.datasets
import dask_ml.datasets
from dask_ml.wrappers import ParallelPostFit
import time
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster.from_current()
client=Client(cluster)
cluster.scale(10)
X,y = sklearn.datasets.make_classification(n_samples = 1000, random_state=0)
clf = ParallelPostFit(estimator=GradientBoostingClassifier())
clf.fit(X,y)
X_big, _ = dask_ml.datasets.make_classification(n_samples = 2000000, chunks=200000, random_state=0)
t1 = time.time()
clf.predict(X_big).compute()
print("Time dask:{}".format(time.time()-t1))
dask_time = time.time() - t1
clf_local = GradientBoostingClassifier()
clf_local.fit(X,y)
t2 = time.time()
clf_local.predict(X_big)
print("Time sklearn:{}".format(time.time()-t2))
sklearn_time = time.time() - t2
times = [dask_time, sklearn_time]  # renamed: don't shadow the time module
with open('test.txt', 'w') as f:
    for item in times:
        f.write("%s\n" % item)
Here is my error:
19/05/29 02:46:07 INFO client.RMProxy: Connecting to ResourceManager at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:8032
19/05/29 02:46:07 INFO client.AHSProxy: Connecting to Application History server at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:10200
19/05/29 02:46:08 INFO skein.Driver: Driver started, listening on 35151
19/05/29 02:46:08 INFO skein.Driver: Uploading application resources to hdfs://gce-europe-west2-a-d-dataproc-sql-debug6-m/user/d5054581/.skein/application_1557133139144_0344
19/05/29 02:46:09 INFO skein.Driver: Submitting application...
19/05/29 02:46:10 INFO impl.YarnClientImpl: Submitted application application_1557133139144_0344
application_1557133139144_0344
19/05/29 02:46:10 INFO skein.Driver: Driver shut down
Here is the dask-yarn status:
19/05/29 03:04:23 INFO skein.Driver: Driver started, listening on 39537
19/05/29 03:04:24 INFO skein.Driver: Driver shut down
APPLICATION_ID NAME STATE STATUS CONTAINERS VCORES MEMORY RUNTIME
application_1557133139144_0343 dask FINISHED FAILED 0 0 0 17s
And I cannot find the application logs:
19/05/29 03:02:35 INFO client.RMProxy: Connecting to ResourceManager at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:8032
19/05/29 03:02:35 INFO client.AHSProxy: Connecting to Application History server at gce-europe-west2-a-d-dataproc-sql-debug6-m/192.168.3.40:10200
File /yarn-logs/d5054581/logs/application_1557133139144_0343 does not exist.
Can not find any log file matching the pattern: [ALL] for the application: application_1557133139144_0343
Can not find the logs for the application: application_1557133139144_0343 with the appOwner: d5054581
Could you help check the reason?
Many thanks!
I am trying to use pyarrow to access an HDFS file and am not able to get it working. Below is the code; thank you very much in advance.
[rxie@cedgedev03 code]$ python
Python 2.7.12 |Anaconda 4.2.0 (64-bit)| (default, Jul 2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
import pyarrow
import os
os.environ["JAVA_HOME"]="/usr/java/jdk1.8.0_121"
from pyarrow import hdfs
fs = hdfs.connect()
Traceback (most recent call last):
File "", line 1, in
File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/pyarrow/hdfs.py", line 183, in connect
extra_conf=extra_conf)
File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/pyarrow/hdfs.py", line 37, in init
self._connect(host, port, user, kerb_ticket, driver, extra_conf)
File "pyarrow/io-hdfs.pxi", line 89, in pyarrow.lib.HadoopFileSystem._connect
File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unable to load libhdfs
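"Unable to load libhdfs" usually means pyarrow cannot locate the native libhdfs.so. A sketch of the usual fix, pointing ARROW_LIBHDFS_DIR at the directory containing it before connecting; the library path below is a hypothetical Cloudera-style location, so adjust it for your install:

import os

os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_121"
# Hypothetical path: set this to wherever libhdfs.so lives on your cluster.
os.environ["ARROW_LIBHDFS_DIR"] = "/opt/cloudera/parcels/CDH/lib64"

from pyarrow import hdfs
fs = hdfs.connect()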
Hi,
We are trying out dask_yarn version 0.3.0 (with dask 0.18.2); because of conflicts with boost-cpp, I'm running with pyarrow version 0.10.0.
We are trying to read a CSV from HDFS, but we get an error when running dd.read_csv('hdfs:///path/to/file.csv'), since it is trying to use hdfs3.
From the documentation it seems that there is an option to use pyarrow instead. What is the correct syntax/configuration to do so?
Thanks for your time and great work.
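A sketch under the assumption that Dask of this era selected its HDFS backend via an hdfs_driver config key; if that key is not honoured in your version, removing hdfs3 from the environment so that pyarrow is the only available driver should have the same effect:

import dask
import dask.dataframe as dd

# Assumed config key; verify against your Dask version's docs.
dask.config.set({'hdfs_driver': 'pyarrow'})
df = dd.read_csv('hdfs:///path/to/file.csv')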
Sorry, I guess this should be in the dask repo and not dask_yarn. I will post it there.
Error trying to read a parquet file. Looks like the useful info is the first few lines, but I can't parse it. Any direction would help.
2019-02-19 09:46:43.006328, p43349, th140597036791616, ERROR Failed to invoke RPC call "getFsStats" on server "namenode:8020":
RpcChannel.cpp: 483: HdfsRpcException: Failed to invoke RPC call "getFsStats" on server "namenode:8020"
Caused by
RpcChannel.cpp: 931: HdfsRpcServerException: org.apache.hadoop.security.authorize.AuthorizationException: User: jlord is not allowed to impersonate 1�H�M�dH3
%(
2019-02-19 09:46:43.006434, p43349, th140597036791616, INFO Retry idempotent RPC call "getFsStats" on server "namenode:8020"
Traceback (most recent call last):
File "simple.py", line 19, in <module>
df = dataframe.read_parquet('hdfs:///user/hive/warehouse/dra_sanity_tests.db/test_excavator')
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/dataframe/io/parquet.py", line 1145, in read_parquet
storage_options=storage_options
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/core.py", line 354, in get_fs_token_paths
fs, fs_token = get_fs(protocol, options)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/core.py", line 513, in get_fs
fs = cls(**storage_options)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask/bytes/hdfs3.py", line 15, in __init__
self.fs = hdfs3.HDFileSystem(**kwargs)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/hdfs3/core.py", line 76, in __init__
self.connect()
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/hdfs3/core.py", line 141, in connect
raise ConnectionError('Connection Failed: {}'.format(msg))
ConnectionError: Connection Failed: HdfsRpcException: Failed to invoke RPC call "getFsStats" on server "namenode:8020" Caused by: HdfsRpcServerException: org.apache.hadoop.security.authorize.AuthorizationException: User: jlord is not allowed to impersonate 1�H�M�dH3
%(
I'm following the instructions for deploying on EMR and using the bootstrap-dask script from this repo with no modifications. I'm able to successfully connect to the Jupyter notebook server and create a YarnCluster. However, when I try to add workers to the cluster, they never successfully start. Looking at the Skein logs, it looks like it's trying to start containers, but they are all failing with the following message:
Traceback (most recent call last):
File "/mnt3/yarn/usercache/hadoop/appcache/application_1561654916149_0001/container_1561654916149_0001_01_000002/environment/bin/dask-yarn", line 7, in <module>
from dask_yarn.cli import main
File "/mnt3/yarn/usercache/hadoop/appcache/application_1561654916149_0001/container_1561654916149_0001_01_000002/environment/lib/python3.6/site-packages/dask_yarn/cli.py", line 17, in <module>
from distributed.cli.utils import install_signal_handlers, uri_from_host_port
ImportError: cannot import name 'uri_from_host_port'
Here are the results of a pip freeze:
asn1crypto==0.24.0
attrs==19.1.0
backcall==0.1.0
bleach==3.1.0
bokeh==1.2.0
boto3==1.9.162
botocore==1.12.163
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
Click==7.0
cloudpickle==1.1.1
conda==4.7.5
conda-pack==0.3.1
conda-package-handling==1.3.10
cryptography==2.7
cytoolz==0.9.0.1
dask==2.0.0
dask-yarn==0.6.0
decorator==4.4.0
defusedxml==0.6.0
distributed==2.0.1
docutils==0.14
entrypoints==0.3
grpcio==1.16.1
heapdict==1.0.0
idna==2.8
ipykernel==5.1.1
ipython==7.5.0
ipython-genutils==0.2.0
ipywidgets==7.4.2
jedi==0.13.3
Jinja2==2.10.1
jmespath==0.9.4
jsonschema==3.0.1
jupyter-client==5.2.4
jupyter-core==4.4.0
libarchive-c==2.8
locket==0.2.0
MarkupSafe==1.1.1
mistune==0.8.4
msgpack==0.6.1
nbconvert==5.5.0
nbformat==4.4.0
nbserverproxy==0.8.8
notebook==5.7.8
numpy==1.16.4
olefile==0.46
packaging==19.0
pandas==0.24.2
pandocfilters==1.4.2
parso==0.4.0
partd==0.3.10
pexpect==4.7.0
pickleshare==0.7.5
Pillow==6.0.0
prometheus-client==0.6.0
prompt-toolkit==2.0.9
protobuf==3.8.0
psutil==5.6.2
ptyprocess==0.6.0
pyarrow==0.11.1
pycosat==0.6.3
pycparser==2.19
Pygments==2.4.2
pyOpenSSL==19.0.0
pyparsing==2.4.0
pyrsistent==0.14.11
PySocks==1.7.0
python-dateutil==2.8.0
pytz==2019.1
PyYAML==5.1
pyzmq==18.0.0
requests==2.22.0
ruamel-yaml==0.15.46
s3fs==0.2.1
s3transfer==0.2.0
Send2Trash==1.5.0
six==1.12.0
skein==0.7.3
sortedcontainers==2.1.0
tblib==1.4.0
terminado==0.8.2
testpath==0.4.2
toolz==0.9.0
tornado==6.0.2
tqdm==4.32.1
traitlets==4.3.2
urllib3==1.24.2
wcwidth==0.1.7
webencodings==0.5.1
widgetsnbextension==3.4.2
zict==0.1.4
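The freeze shows dask-yarn 0.6.0 alongside distributed 2.0.1, and the ImportError above is exactly that mismatch: this distributed release no longer provides the uri_from_host_port helper that dask-yarn 0.6.0 imports. A quick sanity-check sketch to run before launching; pinning distributed below 2.0 or moving to a newer dask-yarn release (if one is available) are the likely fixes:

import dask_yarn
import distributed

# These two must be mutually compatible versions.
print("dask-yarn:", dask_yarn.__version__)
print("distributed:", distributed.__version__)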
I am trying to run dask-yarn 0.5.1 on a corporate yarn cluster and get the following error:
java.lang.IllegalArgumentException: Failed to load any of the given libraries: [netty_tcnative_linux_x86_64, netty_tcnative_linux_x86_64_fedora, netty_tcnative_x86_64, netty_tcnative]
(full error message below)
Another co-worker stumbled upon the same error independently. I am wondering if these packages should be downloaded and can't be because our servers cannot connect to Maven except through a mirror. I have tried to set up my mirror, but I am such a Maven novice that it may not be working. Either that, or the Java environment on the server could be configured in an odd way. Any direction for where to look would be helpful.
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:104)
at com.anaconda.skein.shaded.io.netty.handler.ssl.OpenSsl.loadTcNative(OpenSsl.java:440)
at com.anaconda.skein.shaded.io.netty.handler.ssl.OpenSsl.<clinit>(OpenSsl.java:97)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:244)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
at com.anaconda.skein.Driver.startServer(Driver.java:119)
at com.anaconda.skein.Driver.run(Driver.java:278)
at com.anaconda.skein.Driver.main(Driver.java:168)
Suppressed: java.lang.UnsatisfiedLinkError: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: failed to map segment from shared object: Operation not permitted
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:36)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:215)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
... 8 more
Suppressed: java.lang.UnsatisfiedLinkError: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: /tmp/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64218435490097249322.so: failed to map segment from shared object: Operation not permitted
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
at java.security.AccessController.doPrivileged(Native Method)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
... 10 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
at java.security.AccessController.doPrivileged(Native Method)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
... 10 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
... 8 more
Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora.so
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_linux_x86_64_fedora in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
at java.security.AccessController.doPrivileged(Native Method)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
... 10 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative_x86_64
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
... 8 more
Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative_x86_64.so
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative_x86_64 in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
at java.security.AccessController.doPrivileged(Native Method)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
... 10 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native library: com_anaconda_skein_shaded_netty_tcnative
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:233)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadFirstAvailable(NativeLibraryLoader.java:96)
... 8 more
Caused by: java.io.FileNotFoundException: META-INF/native/libcom_anaconda_skein_shaded_netty_tcnative.so
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:173)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:316)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 9 more
Suppressed: java.lang.UnsatisfiedLinkError: no com_anaconda_skein_shaded_netty_tcnative in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:336)
at java.security.AccessController.doPrivileged(Native Method)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:328)
at com.anaconda.skein.shaded.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:306)
... 10 more
Feb 15, 2019 4:08:39 PM com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts defaultSslProvider
INFO: Conscrypt not found (this may be normal)
Feb 15, 2019 4:08:39 PM com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts defaultSslProvider
INFO: Jetty ALPN unavailable (this may be normal)
java.lang.ClassNotFoundException: com/anaconda/skein/shaded/org/eclipse/jetty/alpn/ALPN
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.anaconda.skein.shaded.io.grpc.netty.JettyTlsUtil.isJettyAlpnConfigured(JettyTlsUtil.java:64)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.findJdkProvider(GrpcSslContexts.java:266)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:248)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
at com.anaconda.skein.Driver.startServer(Driver.java:119)
at com.anaconda.skein.Driver.run(Driver.java:278)
at com.anaconda.skein.Driver.main(Driver.java:168)
19/02/15 16:08:39 ERROR skein.Driver: Error running Driver
java.lang.IllegalStateException: Could not find TLS ALPN provider; no working netty-tcnative, Conscrypt, or Jetty NPN/ALPN available
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.defaultSslProvider(GrpcSslContexts.java:258)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:171)
at com.anaconda.skein.shaded.io.grpc.netty.GrpcSslContexts.forServer(GrpcSslContexts.java:151)
at com.anaconda.skein.Driver.startServer(Driver.java:119)
at com.anaconda.skein.Driver.run(Driver.java:278)
at com.anaconda.skein.Driver.main(Driver.java:168)
Traceback (most recent call last):
File "simple.py", line 8, in <module>
worker_memory="8GiB")
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 291, in __init__
self._start_cluster(spec, skein_client)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 335, in _start_cluster
skein_client = _get_skein_client(skein_client)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/dask_yarn/core.py", line 44, in _get_skein_client
return skein.Client(security=security)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 334, in __init__
java_options=java_options)
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 247, in _start_driver
raise DriverError("Failed to start java process")
skein.exceptions.DriverError: Failed to start java process
Exception ignored in: <function Client.__del__ at 0x7f420d5c5c80>
Traceback (most recent call last):
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 472, in __del__
self.close()
File "/nas/isg_prodops_work/jlord/conda/envs/dask-yarn/lib/python3.7/site-packages/skein/core.py", line 461, in close
if self._proc is not None:
AttributeError: _proc
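The suppressed UnsatisfiedLinkError lines above ("failed to map segment from shared object: Operation not permitted" while loading from /tmp) are the classic symptom of /tmp being mounted noexec, which prevents the JVM from executing the extracted netty-tcnative library. A workaround sketch using the java_options hook visible in the skein traceback to move the JVM temp directory somewhere exec-mounted; the directory below is hypothetical and must already exist:

import skein
from dask_yarn import YarnCluster

skein_client = skein.Client(
    # Hypothetical exec-mounted scratch directory.
    java_options=["-Djava.io.tmpdir=/nas/isg_prodops_work/jlord/tmp"])
cluster = YarnCluster(environment="environment.tar.gz",
                      skein_client=skein_client)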
#!/usr/bin/env python
from __future__ import print_function
import logging
import os
import sys
import time
import dask_yarn
from dask.distributed import Client
logger = logging.getLogger(__name__)
VIRTUALENV_TARBALL = os.path.expanduser("~/venv-dask-upstream.tar.gz")
WORKER_ENV = {"HADOOP_CONF_DIR": "/data/app/spark-yarn/hadoop-conf", "JAVA_HOME": "/usr/lib/jvm/java"}
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
# Dask has issues with these
for proxy_var in ['http_proxy', 'https_proxy']:
    if proxy_var in os.environ:
        del os.environ[proxy_var]
logger.info("Initialising YarnCluster")
cluster_start_time = time.time()
cluster = dask_yarn.YarnCluster(
    VIRTUALENV_TARBALL,
    worker_vcores=1,
    worker_memory="3GB",
    worker_env=WORKER_ENV,
    n_workers=1,
    scheduler_memory="1GB")
logger.info("Initialising YarnCluster: done in %.3f", time.time() - cluster_start_time)
logger.info("Initialising Client")
client_start_time = time.time()
dask_client = Client(cluster)
logger.info("Initialising Client: done in %.3f", time.time() - client_start_time)
logger.info("Checking versions")
versions_start_time = time.time()
_versions = dask_client.get_versions(check=True)
logger.info("Checking versions: done in %.3f", time.time() - versions_start_time)
logger.info("Creating futures")
futures = dask_client.map(lambda x: x ** 2, range(4))
logger.info("Gathering futures")
print("Result: %r" % (dask_client.gather(futures),))
Running the script directly works as expected:
(venv-dask-upstream)george@george:~$ python dask_poc.py
[...]
Result: [0, 1, 4, 9]
However, importing the same module from an interactive session hangs while initialising the Client:
(venv-dask-upstream)george@george:~$ python
>>> from dask_poc import *
[...]
2018-09-17 11:02:50,823 INFO dask_poc: Initialising Client
<loops forever in https://github.com/dask/distributed/blob/1.23.1/distributed/utils.py#L274-L275 >
pip freeze:
asn1crypto==0.24.0
backports-abc==0.5
backports.shutil-get-terminal-size==1.0.0
backports.weakref==1.0.post1
cffi==1.11.5
click==6.7
cloudpickle==0.5.6
cryptography==2.3.1
dask==0.19.1
dask-yarn==0.3.1
decorator==4.3.0
distributed==1.23.1
enum34==1.1.6
futures==3.2.0
grpcio==1.15.0
HeapDict==1.0.0
idna==2.7
ipaddress==1.0.22
ipdb==0.11
ipython==5.8.0
ipython-genutils==0.2.0
msgpack==0.5.6
pathlib2==2.3.2
pexpect==4.6.0
pickleshare==0.7.4
prompt-toolkit==1.0.15
protobuf==3.6.1
psutil==5.4.7
ptyprocess==0.6.0
pycparser==2.18
Pygments==2.2.0
PyYAML==3.13
scandir==1.9.0
simplegeneric==0.8.1
singledispatch==3.4.0.3
six==1.11.0
skein==0.1.1
sortedcontainers==2.0.5
tblib==1.3.2
toolz==0.9.0
tornado==5.1
traitlets==4.3.2
venv-pack==0.2.0
wcwidth==0.1.7
zict==0.1.3