Git Product home page Git Product logo

Comments (23)

wesm avatar wesm commented on August 13, 2024 1

I'm not able to assist further without getting access to the actual machine. This software works fine elsewhere so it seems to be a machine-specific configuration issue. If you do find the solution it would be good to know what it is

from dask-yarn.

sephib avatar sephib commented on August 13, 2024 1

Hi,
@jcrist gave the final solution in #30 . Specifying the worker_env={'ARROW_LIBHDFS_DIR': ...} while creating the YarnCluster object did the job.
Thx!

from dask-yarn.

martindurant avatar martindurant commented on August 13, 2024

This is defined in the dask config with key "hdfs_driver". You can either set it in a file, by environment variable, or at runtime as:

dask.config.set({"hdfs_driver": "pyarrow"})

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Thx!
That did the trick, however now I'm getting another error

ArrowIOError: Unable to load libhdfs

In anaconda I can't see the a package named libhdfs

BTW the hadoop cluster was installed from a cloudera distribution (version 5.14) on an offline SLES environment

from dask-yarn.

martindurant avatar martindurant commented on August 13, 2024

See the instructions for how to load libhdfs.

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Once again thank you for your time - we greatly appreciate it.

I've updated the environments as listed in the instructions (Since I'm running on a Cloudera instance the locations of the files were different), however I'm still getting the same error.
Maybe since I'm running on an edge node with dask_yarn the configuration needs to be different?
anyway I'll have an option to look into it only towards the end of next week

from dask-yarn.

jcrist avatar jcrist commented on August 13, 2024

This is defined in the dask config with key "hdfs_driver". You can either set it in a file, by environment variable, or at runtime as:

Note also that dask will use the first of ['hdfs3', 'pyarrow'] that it finds, so if you don't install hdfs3 but do install pyarrow it will just use pyarrow automatically.

Maybe since I'm running on an edge node with dask_yarn the configuration needs to be different?

Things should be the same. A few things to check:

  • Is libhdfs.so even installed? By default cdh doesn't install it, but it seems to be a common add-on on many systems.
  • Is Is JAVA_HOME set properly?
  • Is the classpath already set? If not set, but hadoop classpath --glob succeeds, pyarrow will automatically setup the classpath for you.

On my system I set JAVA_HOME and HADOOP_HOME and everything else just works.

from dask-yarn.

wesm avatar wesm commented on August 13, 2024

What is the proper fix for the boost-cpp issue? We can help sort it out

@kszucs @pitrou @cpcloud @xhochy

from dask-yarn.

pitrou avatar pitrou commented on August 13, 2024

Installing hdfs3 from conda-forge seems to pull libhdfs3 from defaults which pulls libboost from defaults which provides the same files, but with an incompatible ABI, as boost-cpp from conda-forge.

If you switch to libhdfs3 from conda-forge it downgrades boost-cpp from 1.67 to 1.66, and it also downgrades arrow-cpp and pyarrow to 0.9.0 (at which point they fail importing because they expect boost... 1.65).

So it seems there are several issues here:

  • the ABI discrepancy between conda-forge and defaults produces broken package sets
  • for some wicked reason, the C++ boost libraries are named differently on conda-forge and default (boost-cpp vs. libboost), meaning they can coexist in an environment even though one will clobber the other
  • libhdfs3 on conda-forge is depending on a stale boost version
  • conda doesn't allow installing different versions of a library side-by-side, which means everyone must upgrade their shared library dependencies at once to avoid annoying users

So I'm not sure what the proper fix would be, but perhaps we should link boost statically and privately in our libraries by default?

from dask-yarn.

kszucs avatar kszucs commented on August 13, 2024

Here is a working configuration (with both libhdfs and libhdfs3): https://github.com/apache/arrow/tree/master/dev/hdfs_integration - at least it has worked last time I checked. We are working on to run hdfs and dask integration tests regularly, so there might be issues with it.

from dask-yarn.

jcrist avatar jcrist commented on August 13, 2024

for some wicked reason, the C++ boost libraries are named differently on conda-forge and default (boost-cpp vs. libboost), meaning they can coexist in an environment even though one will clobber the other.

From here and here it looks like the boost-cpp name in conda-forge is intended to be removed and replaced with libboost. Not sure the timeline on this though. For now, defaults also contains a boost-cpp metapackage to mirror conda-forge's naming, so arrow could depend on that instead. This would fix the silent clobbering issue, but not the abi compatibility issue.

cc @msarahan for thoughts (if he has time).

from dask-yarn.

msarahan avatar msarahan commented on August 13, 2024

Using the metapackage (the conda-forge name) is probably the safest for now, aside from the static/private linking of boost.

There's really no trivial fix here, though. Lots of discussion at conda/conda#7626. The notion of strict channel priority should help this situation: conda/conda#7729

