Comments (23)
I'm not able to assist further without getting access to the actual machine. This software works fine elsewhere, so it seems to be a machine-specific configuration issue. If you do find the solution, it would be good to know what it is.
Hi,
@jcrist gave the final solution in #30 . Specifying `worker_env={'ARROW_LIBHDFS_DIR': ...}` while creating the `YarnCluster` object did the job.
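For anyone else hitting this, here is a minimal sketch of what that looks like (the environment name and library path are only examples; point `ARROW_LIBHDFS_DIR` at the directory containing `libhdfs.so` on your nodes):

```python
from dask_yarn import YarnCluster
from dask.distributed import Client

# worker_env forwards environment variables to the YARN worker containers,
# so pyarrow can find libhdfs.so there as well as on the edge node.
cluster = YarnCluster(
    environment='environment.tar.gz',  # example packaged conda environment
    worker_env={'ARROW_LIBHDFS_DIR': '/opt/cloudera/parcels/CDH/lib64'},
)
client = Client(cluster)
```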
Thx!
This is defined in the dask config with key "hdfs_driver". You can either set it in a file, by environment variable, or at runtime as:
dask.config.set({"hdfs_driver": "pyarrow"})
Thx!
That did the trick, however now I'm getting another error:

ArrowIOError: Unable to load libhdfs

In anaconda I can't see a package named libhdfs.
BTW the hadoop cluster was installed from a Cloudera distribution (version 5.14) on an offline SLES environment.
See the instructions for how to load libhdfs.
Once again thank you for your time - we greatly appreciate it.
I've updated the environment as listed in the instructions (since I'm running on a Cloudera instance the locations of the files were different), however I'm still getting the same error.
Maybe since I'm running on an edge node with `dask_yarn` the configuration needs to be different?
Anyway, I'll only have a chance to look into it towards the end of next week.
> This is defined in the dask config with key "hdfs_driver". You can either set it in a file, by environment variable, or at runtime as:

Note also that dask will use the first of ['hdfs3', 'pyarrow'] that it finds, so if you don't install hdfs3 but do install pyarrow it will just use pyarrow automatically.

> Maybe since I'm running on an edge node with dask_yarn the configuration needs to be different?
Things should be the same. A few things to check:

- Is `libhdfs.so` even installed? By default CDH doesn't install it, but it seems to be a common add-on on many systems.
- Is `JAVA_HOME` set properly?
- Is the classpath already set? If not set, but `hadoop classpath --glob` succeeds, pyarrow will automatically set up the classpath for you.

On my system I set `JAVA_HOME` and `HADOOP_HOME` and everything else just works.
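A rough sketch of those checks from Python (the paths here are just examples for a CDH layout; adjust them to your installation):

```python
import os
import subprocess

# Point these at your own JDK / Hadoop / libhdfs locations (examples only).
os.environ.setdefault('JAVA_HOME', '/usr/java/jdk1.8.0_162')
os.environ.setdefault('HADOOP_HOME', '/opt/cloudera/parcels/CDH')
os.environ.setdefault('ARROW_LIBHDFS_DIR', '/opt/cloudera/parcels/CDH/lib64')

# If `hadoop classpath --glob` works, pyarrow can build the CLASSPATH itself.
print(subprocess.check_output(['hadoop', 'classpath', '--glob'])[:200])

# With the above in place, connecting should succeed.
from pyarrow import hdfs
fs = hdfs.connect()
```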
What is the proper fix for the `boost-cpp` issue? We can help sort it out.
@kszucs @pitrou @cpcloud @xhochy
Installing `hdfs3` from conda-forge seems to pull `libhdfs3` from defaults, which pulls `libboost` from defaults, which provides the same files, but with an incompatible ABI, as `boost-cpp` from conda-forge.
If you switch to `libhdfs3` from conda-forge it downgrades `boost-cpp` from 1.67 to 1.66, and it also downgrades `arrow-cpp` and `pyarrow` to 0.9.0 (at which point they fail importing because they expect boost... 1.65).
So it seems there are several issues here:

- the ABI discrepancy between conda-forge and defaults produces broken package sets
- for some wicked reason, the C++ boost libraries are named differently on conda-forge and defaults (`boost-cpp` vs. `libboost`), meaning they can coexist in an environment even though one will clobber the other
- `libhdfs3` on conda-forge is depending on a stale boost version
- conda doesn't allow installing different versions of a library side-by-side, which means everyone must upgrade their shared library dependencies at once to avoid annoying users

So I'm not sure what the proper fix would be, but perhaps we should link boost statically and privately in our libraries by default?
Here is a working configuration (with both libhdfs and libhdfs3): https://github.com/apache/arrow/tree/master/dev/hdfs_integration - at least it worked last time I checked. We are working on running hdfs and dask integration tests regularly, so there might be issues with it.
> for some wicked reason, the C++ boost libraries are named differently on conda-forge and defaults (`boost-cpp` vs. `libboost`), meaning they can coexist in an environment even though one will clobber the other.

From here and here it looks like the `boost-cpp` name in conda-forge is intended to be removed and replaced with `libboost`. Not sure the timeline on this though. For now, defaults also contains a `boost-cpp` metapackage to mirror conda-forge's naming, so arrow could depend on that instead. This would fix the silent clobbering issue, but not the ABI compatibility issue.
cc @msarahan for thoughts (if he has time).
Using the metapackage (the conda-forge name) is probably the safest for now, aside from the static/private linking of boost.
There's really no trivial fix here, though. Lots of discussion at conda/conda#7626. The notion of strict channel priority should help this situation: conda/conda#7729
The real solution is both fixing metadata to be more accurate and building out consistent sets of software. The conda-forge bot should help a lot with that. For example, in the past, when boost was updated, downstream things were rebuilt manually (and many got missed). Now, the bot can consider a migration where it bumps the boost version number and builds everything affected.
Hi,
Thanks again @jcrist for your time and attention - sorry for my late response.
> Things should be the same. A few things to check:
>
> * Is `libhdfs.so` even installed? By default CDH doesn't install it, but it seems to be a common add-on on many systems.

Yes - it is installed under `/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64`, which is what `ARROW_LIBHDFS_DIR` is set to.

> * Is `JAVA_HOME` set properly?

When I run `java -version` I get:

```
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
```

> * Is the classpath already set? If not set, but `hadoop classpath --glob` succeeds, pyarrow will automatically set up the classpath for you.

I think it is OK - `hadoop classpath --glob` returns a result.
After restarting the jupyter notebook several times I noticed that a warning appears after running `df = dd.read_csv('hdfs://path/to/file.csv')`:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Here is the stack trace after running `df.head()`:
```
ArrowIOError                              Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 df.head()

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    906
    907         if compute:
--> 908             result = result.compute()
    909         return result
    910

    ... (dask/base.py, distributed/client.py, distributed/utils.py, six.py,
         tornado/gen.py frames omitted for brevity) ...

/data/05/yarn/nm/usercache/fcuser/appcache/application_1536851341745_0001/container_1536851341745_0001_01_000004/environment/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads()

/data/05/yarn/nm/usercache/fcuser/appcache/application_1536851341745_0001/container_1536851341745_0001_01_000004/environment/lib/python3.6/site-packages/pyarrow/hdfs.py in __init__()

~/anaconda3/lib/python3.6/site-packages/pyarrow/io-hdfs.pxi in pyarrow.lib.HadoopFileSystem._connect()
     87         if driver == 'libhdfs':
     88             with nogil:
---> 89                 check_status(HaveLibHdfs())
     90             conf.driver = HdfsDriver_LIBHDFS
     91         elif driver == 'libhdfs3':

~/anaconda3/lib/python3.6/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
     81         raise ArrowInvalid(message)
     82     elif status.IsIOError():
---> 83         raise ArrowIOError(message)
     84     elif status.IsOutOfMemory():
     85         raise ArrowMemoryError(message)

ArrowIOError: Unable to load libhdfs
```
The dask stuff here is unnecessary; your minimum test case should be:

```python
from pyarrow import hdfs
fs = hdfs.connect()
```
Since that doesn't seem to work for you, you either need to set/change the various environment variables, or this is an issue in pyarrow. Either way, this is not an issue with dask or with dask-yarn.
Can I ask you to file an issue in pyarrow or ask on stackoverflow so someone from their community can help you?
@sephib can you check the value of `os.environ['ARROW_LIBHDFS_DIR']` from the process where you are trying to run this, to make sure it points to the directory containing `libhdfs.so`? It's possible your env var isn't getting picked up.
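For example, a quick check from the same process (just a sketch; the directory is whatever your variable holds):

```python
import os

libdir = os.environ.get('ARROW_LIBHDFS_DIR')
print(libdir)
# Confirm the directory actually contains libhdfs.so
print(libdir and os.path.exists(os.path.join(libdir, 'libhdfs.so')))
```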
If that doesn't work you should try loading the library with ctypes to see if there isn't some other issue:

```python
import ctypes
ctypes.CDLL('/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so')
```

If neither of those things gives any new information, please open an Arrow JIRA issue so we can investigate further.
Hi,
Thank you again for your response and attention - only now was I able to get access to the system.

```python
from pyarrow import hdfs
fs = hdfs.connect()
```

I'm able to connect and see the files on hdfs (running `fs.ls('path/to/files')`).
So I successfully managed to read the file via:

```python
with fs.open('hdfs:///path/to/file.csv') as f:
    df = pd.read_csv(f)
```

However, when I tried to read the file using dask via:

```python
with fs.open('hdfs:///path/to/file.csv') as f:
    ddf = dd.read_csv(f)
```

I'm getting the following error:

TypeError: url type not understood: <pyarrow.lib.HdfsFile object at 0xxxxxx>

Any suggestions?

`os.environ['ARROW_LIBHDFS_DIR']` returns the path to `libhdfs.so`: `/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64`.
I tried `ctypes` and still got the same error:

ArrowIOError: Unable to load libhdfs

Thank you again for all of your time.
Can you show the error when trying to load with ctypes? If you can't load with ctypes, then anything related to pyarrow is unrelated.
Also, we don't use the "hdfs://" prefix in pyarrow, though we should detect and remove it automatically. I'll open a JIRA about that.
> However when I tried to read the file using dask via:

Dask doesn't support file objects, just pass the path to `dd.read_csv` directly:

`ddf = dd.read_csv('hdfs:///path/to/file.csv')`
Sorry - I wasn't clear.

```python
import ctypes
ctypes.CDLL('/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so')
```

was successful and returned

<CDLL '/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so', handle 55965c0c0410 at 0xXXX>

but I still got an error while trying to `dd.read_csv()`.

After restarting YARN and my jupyter notebook I got an error trying to run `ctypes.CDLL`:

libjvm.so: cannot open shared object file: No such file or directory

After researching I found this post and saw that

`ldd /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs`

results in

libjvm.so => not found
libhdfs.so.0.0.0 => not found

I updated the `libjvm.so` link successfully with

`ln -s /usr/java/jdk1.8.0_162/jre/lib/amd64/server/libjvm.so /lib64/libjvm.so`

and now the `ctypes` call runs successfully. However, I failed to understand where `libhdfs.so.0.0.0` should be linked to. I tried to run

`ln -s /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so.0.0.0 /lib64/libhdfs.so.0.0.0`

but I'm still getting an error. I think I'm not linking it correctly.
Any ideas?
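(A rough sketch of the checks involved here - the paths are just the examples from above, and pre-loading `libjvm.so` is only a guess at a workaround, not something I've confirmed:)

```python
import ctypes
import subprocess

lib = '/opt/cloudera/parcels/CDH/lib64/libhdfs.so'  # example path

# Mirror what `ldd` reports; look for "not found" entries such as libjvm.so.
print(subprocess.check_output(['ldd', lib]).decode())

# libhdfs.so needs libjvm.so at load time; loading it first (or putting its
# directory on LD_LIBRARY_PATH) is what lets the second CDLL call succeed.
ctypes.CDLL('/usr/java/jdk1.8.0_162/jre/lib/amd64/server/libjvm.so', mode=ctypes.RTLD_GLOBAL)
ctypes.CDLL(lib)
```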
Still, when I try reading the hdfs file directly using `dd.read_csv()` I get the error

ArrowIOError: Unable to load libhdfs

Thx again for your suggestions. I will need to try to look at it tomorrow.
Hi,
After consulting with Cloudera's support I've added the following environment variable on all the nodes (in /etc/environment):

`export LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH/lib64:$LD_LIBRARY_PATH`

I also needed to update the java link used by /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs with

`sudo ln -s /usr/java/jdk1.8.0_162/jre/lib/amd64/server/libjvm.so /lib64/libjvm.so`

and now there are no missing links in

`ldd /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs`

Finally I'm not getting any errors concerning the libraries.
I checked my client with `client.get_versions(check=True)` - all looks good.
But when I execute

`df = dd.read_csv('hdfs:///path/to/file.csv')`
`df.head()`

I get the error below. Should I open a different issue for this error?
```
KilledWorker                              Traceback (most recent call last)
<ipython-input> in <module>()
----> 1 df.head()

/opt/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    874
    875         if compute:
--> 876             result = result.compute()
    877         return result
    878

    ... (dask/base.py, distributed/client.py, distributed/utils.py, six.py,
         tornado/gen.py frames omitted for brevity) ...

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-5-from-delayed-6d4696b8c31047544bf639fdd8e724fb', 0)", 'tcp://10.241.11.15:36462')
```
Hi,
BTW - I'm now able to read the csv on hdfs directly using

```python
import dask.dataframe as dd
df = dd.read_csv('hdfs:///path/to/file.csv', sep='|', encoding='1255')
df.head()
```

However, when running `df.head()` with `dask_yarn` I'm getting the error

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-5-from-delayed-6d4696b8c31047544bf639fdd8e724fb', 0)", 'tcp://10.241.11.15:36462')