As a user, I want to compute hierarchical aggregations by grouping on multiple columns. Passing a list of column names to the dask_cudf groupby API raises a "Column not found" KeyError, while grouping by each column individually succeeds. The column reported as missing is the second one in the list (agg_col2 in the reproducer below).

The equivalent pandas/dask workflow succeeds:
import cudf
import dask.dataframe as dd

# Build a small frame with two binary grouping keys.
df = cudf.DataFrame([('a', list(range(20))),
                     ('b', list(reversed(range(20)))),
                     ('c', list(range(20)))])
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

# Round-trip through pandas and group on both keys: works.
pdf = df.to_pandas()
pddf = dd.from_pandas(pdf, npartitions=2)
pddf.groupby(['agg_col1', 'agg_col2']).sum().compute()
                    a   b   c
agg_col1 agg_col2
0        0         73  60  73
         1         27  30  27
1        0         54  60  54
         1         36  40  36
The same operations through dask_cudf: grouping on each column individually works, but grouping on both raises the KeyError:

import cudf
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
                     ('b', list(reversed(range(20)))),
                     ('c', list(range(20)))])
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]
ddf = dask_cudf.from_cudf(df, npartitions=2)

print(ddf.groupby('agg_col1').sum().compute())
     a    b    c  agg_col2
0  100   90  100         3
1   90  100   90         4
print(ddf.groupby('agg_col2').sum().compute())
     a    b    c  agg_col1
0  127  120  127         6
1   63   70   63         4
print(ddf.groupby(['agg_col1', 'agg_col2']).sum().compute())
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-61-8baea99ae3bc> in <module>
14 ddf = dask_cudf.from_cudf(df, npartitions=2)
15
---> 16 print(ddf.groupby(['agg_col1', 'agg_col2']).sum().compute())
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
499 """
500 kwargs.pop('num_workers', None) # if num_workers present, remove it
--> 501 return get_async(apply_sync, 1, dsk, keys, **kwargs)
502
503
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
445 # Seed initial tasks into the thread pool
446 while state['ready'] and len(state['running']) < num_workers:
--> 447 fire_task()
448
449 # Main loop, wait on tasks to finish, insert new ones
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in fire_task()
441 args=(key, dumps((dsk[key], data)),
442 dumps, loads, get_id, pack_exception),
--> 443 callback=queue.put)
444
445 # Seed initial tasks into the thread pool
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
488 def apply_sync(func, args=(), kwds={}, callback=None):
489 """ A naive synchronous version of apply_async """
--> 490 res = func(*args, **kwds)
491 if callback is not None:
492 callback(res)
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
233 failed = False
234 except BaseException as e:
--> 235 result = pack_exception(e, dumps)
236 failed = True
237 return key, result, failed
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
228 try:
229 task, data = loads(task_info)
--> 230 result = _execute_task(task, data)
231 id = get_id()
232 result = dumps((result, id))
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/compatibility.py in apply(func, args, kwargs)
91 def apply(func, args, kwargs=None):
92 if kwargs:
---> 93 return func(*args, **kwargs)
94 else:
95 return func(*args)
/conda/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_chunk(df, *index, **kwargs)
235 if isinstance(columns, (tuple, list, set, pd.Index)):
236 columns = list(columns)
--> 237 return func(g[columns])
238
239
/conda/envs/cudf_dev/lib/python3.7/site-packages/cudf-0.6.0.dev0+803.gde773d4-py3.7-linux-x86_64.egg/cudf/groupby/groupby.py in __getitem__(self, arg)
286 for val in arg:
287 if val not in self._val_columns:
--> 288 raise KeyError("Column not found: " + str(val))
289 result = self.copy()
290 result._val_columns = arg
KeyError: 'Column not found: agg_col2'
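From the traceback, dask's _apply_chunk selects g[columns] on the cudf groupby object, and cudf's GroupBy.__getitem__ validates the requested names against self._val_columns, which apparently does not contain the second grouping key when a list of keys is passed. Until that is fixed, a possible workaround is to encode the two keys into a single column and group on that, since single-column groupby works as shown above. This is only a sketch: the grp_key name is mine, it assumes cudf Series arithmetic works on these columns, and the combined index has to be decoded back by hand.

import cudf
import dask_cudf

df = cudf.DataFrame([('a', list(range(20))),
                     ('b', list(reversed(range(20)))),
                     ('c', list(range(20)))])
df['agg_col1'] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df['agg_col2'] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

# Combine the two binary keys into one integer key:
# grp_key = 2 * agg_col1 + agg_col2, so index values 0..3 map back to
# (agg_col1, agg_col2) = (0, 0), (0, 1), (1, 0), (1, 1).
df['grp_key'] = df['agg_col1'] * 2 + df['agg_col2']

ddf = dask_cudf.from_cudf(df, npartitions=2)
print(ddf.groupby('grp_key').sum().compute())

This only sidesteps the bug for keys with a small, known domain; the real fix presumably belongs in how _val_columns is populated when the groupby receives a list of column names.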