The real solution is both fixing metadata to be more accurate, but also building out consistent sets of software. The conda-forge bot should help a lot with that. For example, in the past, when boost was updated, downstream things were rebuilt manually (and many got missed). Now, the bot can consider a migration where it bumps the boost version number and builds everything affected.

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Hi ,
Thank again @jcrist for your time and attention - sorry for my late response -

Things should be the same. A few things to check:

* Is `libhdfs.so` even installed? By default cdh doesn't install it, but it seems to be a common add-on on many systems.

Yes - it is installed under /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64 which is set to ARROW_LIBHDFS_DIR

* Is Is JAVA_HOME set properly?

when i run java -version i get

java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)

Is the classpath already set? If not set, but hadoop classpath --glob succeeds, pyarrow will automatically setup the classpath for you.

I think it is OK - hadoop classpath --glob returns a result

after restarting jupyter notebook several times I noticed that a warning appears after running

df = dd.read_csv('hdfs://path/to/file.csv' )

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Here is the stack error after running df.head()


ArrowIOError Traceback (most recent call last)
in ()
----> 1 df.head()

~/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
906
907 if compute:
--> 908 result = result.compute()
909 return result
910

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
400 keys = [x.dask_keys() for x in collections]
401 postcomputes = [x.dask_postcompute() for x in collections]
--> 402 results = schedule(dsk, keys, **kwargs)
403 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
404

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2191 try:
2192 results = self.gather(packed, asynchronous=asynchronous,
-> 2193 direct=direct)
2194 finally:
2195 for f in futures.values():

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1566 return self.sync(self._gather, futures, errors=errors,
1567 direct=direct, local_worker=local_worker,
-> 1568 asynchronous=asynchronous)
1569
1570 @gen.coroutine

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
651 return future
652 else:
--> 653 return sync(self.loop, func, *args, **kwargs)
654
655 def repr(self):

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.traceback is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1097
1098 try:
-> 1099 value = future.result()
1100 except Exception:
1101 self.had_exception = True

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1105 if exc_info is not None:
1106 try:
-> 1107 yielded = self.gen.throw(*exc_info)
1108 finally:
1109 # Break up a reference to itself

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1445 six.reraise(type(exception),
1446 exception,
-> 1447 traceback)
1448 if errors == 'skip':
1449 bad_keys.add(key)

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.traceback is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:

/data/05/yarn/nm/usercache/fcuser/appcache/application_1536851341745_0001/container_1536851341745_0001_01_000004/environment/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads()

/data/05/yarn/nm/usercache/fcuser/appcache/application_1536851341745_0001/container_1536851341745_0001_01_000004/environment/lib/python3.6/site-packages/pyarrow/hdfs.py in init()

~/anaconda3/lib/python3.6/site-packages/pyarrow/io-hdfs.pxi in pyarrow.lib.HadoopFileSystem._connect()
87 if driver == 'libhdfs':
88 with nogil:
---> 89 check_status(HaveLibHdfs())
90 conf.driver = HdfsDriver_LIBHDFS
91 elif driver == 'libhdfs3':

~/anaconda3/lib/python3.6/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
81 raise ArrowInvalid(message)
82 elif status.IsIOError():
---> 83 raise ArrowIOError(message)
84 elif status.IsOutOfMemory():
85 raise ArrowMemoryError(message)

ArrowIOError: Unable to load libhdfs

from dask-yarn.

jcrist avatar jcrist commented on August 13, 2024

The dask stuff here is unnecessary, your minimum test case should be:

from pyarrow import hdfs
fs = hdfs.connect()

Since that doesn't seem to work for you, you either need to set/change the various environment variables, or this is an issue in pyarrow. Either way, this is not an issue with dask or with dask-yarn.

Can I ask you to file an issue in pyarrow or ask on stackoverflow so someone from their community can help you?

from dask-yarn.

wesm avatar wesm commented on August 13, 2024

@sephib can you check the value of os.environ['ARROW_LIBHDFS_DIR'] from the process where you are trying to run this? To make sure it points to the directory containing libhdfs.so. It's possible your env var isn't getting picked up

from dask-yarn.

wesm avatar wesm commented on August 13, 2024

If that doesn't work you should try loading the library with ctypes to see if there isn't some other issues:

import ctypes
ctypes.CDLL('/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so')

If neither of those things gives any new information please open an Arrow JIRA issue so we can investigate further

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Hi,
Thank you gain for your response and attention - only now I was able to get access to the system.

from pyarrow import hdfs
fs = hdfs.connect()

I'm able to connect and see the files on the hdfs (running fs.ls('path/to/files').
So I successfully managed to read the file via:

with fs.open('hdfs:///path/to/file.csv') as f:
    df = pd.read_csv(f)

However when I tried to read the file using dask via:

with fs.open('hdfs:///path/to/file.csv') as f:
    ddf = dd.read_csv(f)

I'm getting the following error

TypeError: url type not understood: <pyarrow.lib.HdfsFile object at 0xxxxxx>

