dask-cudf's Introduction

Dask GPU Dataframes

A partitioned, GPU-backed dataframe, using Dask.
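A minimal usage sketch, for illustration (this mirrors the from_cudf API exercised throughout the issues below):

import cudf
import dask_cudf

df = cudf.DataFrame()
df['a'] = [1, 2, 3, 4]
df['b'] = [10, 20, 30, 40]

# Partition the cudf DataFrame into a lazy, two-partition Dask collection.
ddf = dask_cudf.from_cudf(df, npartitions=2)
print(ddf.head())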

Setup from source

Setup from source repo:

  1. Install dependencies into a new conda environment, where CUDA_VERSION is either 9.2 or 10:

    conda create -n dask-cudf \
       -c rapidsai -c numba -c conda-forge -c defaults \
       cudf dask cudatoolkit={CUDA_VERSION}
    
  2. Activate conda environment:

    source activate dask-cudf
    
  3. Clone dask-cudf repo:

    git clone https://github.com/rapidsai/dask-cudf
    
  4. Install from source:

    cd dask-cudf
    pip install .
    

Test

  1. Install pytest

    conda install pytest
    
  2. Run all tests:

    py.test dask_cudf
    
  3. Or, run individual tests:

     py.test dask_cudf/tests/test_file.py
    

Style

For style we use black, isort, and flake8. These are available as pre-commit hooks that will run every time you are about to commit code.

From the root directory of this project run the following:

pip install pre-commit
pre-commit install

dask-cudf's People

Contributors

andersy005, beckernick, dillon-cullinan, gputester, jcrist, jrhemstad, kkraus14, mike-wendt, mrocklin, mtjrider, quasiben, randerzander, raydouglass, rjzamora, rlratzel, seibert, shwina, sklam, vibhujawa


dask-cudf's Issues

max operation changes data in dataframe

I am trying to find the maximum value of a column in a dask-cudf dataframe. Currently, using the max operation alters the data in the dataframe, which I observe by printing the data on each rank. The following code reproduces the issue.

CODE :

import dask.dataframe as dd
import dask_cudf
from toolz import first, assoc
import cudf
from collections import defaultdict
from dask.distributed import Client, wait, default_client, futures_of
client = Client(scheduler_file = "/home/iroy/cluster.json", direct_to_workers = True)

def print_data_and_rank(data):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    print(data)

def parse_host_port(address):
    if '://' in address:
        address = address.rsplit('://', 1)[1]
    host, port = address.split(':')
    port = int(port)
    return host, port

def max_operation(ddf):
    client = default_client()
    if isinstance(ddf, dd.DataFrame):
        # THE MAX OPERATION THAT CAUSES ISSUE
        max_node_src_col = ddf[ddf.columns[0]].max()
        parts = ddf.to_delayed()
        parts = client.compute(parts)
        wait(parts)
    key_to_part_dict = dict([(str(part.key), part) for part in parts])
    who_has = client.who_has(parts)
    worker_map = []
    for key, workers in who_has.items():
        print("workers")
        print(workers)
        worker = parse_host_port(first(workers))
        worker_map.append((worker, key_to_part_dict[key]))
    print("Printing data...")
    gpu_futures = [client.submit(print_data_and_rank, part, workers=[worker]) for worker, part in worker_map]


input_df = cudf.DataFrame({'a':[9,9,9,9],'b':[8,8,8,8]})

print(input_df)
ddf = dask_cudf.from_cudf(input_df, chunksize=1).persist()
print("DASK CUDF: ", ddf)

max_operation(ddf)

OUTPUT (4 partitions, 4 ranks):

   a    b
0  1  255
   a  b
1  1  8
   a  b
2  9  8
   a  b
3  9  8

TypeError: read_csv() got an unexpected keyword argument 'byte_range'

>>> from numpy import dtype
>>> from dask_cudf import read_csv
>>> with open('const.csv', 'w') as f:
...   f.write('1,2,3\n')
... 
6
>>> read_csv('const.csv', names=list('abc'), dtype=[dtype('i')]*3).compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/conda/lib/python3.7/site-packages/dask/base.py", line 398, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 500, in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 446, in get_async
    fire_task()
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 442, in fire_task
    callback=queue.put)
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 489, in apply_sync
    res = func(*args, **kwds)
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 235, in execute_task
    result = pack_exception(e, dumps)
  File "/conda/lib/python3.7/site-packages/dask/local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "/conda/lib/python3.7/site-packages/dask/core.py", line 119, in _execute_task
    return func(*args2)
  File "/conda/lib/python3.7/site-packages/dask/compatibility.py", line 93, in apply
    return func(*args, **kwargs)
TypeError: read_csv() got an unexpected keyword argument 'byte_range'

dask_cudf is on the master branch and cudf is on branch-0.6. The failure also occurs with cudf on the master branch.

dask_cudf.from_cudf fails when npartitions set to zero

Repro:

>>> import dask_cudf, cudf
>>> dask_cudf.from_cudf(cudf.DataFrame(), npartitions=0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0+unknown-py3.7.egg/dask_cudf/core.py", line 723, in from_cudf
    chunksize = int(ceil(nrows / npartitions))
ZeroDivisionError: division by zero
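A guard along these lines would turn this into a readable error; the helper name is hypothetical and only sketches what from_cudf computes internally:

from math import ceil

def _chunksize(nrows, npartitions):
    # from_cudf derives chunksize as ceil(nrows / npartitions); validating
    # npartitions first avoids the bare ZeroDivisionError above.
    if npartitions < 1:
        raise ValueError("npartitions must be a positive integer")
    return int(ceil(nrows / npartitions))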

Use cudf byte ranges to build proper read_csv

Currently the read_csv function reads each file as a single dataframe. This can be a problem when we have larger files. Now that cudf supports byte ranges we should be able to read large files into many small blocks (see rapidsai/cudf#807)

This would allow for smoother operation on large files.

For each file we need to look at the size of the file, then create tasks to read each chunk of data from that file, being mindful of when to read and not read the header. We'll need to accept a blocksize= keyword as well (or whatever dask.dataframe.read_csv uses).

Eventually it would be nice to unify this code with dask.dataframe's implementation, but I suspect that that is too complex to get done in the near term, so a short term fix might be appropriate.
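A rough sketch of that plan, assuming cudf's byte_range=(offset, size) semantics from rapidsai/cudf#807 and a headerless file with column names passed explicitly; read_csv_blocks is a hypothetical helper, not existing API:

import os
from dask import delayed
import cudf
import dask_cudf

def read_csv_blocks(path, names, dtypes, blocksize=2**27):
    # One task per byte range; cudf starts each range at the first
    # complete line after the offset, so blocks never split a row.
    size = os.path.getsize(path)
    parts = [
        delayed(cudf.read_csv)(
            path,
            byte_range=(offset, blocksize),
            names=names,
            dtype=dtypes,
            header=None,
        )
        for offset in range(0, size, blocksize)
    ]
    return dask_cudf.from_delayed(parts)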

Repartition function not implemented

Should follow the dask.dataframe spec of being able to specify divisions or npartitions, and a flag for force which:
"Allows the expansion of the existing divisions. If False then the new divisions lower and upper bounds must be the same as the old divisions."

Unable to use "from_delayed" with cudf & dask_cudf

I'm trying to use dask_cudf to load multiple cuDFs at once, but getting an error about cudf not having an 'index' attribute. I suspect this is because the recent refactor moved index to cudf.dataframe.index.

Package Versions:

>>> dask.__version__
'0.19.2'
# installed with "python setup.py install" from dask-cudf master
>>> dask_cudf.__version__
'0.4.0+3.g60b43da.dirty'
# built from latest cudf branch-0.5
>>> cudf.__version__
'0.4.0+3.g60b43da.dirty'

Repro snippet:

from dask import delayed
import cudf
import dask_cudf

lines = ["0,1,2", "3,4,5", "6,7,8"]

n_files = 2
for x in range(0, n_files):
    with open('/data/'+str(x)+'.csv', 'w') as fp:
        fp.write('\n'.join(lines) + '\n')
        
names = ["num0", "num1", "num2"]
dtypes = ["int" for name in names]

@delayed
def load_file(fn):
    return cudf.read_csv(fn, names=names, dtype=dtypes)

files = ['/data/'+str(x)+'.csv' for x in range(0, n_files)]
print(files)
delays = [load_file(file) for file in files]
out = dask_cudf.from_delayed(delays)

Result:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-56-1df52fc09247> in <module>
     16 print(files)
     17 delays = [load_file(file) for file in files]
---> 18 out = dask_cudf.from_delayed(delays)

/conda/envs/cudf/lib/python3.5/site-packages/dask_cudf-0.0.1+215.gdcfc89c-py3.5.egg/dask_cudf/core.py in from_delayed(dfs, meta, prefix)
   1103     if meta is None:
   1104         meta = dfs[0].compute()
-> 1105     meta = make_meta(meta)
   1106 
   1107     name = prefix + '-' + tokenize(*dfs)

/conda/envs/cudf/lib/python3.5/site-packages/dask_cudf-0.0.1+215.gdcfc89c-py3.5.egg/dask_cudf/utils.py in make_meta(x)
     30     if hasattr(x, '_meta'):
     31         return x._meta
---> 32     if isinstance(x, (gd.Series, gd.DataFrame, gd.index.Index)):
     33         out = x[:2]
     34         return out.copy() if hasattr(out, 'copy') else out

AttributeError: module 'cudf' has no attribute 'index'

read_csv fails when we land in the final line

# create file with 104 bytes
import pandas as pd
pd.DataFrame({'x': range(20), 'y': range(20)}).to_csv('myfile.csv', index=False)

# Read the file starting at the 100th byte
import cudf
cudf.read_csv('myfile.csv', byte_range=(100, 50), header=None, names=['x', 'y'])
ERROR:  8  in read_csv: no data available for data type inference
---------------------------------------------------------------------------
GDFError                                  Traceback (most recent call last)
<ipython-input-6-2d2321dea6dc> in <module>
----> 1 cudf.read_csv('myfile.csv', byte_range=(100, 50), header=None, names=['x', 'y'])

~/cudf/python/cudf/io/csv.py in read_csv(filepath_or_buffer, lineterminator, quotechar, quoting, doublequote, header, mangle_dupe_cols, usecols, sep, delimiter, delim_whitespace, skipinitialspace, names, dtype, skipfooter, skiprows, dayfirst, compression, thousands, decimal, true_values, false_values, nrows, byte_range, skip_blank_lines, comment, na_values, keep_default_na, na_filter, prefix, index_col)
    343
    344     # Call read_csv
--> 345     libgdf.read_csv(csv_reader)
    346
    347     out = csv_reader.data

~/miniconda/envs/cudf/lib/python3.7/site-packages/libgdf_cffi/wrapper.py in wrap(*args)
     25                     if errcode != self._api.GDF_SUCCESS:
     26                         errname, msg = self._get_error_msg(errcode)
---> 27                         raise GDFError(errname, msg)
     28
     29                 wrap.__name__ = fn.__name__

GDFError: GDF_INVALID_API_CALL

We've landed in the last line, so we don't have any data. Probably the right thing here is to create an empty dataframe. Unfortunately we also don't know the right dtypes to use, so read_csv borks.

In dask.dataframe we pass the dtypes, but that causes issues in dask-cudf so we don't do it.

Short term the solution here is probably to make our own chunk read_csv function that handles this case, and pass it dtypes.

def _read_csv(fn, dtypes=None, **kwargs):
    try:
        df = cudf.read_csv(fn, **kwargs)
    except ...:
        return cudf.DataFrame(..., dtypes=dtypes)

dsk = {(name, i): (_read_csv, fn, ..., dtypes=meta.dtypes) for i, ...}
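A runnable version of that sketch might look like the following; catching a broad exception and building the empty frame column by column are assumptions, not confirmed library behavior:

import cudf
import numpy as np

def _read_csv(fn, dtypes=None, **kwargs):
    try:
        return cudf.read_csv(fn, **kwargs)
    except Exception:
        # Assume the failure means the byte range held no data: build an
        # empty DataFrame with the expected columns and dtypes instead.
        df = cudf.DataFrame()
        for name, dtype in dtypes.items():
            df[name] = np.array([], dtype=dtype)
        return df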

cc @quasiben

[FEA] Support reading from cloud storage URLs

Dask can read from cloud storage & HDFS but reads into Pandas DataFrames. Today we can get that into dask_cudf with map_partitions and cudf.from_pandas, but that is slow and would be painful for users.

dask-cudf should support reading from S3, Google Cloud Storage, and Azure Data Lake Storage and passing directly into cudf file readers.
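For context, the workaround available today looks roughly like this (the bucket path is a placeholder):

import cudf
import dask.dataframe as dd

# Read with pandas-backed dask, then convert each partition on the GPU.
# Slow, because every byte lands in host memory first.
pdf = dd.read_csv('s3://bucket/data-*.csv', storage_options={'anon': True})
gdf = pdf.map_partitions(cudf.from_pandas)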

Series cast operations result in "ValueError: Length of values does not match index length"

Note: df['a'].astype('int') works when using cudf directly

Repro:

import cudf
import dask_cudf

df = cudf.DataFrame()
df['a'] = [0, 1, 2, 3, 4]

dgd = dask_cudf.from_cudf(df, npartitions=1)
dgd['b'] = dgd['a'].astype('int')

Result:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-131-c6dfd269c825> in <module>
      4 dgd = dask_cudf.from_cudf(df, npartitions=1)
      5 #dgd['b'] = dgd['a']+1
----> 6 dgd['b'] = dgd['a'].astype(np.int32)

/conda/envs/cudf/lib/python3.7/site-packages/dask/dataframe/core.py in __setitem__(self, key, value)
   2521             df = self.assign(**{k: value for k in key})
   2522         else:
-> 2523             df = self.assign(**{key: value})
   2524 
   2525         self.dask = df.dask

/conda/envs/cudf/lib/python3.7/site-packages/dask/dataframe/core.py in assign(self, **kwargs)
   2718 
   2719         # Figure out columns of the output
-> 2720         df2 = self._meta.assign(**_extract_meta(kwargs))
   2721         return elemwise(methods.assign, self, *pairs, meta=df2)
   2722 

/conda/envs/cudf/lib/python3.7/site-packages/cudf-0.6.0.dev0+708.gc5b9d3e.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in assign(self, **kwargs)
    320         new = self.copy()
    321         for k, v in kwargs.items():
--> 322             new[k] = v
    323         return new
    324 

/conda/envs/cudf/lib/python3.7/site-packages/cudf-0.6.0.dev0+708.gc5b9d3e.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in __setitem__(self, name, col)
    272             self._cols[name] = self._prepare_series_for_add(col)
    273         else:
--> 274             self.add_column(name, col)
    275 
    276     def __delitem__(self, name):

/conda/envs/cudf/lib/python3.7/site-packages/cudf-0.6.0.dev0+708.gc5b9d3e.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in add_column(self, name, data, forceindex)
    835         if isinstance(data, GeneratorType):
    836             data = Series(data)
--> 837         series = self._prepare_series_for_add(data, forceindex=forceindex)
    838         series.name = name
    839         self._cols[name] = series

/conda/envs/cudf/lib/python3.7/site-packages/cudf-0.6.0.dev0+708.gc5b9d3e.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in _prepare_series_for_add(self, col, forceindex)
    807         """
    808         self._sanitize_columns(col)
--> 809         col = self._sanitize_values(col)
    810 
    811         empty_index = len(self._index) == 0

/conda/envs/cudf/lib/python3.7/site-packages/cudf-0.6.0.dev0+708.gc5b9d3e.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in _sanitize_values(self, col)
    791             return Series(arr)
    792         elif len(self) > 0 and len(sind) != len(index):
--> 793             raise ValueError('Length of values does not match index length')
    794         return col
    795 

ValueError: Length of values does not match index length

Inner join not implemented

Repro:

>>> import cudf, dask_cudf
>>> x = cudf.DataFrame({'a': [1, 2]})
>>> x
<cudf.DataFrame ncols=1 nrows=2 >
>>> y = dask_cudf.from_cudf(x, npartitions=1)
>>> y
<dask_cudf.DataFrame | 1 tasks | 1 npartitions>
>>> y.merge(y, on=('a',), how='inner')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0+unknown-py3.7.egg/dask_cudf/core.py", line 255, in merge
    assert how == "left", "left join is impelemented"
AssertionError: left join is impelemented

Unable to .head() a dask_cudf df created by from_delayed

Unfortunately the snippet below errors only on the specific weather dataset I'm using.

Will update this issue with a more minimal example as soon as possible, but since the error looks more like a library bug, I'm opening the issue now.

from dask import delayed
import cudf
import dask_cudf

@delayed
def load_file(fn, names, dtypes, skiprows):
    return cudf.read_csv(fn, names=names, dtype=dtypes, skiprows=skiprows)

# From ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/
years = [2017, 2018]
files = ['/data/weather/'+str(year)+'.csv' for year in years]

# csv columns
names = ["id", "date", "type", "val", "time", "dummy1", "dummy2", "dummy3"]
dtypes = ["int" for name in names]
# use workers to read files
weather_df = dask_cudf.from_delayed([load_file(fn, names, dtypes, 1) for fn in files])
weather_df.head()

Result:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-16-043988ff80e1> in <module>
----> 1 weather_df.head()

/conda/envs/cudf/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    874 
    875         if compute:
--> 876             result = result.compute()
    877         return result
    878 

/conda/envs/cudf/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/conda/envs/cudf/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

/conda/envs/cudf/lib/python3.5/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2216             try:
   2217                 results = self.gather(packed, asynchronous=asynchronous,
-> 2218                                       direct=direct)
   2219             finally:
   2220                 for f in futures.values():

/conda/envs/cudf/lib/python3.5/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1579             return self.sync(self._gather, futures, errors=errors,
   1580                              direct=direct, local_worker=local_worker,
-> 1581                              asynchronous=asynchronous)
   1582 
   1583     @gen.coroutine

/conda/envs/cudf/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    645             return future
    646         else:
--> 647             return sync(self.loop, func, *args, **kwargs)
    648 
    649     def __repr__(self):

/conda/envs/cudf/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/conda/envs/cudf/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/conda/envs/cudf/lib/python3.5/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/conda/envs/cudf/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/conda/envs/cudf/lib/python3.5/asyncio/futures.py in result(self)
    292             self._tb_logger = None
    293         if self._exception is not None:
--> 294             raise self._exception
    295         return self._result
    296 

/conda/envs/cudf/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/conda/envs/cudf/lib/python3.5/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1455                             six.reraise(type(exception),
   1456                                         exception,
-> 1457                                         traceback)
   1458                     if errors == 'skip':
   1459                         bad_keys.add(key)

/conda/envs/cudf/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/conda/envs/cudf/lib/python3.5/site-packages/dask/dataframe/utils.py in check_meta()
    489     if not isinstance(meta, parallel_types()):
    490         raise TypeError("Expected partition to be DataFrame, Series, or "
--> 491                         "Index, got `%s`" % type(meta).__name__)
    492 
    493     def is_dataframe_like(df):

TypeError: Expected partition to be DataFrame, Series, or Index, got `DataFrame`

[FEA] Implement fillna method for DataFrame class

Request

As a user, I may want to call fillna on an entire dataframe. Standard dask dataframes have this functionality, but dask_cudf dataframes currently do not.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd


df = pd.DataFrame(
	{'a': [1,2,None],
	'b': [1,2,3],
	'c': [3,2,1]})

ddf = dd.from_pandas(df, npartitions=2)
ddf.fillna(999).compute()  
       a  b  c
0    1.0  1  3
1    2.0  2  2
2  999.0  3  1

Dask Cudf Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd


df = pd.DataFrame(
	{'a': [1,2,None],
	'b': [1,2,3],
	'c': [3,2,1]})

gdf = cudf.DataFrame.from_pandas(df)

ddf = dask_cudf.from_cudf(gdf, npartitions=2)

ddf.fillna(999).compute() 

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-57-b633fe3e1345> in <module>
----> 1 ddf.fillna(999).compute()

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/core.py in fillna(self, value, method, limit, axis)
   1011         else:
   1012             test_value = value
-> 1013         meta = self._meta_nonempty.fillna(value=test_value, method=method,
   1014                                           limit=limit, axis=axis)
   1015 

/conda/envs/cudf_dev/lib/python3.7/site-packages/cudf-0.6.0.dev0+758.g012045d-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py in __getattr__(self, key)
    183             return self[key]
    184 
--> 185         raise AttributeError("'DataFrame' object has no attribute %r" % key)
    186 
    187     def __getitem__(self, arg):

AttributeError: 'DataFrame' object has no attribute 'fillna'

read_csv doesn't work with gzipped data

I'm uncertain whether this should be a dask-cudf or cudf issue:

dask-cudf's use of cudf's byte_range parameter prevents read_csv from working if the input files are gzipped:

ERROR:  8  in Cannot read compressed input when using the byte range parameter                                                               
distributed.worker - WARNING -  Compute Failed                                                            
Function:  execute_task                                                                                                                      
args:      ((<function local_shuffle at 0x7f10606f0048>, (<function apply at 0x7f10a7902ea0>, <function read_csv at 0x7f106d4759d8>, ['/data/
weather/1851.csv.gz'], {'names': ['id', 'date', 'type', 'val', 'time', 'dummy1', 'dummy2', 'dummy3'], 'usecols': ['id', 'date', 'type', 'val'
], 'byte_range': (0, 134217728)}), 321, ['id']))                                                                                             
kwargs:    {}                                                                                                             
Exception: Exception('GDF_INVALID_API_CALL')   

I propose either:

  1. Having dask-cudf or cudf decompress gzipped files before reading them
  2. Detecting that the input is gzipped, and not using the byte_range param when delegating the read to cudf

I prefer option 1, so that users still benefit from optimal chunksizes, but am ok with option 2 if that's significantly easier to do, given users can still manually repartition after read.
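A sketch of option 2, dispatching on the file extension (the helper name and the extension list are assumptions):

import os
from dask import delayed
import cudf
import dask_cudf

def read_csv_dispatch(path, blocksize=2**27, **kwargs):
    if path.endswith(('.gz', '.bz2', '.xz', '.zip')):
        # Compressed input: cudf must see the whole file to decompress,
        # so skip byte_range and read it as a single partition.
        parts = [delayed(cudf.read_csv)(path, **kwargs)]
    else:
        size = os.path.getsize(path)
        parts = [
            delayed(cudf.read_csv)(path, byte_range=(off, blocksize), **kwargs)
            for off in range(0, size, blocksize)
        ]
    return dask_cudf.from_delayed(parts)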

[BUG] Automatic decompression not working for file URLs

Reading uncompressed files works, but if the files are gzipped, read_csv throws unicode decode errors.

Repro:

test_df = dask_cudf.read_csv('s3://noaa-ghcn-pds/csv.gz/1798.csv.gz', storage_options={'anon': True, 'use_ssl': False})

Result:

---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
<ipython-input-72-f53198255bd8> in <module>
----> 1 test_df = dask_cudf.read_csv('s3://noaa-ghcn-pds/csv.gz/1798.csv.gz', storage_options={'anon': True, 'use_ssl': False})

/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0.0.1+255.ge3d3350-py3.6.egg/dask_cudf/io/csv.py in read_csv(path, chunksize, **kwargs)
     16     if "://" in str(path):
     17         func = make_reader(cudf.read_csv, "read_csv", "CSV")
---> 18         return func(path, blocksize=chunksize, **kwargs)
     19     else:
     20         return _internal_read_csv(path=path, chunksize=chunksize, **kwargs)

/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    486                            storage_options=storage_options,
    487                            include_path_column=include_path_column,
--> 488                            **kwargs)
    489     read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name,
    490                                             file_type=file_type)

/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    373 
    374     # Use sample to infer dtypes and check for presense of include_path_column
--> 375     head = reader(BytesIO(b_sample), **kwargs)
    376     if include_path_column and (include_path_column in head.columns):
    377         raise ValueError("Files already contain the column name: %s, so the "

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0.6.0.dev0+1032.g69a29e88.dirty-py3.6-linux-x86_64.egg/cudf/io/csv.py in read_csv(filepath_or_buffer, lineterminator, quotechar, quoting, doublequote, header, mangle_dupe_cols, usecols, sep, delimiter, delim_whitespace, skipinitialspace, names, dtype, skipfooter, skiprows, dayfirst, compression, thousands, decimal, true_values, false_values, nrows, byte_range, skip_blank_lines, comment, na_values, keep_default_na, na_filter, prefix, index_col)
    350     for i in range(csv_reader.num_cols_out):
    351         newcol = Column.from_cffi_view(out[i])
--> 352         new_names.append(ffi.string(out[i].col_name).decode())
    353         if(newcol.dtype == np.dtype('datetime64[ms]')):
    354             outcols.append(newcol.view(DatetimeColumn, dtype='datetime64[ms]'))

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte

[BUG] Concat appears to return None and also not modify existing objects

Description

As a user, I want to concatenate dask_cudf DataFrames in my workflows. dask_cudf.concat appears to fail with cudf DataFrames. Standard dask dataframes can be concatenated with dd.concat.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd


pdf = pd.DataFrame(
	{'a': list(range(20)),
	'b': list(reversed(range(20))),
	'c': list(range(20))})

pdf2 = pd.DataFrame(
	{'a': list(range(20, 40)),
	'b': list(reversed(range(20, 40))),
	'c': list(range(20, 40))})

dpdf = dd.from_pandas(pdf, npartitions=2)
dpdf2 = dd.from_pandas(pdf2, npartitions=2)

dd.concat([dpdf, dpdf2], interleave_partitions=True).compute()
     a   b   c
0    0  19   0
1    1  18   1
2    2  17   2
3    3  16   3
4    4  15   4
5    5  14   5
6    6  13   6
7    7  12   7
8    8  11   8
9    9  10   9
0   20  39  20
1   21  38  21
2   22  37  22
3   23  36  23
4   24  35  24
5   25  34  25
6   26  33  26
7   27  32  27
8   28  31  28
9   29  30  29
10  10   9  10
11  11   8  11
12  12   7  12
13  13   6  13
14  14   5  14
15  15   4  15
16  16   3  16
17  17   2  17
18  18   1  18
19  19   0  19
10  30  29  30
11  31  28  31
12  32  27  32
13  33  26  33
14  34  25  34
15  35  24  35
16  36  23  36
17  37  22  37
18  38  21  38
19  39  20  39

Dask Cudf Example


import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

df2 = cudf.DataFrame([('a', list(range(20))),
('b', list(range(20))),
('c', list(range(20)))])

ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf2 = dask_cudf.from_cudf(df2, npartitions=2)

# Both of the following return None, and neither appears to
# modify either object in place
dask_cudf.concat([ddf, ddf2], interleave_partitions=True)
dask_cudf.concat([ddf, ddf2], interleave_partitions=False)

# For the traceback
dask_cudf.concat([ddf, ddf2], interleave_partitions=True).compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-13-640496f0f12d> in <module>
----> 1 dask_cudf.concat([ddf, ddf2], interleave_partitions=True).compute()

AttributeError: 'NoneType' object has no attribute 'compute'

[BUG] dask_cudf.merge results in unexpected keyword argument 'suffixes'

import cudf, dask_cudf

df_a = cudf.DataFrame()
df_a['id'] = [0, 1, 2]
df_a['zeroes'] = [0, 0, 0]

df_b = cudf.DataFrame()
df_b['id'] = [1, 3]
df_b['ones'] = [1, 1]

ddf_a = dask_cudf.from_cudf(df_a, npartitions=1)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=1)

merged = ddf_a.merge(ddf_b, on='id', how='inner')

Result:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-10-ed0a7b936e81> in <module>
     12 ddf_b = dask_cudf.from_cudf(df_b, npartitions=1)
     13 
---> 14 merged = ddf_a.merge(ddf_b, on='id', how='inner')

/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0.0.1+255.ge3d3350-py3.6.egg/dask_cudf/core.py in merge(self, other, on, how, left_index, right_index, suffixes)
    188                 how=how,
    189                 lsuffix=suffixes[0],
--> 190                 rsuffix=suffixes[1],
    191             )
    192 

/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0.0.1+255.ge3d3350-py3.6.egg/dask_cudf/join_impl.py in join_frames(left, right, on, how, lsuffix, rsuffix)
     48     """
     49     empty_frame = left._meta.merge(
---> 50         right._meta, on=on, how=how, suffixes=(lsuffix, rsuffix)
     51     )
     52 

TypeError: merge() got an unexpected keyword argument 'suffixes'

apply()

Delegate to the PyGDF apply() function.
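Until that exists, a per-partition workaround, assuming cudf's Series.applymap is the apply of interest (a sketch only; meta inference may need a hint given the other issues here):

import cudf
import dask_cudf

gdf = cudf.DataFrame()
gdf['a'] = [1, 2, 3, 4]
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

# Delegate to the cudf (PyGDF) implementation partition by partition.
out = ddf.map_partitions(lambda df: df['a'].applymap(lambda x: x + 1))
print(out.compute())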

[BUG] Creating a Series after a Dataframe with `from_cudf` method corrupts the Dataframe memory

Describe the bug
A certain sequence of steps that creates a Series and a DataFrame consecutively using the from_cudf method, and then computes with compute(), causes the memory of the first object created to be overwritten/corrupted with elements of the second. Since this sequence of events sometimes happens when connecting Dask to cuML, it causes issues that appear intermittently.

Steps/Code to reproduce bug
Minimal reproducer I have been able to create:

import cudf
import dask_cudf

from distributed import Client, LocalCluster
cluster = LocalCluster()

# This happens also with LocalCudaCluster:
# from dask_cuda import LocalCUDACluster
# cluster = LocalCUDACluster()

client = Client(cluster)

# Selecting different GPUs:
devs = [0, 1]
workers = list(client.has_what().keys())
worker_devs = workers[0:min(len(devs), len(workers))]

def set_visible(i, n):
    import os
    all_devices = list(range(n))
    vd = ",".join(map(str, all_devices[i:] + all_devices[:i]))
    print(vd)
    os.environ["CUDA_VISIBLE_DEVICES"] = vd
    
[client.submit(set_visible, dev, len(devs), workers = [worker]) for dev, worker in zip(devs, worker_devs)]

# Create dataframe and series consecutively
X = cudf.DataFrame()
X['a'] = [1.0, 2.0, 3.0, 4.0]  # insert column
X['b'] = [1.0, 2.0, 3.0, 4.0]  # insert column
X['c'] = [1.0, 2.0, 3.0, 4.0]  # insert column
X['d'] = [1.0, 2.0, 3.0, 4.0]  # insert column

y = cudf.Series([60.0, 61.0, 62.0, 63.0])

X_df = dask_cudf.from_cudf(X, chunksize=2).persist()
y_df = dask_cudf.from_cudf(y, chunksize=2).persist()

print(X_df.compute())

Output:

        a        b        c         d
0    60.0      0.0      0.0 1.26e-321
1    61.0   5e-324   1e-323    5e-324
2 1.5e-323   1e-323 1.5e-323    1e-323
3 1.5e-323 1.5e-323     13.0  1.5e-323

Expected behavior
Output should be:

     a    b    c    d
0  1.0  1.0  1.0  1.0
1  2.0  2.0  2.0  2.0
2  3.0  3.0  3.0  3.0
3  4.0  4.0  4.0  4.0

Environment details (please complete the following information):

  • Environment location: Bare metal

  • Method of cuDF install:
    from source, branch-0.6, latest commit 7275461dd73fd385886f94922a1d226f67504325 on Tue Feb 19 19:18:02 2019 -0500

  • Method of cuML install:
    from source, branch-0.6, latest commit 6bc1c18c98327274f05d1aaa68798d61a9d9e572 on Tue Feb 19 14:15:12 2019 -0500

  • Please run and attach the output of the cudf/print_env.sh script to gather relevant environment details

commit 7275461dd73fd385886f94922a1d226f67504325 (HEAD -> branch-0.6, origin/branch-0.6, origin/HEAD)
Merge: f2f44be7 9ef4536e
Author: Keith Kraus <[email protected]>
Date:   Tue Feb 19 19:18:02 2019 -0500

    Merge pull request #611 from eyalroz/minor-bug-fixes-4

    [REVIEW] Fix for issue #610: Renamed `gdf_reduce_optimal_output_size()`

***OS Information***
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=18.04
DISTRIB_CODENAME=bionic
DISTRIB_DESCRIPTION="Ubuntu 18.04.1 LTS"
NAME="Ubuntu"
VERSION="18.04.1 LTS (Bionic Beaver)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 18.04.1 LTS"
VERSION_ID="18.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=bionic
UBUNTU_CODENAME=bionic
Linux gv100-1804 4.15.0-45-generic #48-Ubuntu SMP Tue Jan 29 16:28:13 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

***GPU Information***
Wed Feb 20 10:54:24 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 415.27       Driver Version: 415.27       CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Quadro GV100        Off  | 00000000:17:00.0 Off |                  Off |
| 30%   42C    P2    26W / 250W |   7316MiB / 32478MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Quadro GV100        Off  | 00000000:65:00.0  On |                  Off |
| 31%   43C    P2    29W / 250W |   1187MiB / 32477MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0      2339      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      2515      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      2538      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      2540      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      2542      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      3206      C   ...miniconda3/envs/mpicuml-0217/bin/python  3847MiB |
|    0      3601      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0      3624      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    0     12821      C   ...miniconda3/envs/mpicuml-0217/bin/python   455MiB |
|    1      1052      G   /usr/lib/xorg/Xorg                            39MiB |
|    1      1104      G   /usr/bin/gnome-shell                          62MiB |
|    1      1358      G   /usr/lib/xorg/Xorg                           105MiB |
|    1      1500      G   /usr/bin/gnome-shell                         117MiB |
|    1      2544      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
|    1      3626      C   ...te/miniconda3/envs/cuml-0220/bin/python   429MiB |
+-----------------------------------------------------------------------------+

***CPU***
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              12
On-line CPU(s) list: 0-11
Thread(s) per core:  2
Core(s) per socket:  6
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               85
Model name:          Intel(R) Core(TM) i7-7800X CPU @ 3.50GHz
Stepping:            4
CPU MHz:             1199.998
CPU max MHz:         4000.0000
CPU min MHz:         1200.0000
BogoMIPS:            7000.00
Virtualization:      VT-x
L1d cache:           32K
L1i cache:           32K
L2 cache:            1024K
L3 cache:            8448K
NUMA node0 CPU(s):   0-11
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb cat_l3 cdp_l3 invpcid_single pti mba ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm mpx rdt_a avx512f avx512dq rdseed adx smap clflushopt clwb intel_pt avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm ida arat pln pts hwp hwp_act_window hwp_epp hwp_pkg_req

***CMake***
/home/dante/miniconda3/envs/cuml-0220/bin/cmake
cmake version 3.13.4

CMake suite maintained and supported by Kitware (kitware.com/cmake).

***g++***
/usr/bin/g++
g++ (Ubuntu 7.3.0-27ubuntu1~18.04) 7.3.0
Copyright (C) 2017 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.


***nvcc***
/usr/local/cuda/bin/nvcc
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2018 NVIDIA Corporation
Built on Sat_Aug_25_21:08:01_CDT_2018
Cuda compilation tools, release 10.0, V10.0.130

***Python***
/home/dante/miniconda3/envs/cuml-0220/bin/python
Python 3.7.1

***Environment Variables***
PATH                            : /Users/danteg/miniconda3/bin:/opt/local/bin:/opt/local/sbin:/home/dante/bin:/usr/local/bin:/home/dante/miniconda3/envs/cuml-0220/bin:/home/dante/miniconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/cuda/bin
LD_LIBRARY_PATH                 : /usr/local/cuda/lib64:/usr/local/lib
NUMBAPRO_NVVM                   : /usr/local/cuda/nvvm/lib64/libnvvm.so
NUMBAPRO_LIBDEVICE              : /usr/local/cuda/nvvm/libdevice/
CONDA_PREFIX                    : /home/dante/miniconda3/envs/cuml-0220
PYTHON_PATH                     :

***conda packages***
/home/dante/miniconda3/condabin/conda
# packages in environment at /home/dante/miniconda3/envs/cuml-0220:
#
# Name                    Version                   Build  Channel
arrow-cpp                 0.12.0           py37h0e61e49_0    conda-forge
atomicwrites              1.3.0                      py_0    conda-forge
attrs                     18.2.0                     py_0    conda-forge
backcall                  0.1.0                      py_0    conda-forge
blas                      1.0                         mkl
bleach                    3.1.0                      py_0    conda-forge
bokeh                     1.0.4                 py37_1000    conda-forge
boost                     1.68.0          py37h8619c78_1001    conda-forge
boost-cpp                 1.68.0            h11c811c_1000    conda-forge
bzip2                     1.0.6             h14c3975_1002    conda-forge
ca-certificates           2018.11.29           ha4d7672_0    conda-forge
certifi                   2018.11.29            py37_1000    conda-forge
cffi                      1.12.1           py37h9745a5d_0    conda-forge
click                     7.0                        py_0    conda-forge
cloudpickle               0.7.0                      py_0    conda-forge
cmake                     3.13.4               h8d4ced6_0    conda-forge
cudf                      0.6.0.dev0+589.g7275461d          pypi_0    pypi
cuml                      0.5.1+201.g6bc1c18          pypi_0    pypi
curl                      7.64.0               h646f8bb_0    conda-forge
cython                    0.29.5           py37hf484d3e_0    conda-forge
cytoolz                   0.9.0.1         py37h14c3975_1001    conda-forge
dask                      1.1.1                      py_0    conda-forge
dask-core                 1.1.1                      py_0    conda-forge
dask-cuda                 0.0.0                    pypi_0    pypi
dask-cudf                 0.0.1+233.g9cc8074          pypi_0    pypi
decorator                 4.3.2                      py_0    conda-forge
distributed               1.25.3                   py37_0    conda-forge
entrypoints               0.3                   py37_1000    conda-forge
expat                     2.2.5             hf484d3e_1002    conda-forge
freetype                  2.9.1             h94bbf69_1005    conda-forge
heapdict                  1.0.0                 py37_1000    conda-forge
icu                       58.2              hf484d3e_1000    conda-forge
intel-openmp              2019.1                      144
ipykernel                 5.1.0           py37h24bf2e0_1002    conda-forge
ipython                   7.3.0            py37h24bf2e0_0    conda-forge
ipython_genutils          0.2.0                      py_1    conda-forge
jedi                      0.13.2                py37_1000    conda-forge
jinja2                    2.10                       py_1    conda-forge
jpeg                      9c                h14c3975_1001    conda-forge
jsonschema                3.0.0a3               py37_1000    conda-forge
jupyter_client            5.2.4                      py_1    conda-forge
jupyter_core              4.4.0                      py_0    conda-forge
jupyterlab                0.35.4                   py37_0    conda-forge
jupyterlab_server         0.2.0                      py_0    conda-forge
krb5                      1.16.3            hc83ff2d_1000    conda-forge
libcurl                   7.64.0               h01ee5af_0    conda-forge
libedit                   3.1.20170329      hf8c457e_1001    conda-forge
libffi                    3.2.1             hf484d3e_1005    conda-forge
libgcc-ng                 7.3.0                hdf63c60_0    conda-forge
libgdf-cffi               0.6.0                    pypi_0    pypi
libgfortran-ng            7.2.0                hdf63c60_3    conda-forge
libpng                    1.6.36            h84994c4_1000    conda-forge
libprotobuf               3.6.1             hdbcaa40_1000    conda-forge
librmm-cffi               0.5.0                    pypi_0    pypi
libsodium                 1.0.16            h14c3975_1001    conda-forge
libssh2                   1.8.0             h1ad7b7a_1003    conda-forge
libstdcxx-ng              7.3.0                hdf63c60_0    conda-forge
libtiff                   4.0.10            h648cc4a_1001    conda-forge
libuv                     1.26.0               h14c3975_0    conda-forge
llvmlite                  0.27.0           py37hf484d3e_0    numba
locket                    0.2.0                      py_2    conda-forge
markupsafe                1.1.0           py37h14c3975_1000    conda-forge
mistune                   0.8.4           py37h14c3975_1000    conda-forge
mkl                       2019.1                      144
mkl_fft                   1.0.10           py37h14c3975_1    conda-forge
mkl_random                1.0.2            py37h637b7d7_2    conda-forge
more-itertools            4.3.0                 py37_1000    conda-forge
msgpack-python            0.6.1            py37h6bb024c_0    conda-forge
nbconvert                 5.3.1                      py_1    conda-forge
nbformat                  4.4.0                      py_1    conda-forge
ncurses                   6.1               hf484d3e_1002    conda-forge
notebook                  5.7.4                 py37_1000    conda-forge
numba                     0.42.0          np115py37hf484d3e_0    numba
numpy                     1.15.4           py37h7e9f1db_0
numpy-base                1.15.4           py37hde5b4d6_0
nvstrings                 0.2.0            cuda9.2_py37_0    nvidia
olefile                   0.46                       py_0    conda-forge
openblas                  0.3.3             h9ac9557_1001    conda-forge
openssl                   1.0.2p            h14c3975_1002    conda-forge
packaging                 19.0                       py_0    conda-forge
pandas                    0.24.1           py37hf484d3e_0    conda-forge
pandoc                    2.6                           1    conda-forge
pandocfilters             1.4.2                      py_1    conda-forge
parquet-cpp               1.5.1                         4    conda-forge
parso                     0.3.4                      py_0    conda-forge
partd                     0.3.9                      py_0    conda-forge
pexpect                   4.6.0                 py37_1000    conda-forge
pickleshare               0.7.5                 py37_1000    conda-forge
pillow                    5.4.1           py37h00a061d_1000    conda-forge
pip                       19.0.2                   py37_0    conda-forge
pluggy                    0.8.1                      py_0    conda-forge
prometheus_client         0.6.0                      py_0    conda-forge
prompt_toolkit            2.0.9                      py_0    conda-forge
psutil                    5.5.1            py37h14c3975_0    conda-forge
ptyprocess                0.6.0                 py37_1000    conda-forge
py                        1.7.0                      py_0    conda-forge
pyarrow                   0.12.0           py37hbbcf98d_2    conda-forge
pycparser                 2.19                       py_0    conda-forge
pygments                  2.3.1                      py_0    conda-forge
pyparsing                 2.3.1                      py_0    conda-forge
pyrsistent                0.14.10          py37h14c3975_0    conda-forge
pytest                    4.3.0                    py37_0    conda-forge
python                    3.7.1             hd21baee_1001    conda-forge
python-dateutil           2.8.0                      py_0    conda-forge
pytz                      2018.9                     py_0    conda-forge
pyyaml                    3.13            py37h14c3975_1001    conda-forge
pyzmq                     18.0.0           py37h6afc9c9_0    conda-forge
readline                  7.0               hf8c457e_1001    conda-forge
rhash                     1.3.6             h14c3975_1001    conda-forge
scikit-learn              0.20.2           py37hd81dba3_0
scipy                     1.2.0            py37h7c811a0_0
send2trash                1.5.0                      py_0    conda-forge
setuptools                40.8.0                   py37_0    conda-forge
six                       1.12.0                py37_1000    conda-forge
sortedcontainers          2.1.0                      py_0    conda-forge
sqlite                    3.26.0            h67949de_1000    conda-forge
tblib                     1.3.2                      py_1    conda-forge
terminado                 0.8.1                 py37_1001    conda-forge
testpath                  0.4.2                 py37_1000    conda-forge
thrift-cpp                0.12.0            h23e226f_1001    conda-forge
tk                        8.6.9             h84994c4_1000    conda-forge
toolz                     0.9.0                      py_1    conda-forge
tornado                   5.1.1           py37h14c3975_1000    conda-forge
traitlets                 4.3.2                 py37_1000    conda-forge
wcwidth                   0.1.7                      py_1    conda-forge
webencodings              0.5.1                      py_1    conda-forge
wheel                     0.33.1                   py37_0    conda-forge
xz                        5.2.4             h14c3975_1001    conda-forge
yaml                      0.1.7             h14c3975_1001    conda-forge
zeromq                    4.2.5             hf484d3e_1006    conda-forge
zict                      0.1.3                      py_0    conda-forge
zlib                      1.2.11            h14c3975_1004    conda-forge

[FEA] Support Direct Boolean Indexing on Series

Request

Standard Dask Dataframes support direct Boolean indexing of both Series and DataFrames, but dask_cudf DataFrames currently only support Boolean indexing on DataFrames. As a user, I may want to directly index Series objects.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = pd.DataFrame(
	{'a': list(range(20)),
	'b': list(reversed(range(20))),
	'c': list(range(20))})

ddf = dd.from_pandas(df, npartitions=2)

ddf.a[ddf.a > 2]
Dask Series Structure:
npartitions=2
0     int64
10      ...
19      ...
Name: a, dtype: int64
Dask Name: index, 8 tasks

Dask_cudf Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

ddf = dask_cudf.from_cudf(df, npartitions=2) 

print(ddf.a[ddf.a > 2].compute())
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-18-443886fd309d> in <module>
----> 1 print(ddf.a[ddf.a > 2].compute())

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/core.py in __getitem__(self, key)
   2044             dsk = partitionwise_graph(operator.getitem, name, self, key)
   2045             graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])
-> 2046             return Series(graph, name, self._meta, self.divisions)
   2047         raise NotImplementedError()
   2048 

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/core.py in __init__(self, dsk, name, meta, divisions)
    255             raise TypeError("Expected meta to specify type {0}, got type "
    256                             "{1}".format(typename(self._partition_type),
--> 257                                          typename(type(meta))))
    258         self._meta = meta
    259         self.divisions = tuple(divisions)

TypeError: Expected meta to specify type pandas.core.series.Series, got type cudf.dataframe.series.Series

dask_cudf.from_cudf fails for empty cudf.DataFrame

Repro:

>>> import dask_cudf, cudf
>>> dask_cudf.from_cudf(cudf.DataFrame(), npartitions=0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0+unknown-py3.7.egg/dask_cudf/core.py", line 723, in from_cudf
    chunksize = int(ceil(nrows / npartitions))
ZeroDivisionError: division by zero
>>> dask_cudf.from_cudf(cudf.DataFrame(), npartitions=1)
ERROR:Call to cuLaunchKernel results in CUDA_ERROR_INVALID_VALUE
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0+unknown-py3.7.egg/dask_cudf/core.py", line 728, in from_cudf
    data = data.sort_index(ascending=True)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/dataframe/dataframe.py", line 885, in sort_index
    return self._sort_by(self.index.argsort(ascending=ascending))
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/dataframe/index.py", line 72, in argsort
    return self.as_column().argsort(ascending=ascending)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/dataframe/columnops.py", line 66, in argsort
    _, inds = self.sort_by_values(ascending=ascending)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/dataframe/numerical.py", line 138, in sort_by_values
    index=sort_inds.data.mem)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/utils/cudautils.py", line 333, in gather
    gpu_gather.forall(index.size)(data, index, out)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/compiler.py", line 235, in __call__
    sharedmem=self.sharedmem)(*args)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/compiler.py", line 497, in __call__
    sharedmem=self.sharedmem)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/compiler.py", line 571, in _kernel_call
    cu_func(*kernelargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 1514, in __call__
    self.sharedmem, streamhandle, args)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 1558, in launch_kernel
    None)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 290, in safe_cuda_api_call
    self._check_error(fname, retcode)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 325, in _check_error
    raise CudaAPIError(retcode, msg)
numba.cuda.cudadrv.driver.CudaAPIError: [1] Call to cuLaunchKernel results in CUDA_ERROR_INVALID_VALUE

Testing / CI

I'm not sure I understand the current CI system. It looks like we're building conda packages.

Instead, I'm inclined to conda install dask and cudf and then run py.test, leaving package builds to release time.

Any objection to my tearing things down here?

cc @mike-wendt @kkraus14
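That is, roughly (channels here mirror the README's setup section; an assumption, not a finalized CI recipe):

conda install -c rapidsai -c numba -c conda-forge -c defaults cudf dask pytest
py.test dask_cudf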

dask_cudf.from_cudf fails for single row dataframes

Repro:

>>> import cudf, dask_cudf
>>> x = cudf.DataFrame({'a': [1]})
>>> x
<cudf.DataFrame ncols=1 nrows=1 >
>>> y = dask_cudf.from_cudf(x, npartitions=1)
>>> y
<dask_cudf.DataFrame | 0 tasks | 0 npartitions>

multi-gb joins result in hangs

While working with the GHCN weather dataset, I ran into a "ValueError: All series must be of same type" which only occurs when working with more than roughly 1 GB of data. I will file that in a separate issue.

The issue below occurs both when setting up with LocalCudaCluster (omitted for simplicity) and when using dask_cudf directly, without a cluster.

While trying to boil it down to a simpler repro, I ran into hanging/restarting kernels:

import pandas as pd
import numpy as np
import dask_cudf as dgd
import cudf

nelem = 100000000

# generate a 2.1 GB and a 1.1 GB file; takes about 4 minutes
df_0 = pd.DataFrame({'key': range(0, nelem), 'zeros': np.zeros(nelem)})
df_0.to_csv('left.csv')
df_1 = pd.DataFrame({'key': range(0, int(nelem/2)), 'ones': np.ones(int(nelem/2))})
df_1.to_csv('right.csv')

# runs fast, no issue
left = cudf.read_csv('left.csv')
right = cudf.read_csv('right.csv')
joined = left.merge(right, on=['key'], how='outer')
joined.head().to_pandas()

# hangs, restarts Jupyter kernels
left = dgd.read_csv('left.csv')
right = dgd.read_csv('right.csv')
joined = left.merge(right, on=['key'], how='outer')
joined.head().to_pandas()

From Jupyter logs:

KernelRestarter: restarting kernel (1/5), keep random ports
kernel 27543bfb-967e-4a50-b77a-665ec0443502 restarted
kernel 27543bfb-967e-4a50-b77a-665ec0443502 restarted

dtype handling in CSV is incorrect

Data

a,b,c
1.2,2017-01-09 11:13:28,N
1.2,2017-01-09 11:13:28,N
1.2,2017-01-09 11:13:28,N

Code

import dask_cudf

gdf = dask_cudf.read_csv('dtype-test.csv', chunksize='30 B')
print(gdf.head().to_pandas())

Error

libgdf_cffi.wrapper.GDFError: GDF_UNSUPPORTED_DTYPE

I think this is also an error on the cudf side, as cudf infers the dtypes as [dtype('float64') dtype('<M8[ms]') dtype('int32')], which is incorrect.

One possible solution right now is to drop the dtype setting in csv.py
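In the meantime, a workaround may be to bypass dask_cudf's dtype inference entirely and pass explicit dtypes to cudf.read_csv via dask.delayed; a sketch, with the exact dtype strings accepted by cudf.read_csv assumed rather than known:

import cudf
import dask_cudf
from dask import delayed

# read with explicit dtypes via cudf directly, sidestepping the
# dtype inference in dask_cudf's csv.py
part = delayed(cudf.read_csv)('dtype-test.csv', dtype=['float64', 'date', 'str'])
gdf = dask_cudf.from_delayed([part])
print(gdf.head().to_pandas())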

cc @mrocklin

dask_cudf.DataFrame fails to compute empty dataframe

Repro:

>>> y
<dask_cudf.DataFrame | 0 tasks | 0 npartitions>
>>> y.compute()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py", line 398, in compute
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py", line 398, in <listcomp>
    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf-0+unknown-py3.7.egg/dask_cudf/core.py", line 45, in finalize
    return cudf.concat(results)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0+unknown-py3.7-linux-x86_64.egg/cudf/multi.py", line 21, in concat
    raise ValueError("Need at least one object to concatenate")
ValueError: Need at least one object to concatenate
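The traceback suggests the fix could be a simple guard in finalize against zero partitions; a hedged sketch of that guard:

import cudf

def finalize(results):
    # return an empty frame instead of concatenating nothing
    # (sketch only; the real change would live in dask_cudf/core.py)
    if len(results) == 0:
        return cudf.DataFrame()
    return cudf.concat(results)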

[FEA] Implement a Series value_counts function, equivalent to pandas/cudf

Request

As a user, I want to get the counts of each unique value in a given Series for potential distributional analyses and visualizations.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = pd.DataFrame(
	{'a': list(range(20)),
	'b': list(reversed(range(20))),
	'c': list(range(20))})

ddf = dd.from_pandas(df, npartitions=2)

ddf.a.value_counts().compute()
19    1
18    1
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
11    1
12    1
13    1
14    1
15    1
16    1
17    1
0     1
Name: a, dtype: int64
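Until a native implementation lands, a per-partition workaround may be possible; a minimal sketch, assuming cudf.Series.value_counts works within each partition:

import cudf
import dask_cudf

gdf = cudf.DataFrame({'a': [0, 1, 1, 2, 2, 2] * 3})
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

# count per partition on the GPU, then combine the partial counts on
# the host, since the same value can appear in multiple partitions
partial = ddf.a.map_partitions(lambda s: s.value_counts()).compute()
counts = partial.to_pandas().groupby(level=0).sum()
print(counts)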

[BUG] Series fillna fails from unexpected keyword argument 'method'

Description

As a user, I may want to fill in missing values in a Series. Currently, filling in missing values in a dask_cudf Series fails because cudf's fillna does not accept the method keyword argument that Dask's wrapper passes. The equivalent functionality works on a standard Dask Series.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd


df = pd.DataFrame(
	{'a': [1,2,None],
	'b': [1,2,3],
	'c': [3,2,1]})

ddf = dd.from_pandas(df, npartitions=2)
ddf.a.fillna(999).compute()   
0      1.0
1      2.0
2    999.0
Name: a, dtype: float64

Dask Cudf Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd


df = pd.DataFrame(
	{'a': [1,2,None],
	'b': [1,2,3],
	'c': [3,2,1]})

gdf = cudf.DataFrame.from_pandas(df)

ddf = dask_cudf.from_cudf(gdf, npartitions=2)

ddf.a.fillna(999)  
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-59-856ed22bca39> in <module>
----> 1 ddf.a.fillna(999)

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/core.py in fillna(self, value, method, limit, axis)
   1012             test_value = value
   1013         meta = self._meta_nonempty.fillna(value=test_value, method=method,
-> 1014                                           limit=limit, axis=axis)
   1015 
   1016         if axis == 1 or method is None:

TypeError: fillna() got an unexpected keyword argument 'method'
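A possible workaround while this is open is to bypass Dask's fillna wrapper and call cudf's fillna per partition; a sketch, assuming only the method keyword is unsupported (which is what the traceback shows):

# call cudf's fillna directly on each partition, avoiding the
# method= keyword that Dask's wrapper passes during metadata checks
filled = ddf.a.map_partitions(lambda s: s.fillna(999))
print(filled.compute())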

Environment Information

# packages in environment at /conda/envs/cudf_dev:
#
# Name                    Version                   Build  Channel
alabaster                 0.7.12                     py_0    conda-forge
arrow-cpp                 0.12.1           py37h0e61e49_0    conda-forge
asn1crypto                0.24.0                py37_1003    conda-forge
atomicwrites              1.3.0                      py_0    conda-forge
attrs                     18.2.0                     py_0    conda-forge
babel                     2.6.0                      py_1    conda-forge
backcall                  0.1.0                      py_0    conda-forge
blas                      1.0                         mkl  
bleach                    3.1.0                      py_0    conda-forge
boost                     1.68.0          py37h8619c78_1001    conda-forge
boost-cpp                 1.68.0            h11c811c_1000    conda-forge
bzip2                     1.0.6             h14c3975_1002    conda-forge
ca-certificates           2018.11.29           ha4d7672_0    conda-forge
certifi                   2018.11.29            py37_1000    conda-forge
cffi                      1.12.2           py37h9745a5d_0    conda-forge
chardet                   3.0.4                 py37_1003    conda-forge
click                     7.0                      pypi_0    pypi
cloudpickle               0.8.0                      py_0    conda-forge
cmake                     3.13.4               h8d4ced6_0    conda-forge
commonmark                0.8.1                      py_0    conda-forge
cryptography              2.5              py37hb7f436b_1    conda-forge
cudf                      0.6.0.dev0+758.g012045d          pypi_0    pypi
curl                      7.64.0               h646f8bb_0    conda-forge
cython                    0.29.5           py37hf484d3e_0    conda-forge
cytoolz                   0.9.0.1         py37h14c3975_1001    conda-forge
dask-core                 1.1.2                      py_0    conda-forge
dask-cudf                 0.0.1+251.gf4cc176          pypi_0    pypi
decorator                 4.3.2                      py_0    conda-forge
distributed               1.26.0                   py37_1    conda-forge
docutils                  0.14                  py37_1001    conda-forge
entrypoints               0.3                   py37_1000    conda-forge
expat                     2.2.5             hf484d3e_1002    conda-forge
future                    0.17.1                py37_1000    conda-forge
heapdict                  1.0.0                 py37_1000    conda-forge
icu                       58.2              hf484d3e_1000    conda-forge
idna                      2.8                   py37_1000    conda-forge
imagesize                 1.1.0                      py_0    conda-forge
intel-openmp              2019.1                      144  
ipykernel                 5.1.0           py37h24bf2e0_1002    conda-forge
ipython                   7.3.0            py37h24bf2e0_0    conda-forge
ipython_genutils          0.2.0                      py_1    conda-forge
jedi                      0.13.3                   py37_0    conda-forge
jinja2                    2.10                       py_1    conda-forge
jsonschema                3.0.0                    py37_0    conda-forge
jupyter_client            5.2.4                      py_1    conda-forge
jupyter_core              4.4.0                      py_0    conda-forge
krb5                      1.16.3            hc83ff2d_1000    conda-forge
libcurl                   7.64.0               h01ee5af_0    conda-forge
libedit                   3.1.20170329      hf8c457e_1001    conda-forge
libffi                    3.2.1             hf484d3e_1005    conda-forge
libgcc-ng                 7.3.0                hdf63c60_0    conda-forge
libgdf-cffi               0.6.0                    pypi_0    pypi
libgfortran-ng            7.2.0                hdf63c60_3    conda-forge
libprotobuf               3.6.1             hdbcaa40_1001    conda-forge
librmm-cffi               0.5.0                    pypi_0    pypi
libsodium                 1.0.16            h14c3975_1001    conda-forge
libssh2                   1.8.0             h1ad7b7a_1003    conda-forge
libstdcxx-ng              7.3.0                hdf63c60_0    conda-forge
libuv                     1.26.0               h14c3975_0    conda-forge
llvmlite                  0.27.0           py37hf484d3e_0    numba
markdown                  2.6.11                   pypi_0    pypi
markupsafe                1.1.1            py37h14c3975_0    conda-forge
mistune                   0.8.4           py37h14c3975_1000    conda-forge
mkl                       2019.1                      144  
mkl_fft                   1.0.10           py37h14c3975_1    conda-forge
mkl_random                1.0.2            py37h637b7d7_2    conda-forge
more-itertools            4.3.0                 py37_1000    conda-forge
msgpack-python            0.6.1            py37h6bb024c_0    conda-forge
nbconvert                 5.3.1                      py_1    conda-forge
nbformat                  4.4.0                      py_1    conda-forge
nbsphinx                  0.4.2                      py_0    conda-forge
ncurses                   6.1               hf484d3e_1002    conda-forge
notebook                  5.7.4                 py37_1000    conda-forge
numba                     0.42.0          np115py37hf484d3e_0    numba
numpy                     1.15.4           py37h7e9f1db_0  
numpy-base                1.15.4           py37hde5b4d6_0  
numpydoc                  0.8.0                      py_1    conda-forge
nvstrings                 0.2.0            cuda9.2_py37_0    nvidia
openssl                   1.0.2q               h14c3975_0    conda-forge
packaging                 19.0                       py_0    conda-forge
pandas                    0.24.1           py37hf484d3e_0    conda-forge
pandoc                    2.6                           1    conda-forge
pandocfilters             1.4.2                      py_1    conda-forge
parquet-cpp               1.5.1                         4    conda-forge
parso                     0.3.4                      py_0    conda-forge
pexpect                   4.6.0                 py37_1000    conda-forge
pickleshare               0.7.5                 py37_1000    conda-forge
pip                       19.0.3                   py37_0    conda-forge
pluggy                    0.9.0                      py_0    conda-forge
prometheus_client         0.6.0                      py_0    conda-forge
prompt_toolkit            2.0.9                      py_0    conda-forge
psutil                    5.5.1            py37h14c3975_0    conda-forge
ptyprocess                0.6.0                 py37_1000    conda-forge
py                        1.8.0                      py_0    conda-forge
pyarrow                   0.12.0           py37hbbcf98d_2    conda-forge
pycparser                 2.19                       py_0    conda-forge
pygments                  2.3.1                      py_0    conda-forge
pyopenssl                 19.0.0                   py37_0    conda-forge
pyparsing                 2.3.1                      py_0    conda-forge
pyrsistent                0.14.11          py37h14c3975_0    conda-forge
pysocks                   1.6.8                 py37_1002    conda-forge
pytest                    4.3.0                    py37_0    conda-forge
python                    3.7.1             hd21baee_1001    conda-forge
python-dateutil           2.8.0                      py_0    conda-forge
pytz                      2018.9                     py_0    conda-forge
pyyaml                    3.13            py37h14c3975_1001    conda-forge
pyzmq                     18.0.0           py37h0e1adb2_0    conda-forge
readline                  7.0               hf8c457e_1001    conda-forge
recommonmark              0.5.0                      py_0    conda-forge
requests                  2.21.0                py37_1000    conda-forge
rhash                     1.3.6             h14c3975_1001    conda-forge
send2trash                1.5.0                      py_0    conda-forge
setuptools                40.8.0                   py37_0    conda-forge
six                       1.12.0                py37_1000    conda-forge
snowballstemmer           1.2.1                      py_1    conda-forge
sortedcontainers          2.1.0                      py_0    conda-forge
sphinx                    1.8.4                    py37_0    conda-forge
sphinx-markdown-tables    0.0.9                    pypi_0    pypi
sphinx_rtd_theme          0.4.3                      py_0    conda-forge
sphinxcontrib-websupport  1.1.0                      py_1    conda-forge
sqlite                    3.26.0            h67949de_1000    conda-forge
tblib                     1.3.2                    pypi_0    pypi
terminado                 0.8.1                 py37_1001    conda-forge
testpath                  0.4.2                 py37_1000    conda-forge
thrift-cpp                0.12.0            h23e226f_1001    conda-forge
tk                        8.6.9             h84994c4_1000    conda-forge
toolz                     0.9.0                    pypi_0    pypi
tornado                   5.1.1           py37h14c3975_1000    conda-forge
traitlets                 4.3.2                 py37_1000    conda-forge
urllib3                   1.24.1                py37_1000    conda-forge
wcwidth                   0.1.7                      py_1    conda-forge
webencodings              0.5.1                      py_1    conda-forge
wheel                     0.33.1                   py37_0    conda-forge
xz                        5.2.4             h14c3975_1001    conda-forge
yaml                      0.1.7             h14c3975_1001    conda-forge
zeromq                    4.2.5             hf484d3e_1006    conda-forge
zict                      0.1.3                    pypi_0    pypi
zlib                      1.2.11            h14c3975_1004    conda-forge

[BUG] loc fails in dask dataframes constructed from cudf

Description

loc selector fails in dask_cudf dataframes created from cudf dataframes, seemingly due to the GenericIndex not being an instance of pd.Index.

Expected Behavior

As a user, I expect loc selection to succeed if I create a dask_cudf dataframe from cudf or pandas.

Example

The following example, going from pandas, succeeds.

import numpy as np
import pandas as pd
import dask.dataframe as dd
import cudf
import dask_cudf

df = pd.DataFrame({"A": [1., 2, 3], "B": [3., 4, 5]})
ddf = dd.from_pandas(df, npartitions=2)
ddf

type(ddf.index)
# dask.dataframe.core.Index
print(ddf.index.compute())
# RangeIndex(start=0, stop=3, step=1)
 
print(ddf.loc[1:2, ['A']]) 
Dask DataFrame Structure:
                     A
npartitions=1         
1              float64
2                  ...
Dask Name: loc, 2 tasks

This example, going to dask_cudf from a cudf dataframe, fails, seemingly due to GenericIndex not being an instance of a pandas Index.

import numpy as np
import pandas as pd
import cudf
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

ddf = dask_cudf.from_cudf(df, npartitions=2) 

type(ddf.index)
# dask_cudf.core.Index
print(ddf.index.compute())
# GenericIndex([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int64) # This is different
 
print(ddf.loc[1:2, ['a']]) # fails
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-121-24ef766063ef> in <module>
     13 # GenericIndex([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int64) # This is different
     14 
---> 15 print(ddf.loc[1:2, ['a']]) # fails

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/indexing.py in __getitem__(self, key)
     89             iindexer = key
     90             cindexer = None
---> 91         return self._loc(iindexer, cindexer)
     92 
     93     def _loc(self, iindexer, cindexer):

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/indexing.py in _loc(self, iindexer, cindexer)
     97 
     98         if self.obj.known_divisions:
---> 99             iindexer = self._maybe_partial_time_string(iindexer)
    100 
    101             if isinstance(iindexer, slice):

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/indexing.py in _maybe_partial_time_string(self, iindexer)
    125         """
    126         iindexer = _maybe_partial_time_string(self.obj._meta_nonempty.index,
--> 127                                               iindexer, kind='loc')
    128         return iindexer
    129 

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/indexing.py in _maybe_partial_time_string(index, indexer, kind)
    302     """
    303     # do not pass dd.Index
--> 304     assert isinstance(index, pd.Index)
    305 
    306     if not isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex)):

AssertionError: 

The analogous example, coming from pandas via cudf, works:

import numpy as np
import pandas as pd
import cudf
import dask_cudf


df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

pdf = df.to_pandas()
dask_df = dask_cudf.from_cudf(pdf, npartitions=2)    

type(dask_df.index)
# dask.dataframe.core.Index
print(dask_df.index.compute())
# RangeIndex(start=0, stop=4, step=1)

print(dask_df.loc[1:2, ['a']]) # works
RangeIndex(start=0, stop=20, step=1)
Dask DataFrame Structure:
                   a
npartitions=1       
1              int64
2                ...
Dask Name: loc, 3 tasks

Unable to import dask_cudf

import dask_cudf

Result:

---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-2-08547766f3f6> in <module>
      3 import dask.dataframe as dd
      4 import cudf
----> 5 import dask_cudf
      6 

/conda/envs/cudf/lib/python3.5/site-packages/dask_cudf-0.0.1+224.ga3cae03-py3.5.egg/dask_cudf/__init__.py in <module>
      7     from_delayed,
      8 )
----> 9 from .io import read_csv
     10 
     11 from cudf._version import get_versions

ImportError: No module named 'dask_cudf.io'

dask-cudf was installed from master with the tip of cudf branch-0.5.

Additional question: Is dask-cudf dev working against cudf master, or branch-0.5?

[BUG] Groupby aggregations on multiple columns fail with second column not found

Description

As a user, I want to compute hierarchical aggregations by grouping on multiple columns. Passing a list of column names to the groupby API causes a column not found error, while grouping by each column individually succeeds. The second column in the list is the one that isn't found.

Standard Dask Example (Expected Result)

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

pdf = df.to_pandas()
pddf = dd.from_pandas(pdf, npartitions=2)

pddf.groupby(['agg_col1', 'agg_col2']).sum().compute()
                    a   b   c
agg_col1 agg_col2            
0        0         73  60  73
         1         27  30  27
1        0         54  60  54
         1         36  40  36

Dask Cudf Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

print(ddf.groupby('agg_col1').sum().compute())                                                                                    
     a    b    c  agg_col2
0  100   90  100         3
1   90  100   90         4

print(ddf.groupby('agg_col2').sum().compute())                                                                                    
     a    b    c  agg_col1
0  127  120  127         6
1   63   70   63         4

print(ddf.groupby(['agg_col1', 'agg_col2']).sum().compute())
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-61-8baea99ae3bc> in <module>
     14 ddf = dask_cudf.from_cudf(df, npartitions=2)
     15 
---> 16 print(ddf.groupby(['agg_col1', 'agg_col2']).sum().compute())

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    499     """
    500     kwargs.pop('num_workers', None)    # if num_workers present, remove it
--> 501     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    502 
    503 

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    445             # Seed initial tasks into the thread pool
    446             while state['ready'] and len(state['running']) < num_workers:
--> 447                 fire_task()
    448 
    449             # Main loop, wait on tasks to finish, insert new ones

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in fire_task()
    441                             args=(key, dumps((dsk[key], data)),
    442                                   dumps, loads, get_id, pack_exception),
--> 443                             callback=queue.put)
    444 
    445             # Seed initial tasks into the thread pool

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    488 def apply_sync(func, args=(), kwds={}, callback=None):
    489     """ A naive synchronous version of apply_async """
--> 490     res = func(*args, **kwds)
    491     if callback is not None:
    492         callback(res)

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    233         failed = False
    234     except BaseException as e:
--> 235         result = pack_exception(e, dumps)
    236         failed = True
    237     return key, result, failed

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/compatibility.py in apply(func, args, kwargs)
     91     def apply(func, args, kwargs=None):
     92         if kwargs:
---> 93             return func(*args, **kwargs)
     94         else:
     95             return func(*args)

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_chunk(df, *index, **kwargs)
    235         if isinstance(columns, (tuple, list, set, pd.Index)):
    236             columns = list(columns)
--> 237         return func(g[columns])
    238 
    239 

/conda/envs/cudf_dev/lib/python3.7/site-packages/cudf-0.6.0.dev0+803.gde773d4-py3.7-linux-x86_64.egg/cudf/groupby/groupby.py in __getitem__(self, arg)
    286             for val in arg:
    287                 if val not in self._val_columns:
--> 288                     raise KeyError("Column not found: " + str(val))
    289         result = self.copy()
    290         result._val_columns = arg

KeyError: 'Column not found: agg_col2'

Obstacles to code reuse between Dask Dataframe and Dask-GDF

Considering the issues in #40 and #39, I'm curious what stops us from reusing the existing code in dask.dataframe. I can think of a few possible answers:

  1. Dask dataframe functions call pandas methods explicitly, like pandas.concat, which would coerce data to host memory
  2. Dask GDF uses different metadata than Dask Dataframe (maybe divisions and such are different) and so the old logic won't apply

Are there other things that I'm missing? If the only issue is the use of pandas functions, then I wonder if we might make some of that pluggable and reuse logic across projects; a rough sketch of what that could look like is below.
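The names here are hypothetical, not an existing API; this is just one way the concat call could be made pluggable:

# hypothetical registry mapping partition types to concat functions
_concat_dispatch = {}

def register_concat(typ, func):
    _concat_dispatch[typ] = func

def concat(frames):
    # shared dask.dataframe logic would call this instead of
    # hard-coding pandas.concat; each backend registers its own
    return _concat_dispatch[type(frames[0])](frames)

# per-backend registrations, e.g.:
# register_concat(pd.DataFrame, pd.concat)
# register_concat(cudf.DataFrame, cudf.concat)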

Any thoughts @kkraus14 @sklam ?

filter()

Delegate to the PyGDF filter() function.
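Presumably this is a thin per-partition delegation; a rough sketch, with the PyGDF filter() signature assumed rather than known:

def filter(ddf, *args, **kwargs):
    # delegate to the per-partition PyGDF implementation;
    # the argument list here is hypothetical
    return ddf.map_partitions(lambda df: df.filter(*args, **kwargs))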

Creating dask_cudf df w/ from_delayed is missing data

Possibly related to #62

Create two dummy CSV files and read them into a dask_cudf:

import cudf
import dask_cudf
from dask import delayed

files = ['test1.csv', 'test2.csv']
lines = ['num0,num1', '0,1', '2,3']

for file in files:
    with open(file, 'w') as fp:
        fp.write('\n'.join(lines)+ '\n')

@delayed
def load_file(fn, names, dtypes, skiprows):
    return cudf.read_csv(fn, names=names, dtype=dtypes, skiprows=skiprows)

# csv columns
names = ["num0", "num1"]
dtypes = ["int" for name in names]
df = dask_cudf.from_delayed([load_file(fn, names, dtypes, 1) for fn in files])
print(df.head())

Result:

  num0 num1
0    0    1
1    2    3

Expected Result:

  num0 num1
0    0    1
1    2    3
2    0    1
3    2    3

Add read_csv

I plan to add a read_csv function that just uses glob and passes a bunch of filenames to cudf.read_csv without per-file chunking. We'll need to separately figure out how best to handle cutting up byte ranges for larger files on the device.
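A minimal sketch of that plan, assuming one partition per file and using dask.delayed around cudf.read_csv:

import glob

import cudf
import dask_cudf
from dask import delayed

def read_csv(path, **kwargs):
    # one delayed cudf.read_csv call (and one partition) per file
    # matching the glob pattern; no per-file chunking yet
    filenames = sorted(glob.glob(path))
    parts = [delayed(cudf.read_csv)(fn, **kwargs) for fn in filenames]
    return dask_cudf.from_delayed(parts)

df = read_csv('data/*.csv')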

GPU topology aware IPC

Dask_gdf has a special mechanism for moving dataframes between dask workers on the same node that have been assigned different GPUs. Unfortunately, this mechanism assumes peer access between all GPUs on the node, which is not always possible.

For example, the NVLINK connections on the DGX-1 create two fully connected GPU groups: 0-3 and 4-7. In addition, every GPU is linked to one other GPU in the other group. This creates an error when a CUDA context on GPU 0 tries to open a CUDA IPC memory handle from a context on GPU 7 (for example). There's no easy way from user code to discover this situation in advance, and no easy way to work around it. Do we bounce the data through host memory? Is there an intermediate GPU that has peer access to both the source and destination GPU that can facilitate the copy?

Ideally, we would like an abstraction for interprocess device-to-device CUDA memory copy that would work regardless of the underlying bus topology and would use the best approach automatically.
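Until such an abstraction exists, the safe fallback is to bounce the bytes through host memory; a minimal sketch with Numba device arrays (the helper name is hypothetical):

from numba import cuda

def copy_via_host(src_ary, dst_device_id):
    # fallback for GPU pairs without peer access: stage the data in
    # host memory, then re-upload it on the destination device
    host = src_ary.copy_to_host()
    cuda.select_device(dst_device_id)
    return cuda.to_device(host)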

Indexed joins hang

I'm attempting to do a join along DataFrame indices.

Following repro is based on https://gist.github.com/mrocklin/6e2c33c33b32bc324e3965212f202f66

The snippet below hangs on the last line, and there is no activity in the Dask "status" page.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask.array as da
import dask.dataframe as dd
import dask
import cudf

ip = '0.0.0.0'
cluster = LocalCUDACluster(ip=ip)
client = Client(cluster)

n_rows = 5000000
n_keys = 5000000

left = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()

n_rows = 10000000
right = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='y'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()

gleft = left.map_partitions(cudf.from_pandas)
gright = right.map_partitions(cudf.from_pandas)

gleft, gright = dask.persist(gleft, gright) 
left_gdf = gleft.set_index('id')
right_gdf = gright.set_index('id')
joined_gdf = left_gdf.merge(right_gdf)

Edit:
It looks like the workers eventually die:

distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://172.17.0.2:53188 remote=tcp://172.17.0.2:38627>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://172.17.0.2:53190 remote=tcp://172.17.0.2:38627>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://172.17.0.2:53192 remote=tcp://172.17.0.2:38627>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://172.17.0.2:53194 remote=tcp://172.17.0.2:38627>

Import Error: AttributeError: type object 'Series' has no attribute '_bind_operator'

Full error below:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-039505fcfd3b> in <module>()
----> 1 import dask_gdf as dgd

~/Miniconda3/envs/gpu_accel_ml/lib/python3.6/site-packages/dask_gdf/__init__.py in <module>()
----> 1 from .core import from_pygdf, from_dask_dataframe, concat, from_delayed
      2 
      3 from ._version import get_versions
      4 __version__ = get_versions()['version']
      5 del get_versions

~/Miniconda3/envs/gpu_accel_ml/lib/python3.6/site-packages/dask_gdf/core.py in <module>()
    581            operator.lt, operator.le, operator.mod, operator.mul, operator.ne,
    582            operator.sub, operator.truediv, operator.floordiv]:
--> 583     Series._bind_operator(op)
    584 
    585 

AttributeError: type object 'Series' has no attribute '_bind_operator'

[FEA] Selection by position, like numpy/pandas/cudf

Request

As a user, I would like to be able to use positional indexing to select rows, ideally with the iloc accessor.

Currently, the iloc accessor only supports selecting full columns (via column-based positional indexing).

Dask_cudf Behavior Selecting Specific Rows

import cudf
import numpy as np
import pandas as pd
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

pdf = df.to_pandas()
pdf = pdf.astype(np.float32)

dask_df = dask_cudf.from_cudf(pdf, npartitions=2)   

dask_df.iloc[:3, [0, 1]].compute()                                                                                               
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-184-9543e24dd580> in <module>
----> 1 dask_df.iloc[:5, [2, 0]].compute()

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/indexing.py in __getitem__(self, key)
     56 
     57         if iindexer != slice(None):
---> 58             raise NotImplementedError(msg)
     59 
     60         return self._iloc(iindexer, cindexer)

NotImplementedError: 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.             

Example Pandas Behavior

import cudf
import pandas as pd
import numpy as np
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
('b', list(reversed(range(20)))),
('c', list(range(20)))])

pdf = df.to_pandas()
pdf = pdf.astype(np.float32)

print(pdf.iloc[:3, [0, 1]])                                                                                                      
     a     b
0  0.0  19.0
1  1.0  18.0
2  2.0  17.0

# could also select rows of all columns directly
print(pdf[:3])
     a     b    c
0  0.0  19.0  0.0
1  1.0  18.0  1.0
2  2.0  17.0  2.0

Joins Segfault

Currently our joins segfault. This is probably most easily reproduced by running the current test suite.

I apply this patch to un-skip the tests

diff --git a/dask_cudf/tests/test_join.py b/dask_cudf/tests/test_join.py
index 828f73e..63b601c 100644
--- a/dask_cudf/tests/test_join.py
+++ b/dask_cudf/tests/test_join.py
@@ -10,7 +10,6 @@ import dask_cudf as dgd
 param_nrows = [5, 10, 50, 100]


-@pytest.mark.skip(reason="Join implementation not updated")
 @pytest.mark.parametrize("left_nrows", param_nrows)
 @pytest.mark.parametrize("right_nrows", param_nrows)
 @pytest.mark.parametrize("left_nkeys", [4, 5])

Then I run tests

mrocklin@dgx16:~/dask-cudf$ py.test dask_cudf/tests/test_join.py --verbose
========================================== test session starts ==========================================
platform linux -- Python 3.6.7, pytest-4.0.1, py-1.7.0, pluggy-0.8.0 -- /home/nfs/mrocklin/miniconda/bin/python
cachedir: .pytest_cache
rootdir: /home/nfs/mrocklin/dask-cudf, inifile:
collected 260 items

dask_cudf/tests/test_join.py::test_join_inner[4-4-5-5] Segmentation fault

[FEA] Support local variables in query API

Description

As a user, I will want to pass local variables to queries on dask_cudf DataFrames. This is particularly useful for timestamp- and datetime-based querying. I am able to do this with the local_dict keyword argument in standard Dask, but it's currently not implemented in dask_cudf.

Standard Dask Example

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd
import datetime as dt

np.random.seed(12)

date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

pdf = date_df.to_pandas()
pddf = dd.from_pandas(pdf, npartitions=2)

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')

pddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute()
        date     value
0 2018-11-20  0.154163
1 2018-11-21  0.740050
2 2018-11-22  0.263315
3 2018-11-23  0.533739

Dask Cudf Example

Passing a local_dict results in a keyword argument error:

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd
import datetime as dt

np.random.seed(12)
date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)

search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')

print(date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute())
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-22-7a40ec2e6f3a> in <module>
     15 search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
     16 
---> 17 print(date_ddf.query('date <= @search_date', local_dict={'search_date':search_date}).compute())

TypeError: query() got an unexpected keyword argument 'local_dict'

But, not passing a local_dict gives a local variable not implemented exception.

import cudf
import numpy as np
import pandas as pd
import dask_cudf
import dask.dataframe as dd
import datetime as dt

np.random.seed(12)
date_df = cudf.DataFrame()
date_df['date'] = pd.date_range('11/20/2018', periods=72, freq='D')
date_df['value'] = np.random.sample(len(date_df))

date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')

print(date_ddf.query('date <= @search_date'))
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-12-fee85ff69cd6> in <module>
     14 search_date = dt.datetime.strptime('2018-11-23', '%Y-%m-%d')
     15 
---> 16 print(date_ddf.query('date <= @search_date'))

/conda/envs/cudf_dev/lib/python3.7/site-packages/dask_cudf-0.0.1+252.g0f86985-py3.7.egg/dask_cudf/core.py in query(self, expr)
    237         """
    238         if "@" in expr:
--> 239             raise NotImplementedError("Using variables from the calling " "environment")
    240         # Empty calling environment
    241         callenv = {"locals": {}, "globals": {}}

NotImplementedError: Using variables from the calling environment
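A possible workaround is to apply cudf's query per partition and pass the variable in as a function argument, so it resolves as a local; a sketch, assuming cudf's @-resolution can see the enclosing frame's locals:

def filter_by_date(df, search_date):
    # search_date is a genuine local here, so @search_date should
    # resolve inside cudf's query
    return df.query('date <= @search_date')

print(date_ddf.map_partitions(filter_by_date, search_date).compute())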

dask-cudf chunks do not appear to follow one-process-per-GPU semantics

I am having trouble producing OPG in dask-cudf. I am running a test notebook and all of the chunks in my dask-cudf appear to be staying pinned to the same device. I am likely doing something incorrectly; maybe in the creation of the dask-cudf dataframe? This is important since dask-cuml is going to be processing the dask-cudf downstream and we will need to be able to test this end-to-end.

import cudf
import dask_cudf

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(threads_per_worker = 10)
client = Client(cluster)

X_df = dask_cudf.from_cudf(cudf.DataFrame([('a', [0, 1, 2, 3, 4])]), chunksize=1).persist()
client.who_has()

import numba.cuda
import cuml
def print_device(arr):
    print(str(numba.cuda.get_current_device()))
    print(str(cuml.device_of_ptr(arr.as_gpu_matrix(order="F"))))
    
[client.submit(print_device, part) for part in X_df.to_delayed()]

The following is printed in my notebook:

{"('from_cudf-168a30423f834eb583d05f58476a4a21', 0)": ('tcp://127.0.0.1:40458',),
 "('from_cudf-168a30423f834eb583d05f58476a4a21', 1)": ('tcp://127.0.0.1:45874',),
 "('from_cudf-168a30423f834eb583d05f58476a4a21', 2)": ('tcp://127.0.0.1:40458',),
 "('from_cudf-168a30423f834eb583d05f58476a4a21', 3)": ('tcp://127.0.0.1:45874',)}

[<Future: status: pending, key: print_device-04d74ae726f4b1b69ed002d269adcc39>,
 <Future: status: pending, key: print_device-0e818733563b55b50320f5092e702aad>,
 <Future: status: pending, key: print_device-9554966353a4b87a1eaecc7981789320>,
 <Future: status: pending, key: print_device-ceb77fd940e635c284f907f0980c913f>]

And the following is printed on my workers:

<CUDA device 0 'b'Quadro GV100''>
<CUDA device 0 'b'Quadro GV100''>
<CUDA device 0 'b'Quadro GV100''>
0
0
<CUDA device 0 'b'Quadro GV100''>
0
0

nvidia-smi does show that my workers are all on device 0:

|    0     29507      C   /share/conda/cuml/bin/python                 447MiB |
|    0     29537      C   /share/conda/cuml/bin/python                 447MiB |
|    0     29538      C   /share/conda/cuml/bin/python                 447MiB |

Another test I did was running my own dask-mpi cluster and manually setting the CUDA_VISIBLE_DEVICES on each worker before pushing any data. In the case of this test, while I did notice that nvidia-smi showed the workers spanning the two devices in my system, all of the cudf chunks in the dask Dataframe were still pinned to the device 0, even though the chunks were spread across both workers.

I do have a notebook where I've successfully been able to pin cudfs to specific devices, following one-process-per-GPU, but this required assigning each GPU a device id and, in all follow-on processing of the underlying cudfs, using Numba's select_device to force the device before doing any manipulation on the cudfs; a sketch of that technique follows.
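A minimal sketch, with a hypothetical helper name:

from numba import cuda

def on_assigned_device(device_id, func, *args, **kwargs):
    # force this worker's thread onto its assigned CUDA device
    # before any cudf manipulation happens
    cuda.select_device(device_id)
    return func(*args, **kwargs)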

Since a CUDA context is thread-local, it's possible for the selected device to drift when running on a cluster with multiple threads. Using the technique outlined above, I was able to allow multi-threaded Dask workers and still guarantee OPG. This does not hold for multi-process workers, however.

I have posted this issue in the dask-cudf project because dask-cudf is responsible for distributing the cudfs onto the workers, and it is in this layer that the workers should be forced to use only the device they've been assigned. I understand CUDA_VISIBLE_DEVICES will default to using the first assigned device id when memory is allocated; however, this does not seem to be a strong enough constraint to enforce that the cudfs on that worker only ever contain memory allocated from that device.

It's also important that we are not limited to allocating only from that device (for instance, by setting CUDA_VISIBLE_DEVICES to a single device), because there are times when we will need to pass around IPC handles and collect pointers from different devices onto a single worker.

I can certainly supply the exact notebook I used for testing, if that helps.

[BUG] from_cudf fails due to lack of is_monotonic_increasing attribute of RangeIndex

Description

Calling from_cudf on a GPU dataframe/series fails with an attribute error of RangeIndex' object has no attribute 'is_monotonic_increasing'.

Example

import cudf
import dask_cudf

x = cudf.DataFrame({'a':[1, 2]})  
ddf = dask_cudf.from_cudf(x, npartitions=1)                                                                                              
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-6-c5d005bc77cf> in <module>
----> 1 dask_cudf.from_cudf(x, npartitions=1)

/conda/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/io/io.py in from_pandas(data, npartitions, chunksize, sort, name)
    184         return new_dd_object({(name, 0): data}, name, data, [None, None])
    185 
--> 186     if sort and not data.index.is_monotonic_increasing:
    187         data = data.sort_index(ascending=True)
    188     if sort:

AttributeError: 'RangeIndex' object has no attribute 'is_monotonic_increasing'

Environment

# packages in environment at /conda/envs/dask-cudf:
#
# Name                    Version                   Build  Channel
arrow-cpp                 0.12.1           py37h0e61e49_0    conda-forge
backcall                  0.1.0                    py37_0  
blas                      1.0                         mkl  
bokeh                     1.0.4                 py37_1000    conda-forge
boost-cpp                 1.68.0            h11c811c_1000    conda-forge
bzip2                     1.0.6             h14c3975_1002    conda-forge
ca-certificates           2018.11.29           ha4d7672_0    conda-forge
certifi                   2018.11.29            py37_1000    conda-forge
cffi                      1.12.2           py37h9745a5d_0    conda-forge
click                     7.0                      pypi_0    pypi
cloudpickle               0.8.0                      py_0    conda-forge
cudatoolkit               9.1                  h85f986d_0    numba
cudf                      0.5.1                    py37_0    rapidsai
cython                    0.29.5           py37hf484d3e_0    conda-forge
cytoolz                   0.9.0.1         py37h14c3975_1001    conda-forge
dask                      1.1.2                      py_0    conda-forge
dask-core                 1.1.2                      py_0    conda-forge
dask-cudf                 0.0.1+251.gf4cc176          pypi_0    pypi
decorator                 4.3.2                    py37_0  
distributed               1.26.0                   py37_1    conda-forge
freetype                  2.9.1             h94bbf69_1005    conda-forge
heapdict                  1.0.0                 py37_1000    conda-forge
icu                       58.2              hf484d3e_1000    conda-forge
intel-openmp              2019.1                      144  
ipython                   7.3.0            py37h39e3cac_0  
ipython_genutils          0.2.0                    py37_0  
jedi                      0.13.3                   py37_0  
jinja2                    2.10                       py_1    conda-forge
jpeg                      9c                h14c3975_1001    conda-forge
libcudf                   0.5.1                 cuda9.2_0    rapidsai
libcudf_cffi              0.5.1            cuda9.2_py37_0    rapidsai
libffi                    3.2.1             hf484d3e_1005    conda-forge
libgcc-ng                 7.3.0                hdf63c60_0    conda-forge
libgfortran-ng            7.2.0                hdf63c60_3    conda-forge
libpng                    1.6.36            h84994c4_1000    conda-forge
libprotobuf               3.6.1             hdbcaa40_1001    conda-forge
librmm-cffi               0.5.0                    pypi_0    pypi
libstdcxx-ng              7.3.0                hdf63c60_0    conda-forge
libtiff                   4.0.10            h648cc4a_1001    conda-forge
llvmlite                  0.27.0           py37hf484d3e_0    numba
locket                    0.2.0                      py_2    conda-forge
markupsafe                1.1.1            py37h14c3975_0    conda-forge
mkl                       2019.1                      144  
mkl_fft                   1.0.10           py37h14c3975_1    conda-forge
mkl_random                1.0.2            py37h637b7d7_2    conda-forge
msgpack-python            0.6.1            py37h6bb024c_0    conda-forge
ncurses                   6.1               hf484d3e_1002    conda-forge
numba                     0.42.0          np115py37hf484d3e_0    numba
numpy                     1.15.4           py37h7e9f1db_0  
numpy-base                1.15.4           py37hde5b4d6_0  
nvstrings                 0.2.0            cuda9.2_py37_0    nvidia
olefile                   0.46                       py_0    conda-forge
openssl                   1.0.2q               h14c3975_0    conda-forge
packaging                 19.0                       py_0    conda-forge
pandas                    0.24.1           py37hf484d3e_0    conda-forge
parquet-cpp               1.5.1                         4    conda-forge
parso                     0.3.4                    py37_0  
partd                     0.3.9                      py_0    conda-forge
pexpect                   4.6.0                    py37_0  
pickleshare               0.7.5                    py37_0  
pillow                    5.4.1           py37h00a061d_1000    conda-forge
pip                       19.0.3                   py37_0    conda-forge
prompt_toolkit            2.0.9                    py37_0  
psutil                    5.5.1            py37h14c3975_0    conda-forge
ptyprocess                0.6.0                    py37_0  
pyarrow                   0.12.0           py37hbbcf98d_2    conda-forge
pycparser                 2.19                       py_0    conda-forge
pygments                  2.3.1                    py37_0  
pyparsing                 2.3.1                      py_0    conda-forge
python                    3.7.1             hd21baee_1001    conda-forge
python-dateutil           2.8.0                      py_0    conda-forge
pytz                      2018.9                     py_0    conda-forge
pyyaml                    3.13            py37h14c3975_1001    conda-forge
readline                  7.0               hf8c457e_1001    conda-forge
setuptools                40.8.0                   py37_0    conda-forge
six                       1.12.0                py37_1000    conda-forge
sortedcontainers          2.1.0                      py_0    conda-forge
sqlite                    3.26.0            h67949de_1000    conda-forge
tblib                     1.3.2                    pypi_0    pypi
thrift-cpp                0.12.0            h23e226f_1001    conda-forge
tk                        8.6.9             h84994c4_1000    conda-forge
toolz                     0.9.0                    pypi_0    pypi
tornado                   5.1.1           py37h14c3975_1000    conda-forge
traitlets                 4.3.2                    py37_0  
wcwidth                   0.1.7                    py37_0  
wheel                     0.33.1                   py37_0    conda-forge
xz                        5.2.4             h14c3975_1001    conda-forge
yaml                      0.1.7             h14c3975_1001    conda-forge
zict                      0.1.3                    pypi_0    pypi
zlib                      1.2.11            h14c3975_1004    conda-forge

dask-cuda and dask-cudf were installed from their setup.py files while in this environment.

dask dataframe is always created with a single task and npartitions=1

I am going through this blog. I have created a cluster using dask-cuda with 2 V100 GPUs. When I create a dask dataframe, it is always created with a single task and npartitions=1, but ideally it should create 4 tasks and npartitions=4.

dask==1.1.3
dask-cuda= build from master
dask-cudf= build from master



cudf comes from the 0.5 RAPIDS container.
Even with these single-task dataframes, performing a join on them gives an error: TypeError: merge() got an unexpected keyword argument 'suffixes'

Follow cudf with version numbers

It looks like the only tag is 0.0.1. @randerzander recommended that we follow cudf versions.

I'll plan to do this on the next release, though given stability issues I'm not sure when that release might be. I'm inclined not to promise anything until 0.6. @randerzander any concerns with this?