Any suggestions?

os.environ['ARROW_LIBHDFS_DIR']

returns the path to libhdfs.so

/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64

I tried ctypes and still got the same error.

ArrowIOError: Unable to load libhdfs

Thank you again for all of your time

from dask-yarn.

wesm avatar wesm commented on August 13, 2024

Can you show the error when trying to load with ctypes? If you can't load with ctypes then anything related to pyarrow is unrelated

from dask-yarn.

wesm avatar wesm commented on August 13, 2024

Also we don't use the "hdfs://" in pyarrow, though we should detect and remove it automatically. I'll open a JIRA about that

from dask-yarn.

jcrist avatar jcrist commented on August 13, 2024

However when I tried to read the file using dask via:

Dask doesn't support file objects, just pass the path to dd.read_csv directly:

ddf = dd.read_csv('hdfs:///path/to/file.csv')

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Sorry - I wasn't clear.

import ctypes
cytpes.CDLL('/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so')

was successful and returned

<CDLL '/opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so', handle 55965c0c0410 at 0xXXX>

but still got an error while trying to dd.read_csv()

After restarting YARN and my jupyter notebook i got an error trying to run cytpes.CDLL.

libjvm.so: cannot open shared object file: No such file or directory

after researching I found this post and saw that

ldd /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs

results with

libjvm.so => not found
libhdfs.so.0.0.0 => not found

I update the libjvm.so link successfully

ln -s /usr/java/jdk1.8.0_162/jre/lib/amd64/server/libjvm.so /lib64/libjvm.so

and now the ctype runs successfully

however I failed to understand where to the libhdfs.so.0.0.0
tried to run

ln -s /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib64/libhdfs.so.0.0.0 /lib64//libhdfs.so.0.0.0

but I'm still getting an error . I think i'm not linking it correctly.

any ideas?

still, when I try reading directly the hdfs file using dd.read_csv() I still get the error

ArrowIOError: Unable to load libhdfs

Thx again for your suggestions
Will need to try to look at it tomorrow

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Hi,
After consulting with Cloudera's support
I've added to all the nodes the following environment (in /etc/environment)
export LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH/lib64:$LD_LIBRARY_PATH
I also needed to update the link of the java in /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs with

sudo ln -s /usr/java/jdk1.8.0_162/jre/lib/amd64/server/libjvm.so /lib64/libjvm.so

And now there are no missing links in fuse_dfs

ldd /opt/cloudera/parcels/CDH-5.14.4.-1.cdh5.14.4.p0.3/lib/hadoop/bin/fuse_dfs

Finally I'm not getting any errors concerning the libraries.

I checked my client and client.get_versions(check=True) - all looks good.
but when I execute

df = dd.read_csv('hdfs:///path/to/file.csv')
df.head()

I get the error below. Should I open a different issue for this error?


KilledWorker Traceback (most recent call last)
in ()
----> 1 df.head()

/opt/anaconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
874
875 if compute:
--> 876 result = result.compute()
877 return result
878

/opt/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158

/opt/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
393 keys = [x.dask_keys() for x in collections]
394 postcomputes = [x.dask_postcompute() for x in collections]
--> 395 results = schedule(dsk, keys, **kwargs)
396 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
397

/opt/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
2228 try:
2229 results = self.gather(packed, asynchronous=asynchronous,
-> 2230 direct=direct)
2231 finally:
2232 for f in futures.values():

/opt/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1591 return self.sync(self._gather, futures, errors=errors,
1592 direct=direct, local_worker=local_worker,
-> 1593 asynchronous=asynchronous)
1594
1595 @gen.coroutine

/opt/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
645 return future
646 else:
--> 647 return sync(self.loop, func, *args, **kwargs)
648
649 def repr(self):

/opt/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]

/opt/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.traceback is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

/opt/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()

/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1131
1132 try:
-> 1133 value = future.result()
1134 except Exception:
1135 self.had_exception = True

/opt/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
1139 if exc_info is not None:
1140 try:
-> 1141 yielded = self.gen.throw(*exc_info)
1142 finally:
1143 # Break up a reference to itself

/opt/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1467 six.reraise(type(exception),
1468 exception,
-> 1469 traceback)
1470 if errors == 'skip':
1471 bad_keys.add(key)

/opt/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.traceback is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-5-from-delayed-6d4696b8c31047544bf639fdd8e724fb', 0)", 'tcp://10.241.11.15:36462')

from dask-yarn.

sephib avatar sephib commented on August 13, 2024

Hi,
BTW - I'm now able to read directly the csv on the hdfs using

import dask.dataframe as dd
df = dd.read_csv('hdfs:///path/to/file.csv', sep='|', encoding='1255')
df.head()

However when running df.head() with dask_yarn I'm getting the error

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-5-from-delayed-6d4696b8c31047544bf639fdd8e724fb', 0)", 'tcp://10.241.11.15:36462')

from dask-yarn.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.