
core's Issues

Use fsspec for reading Schema

We use fsspec for writing out schemas, but not for reading them in. This means we can write a schema to remote storage like S3 but not read it back from S3. We should use fsspec consistently for both reading and writing schema files.
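
A minimal sketch of what symmetric fsspec-based reading and writing could look like, assuming the schema is serialized as text (the helper names here are hypothetical):

import fsspec


def read_schema_text(path: str) -> str:
    # fsspec.open resolves local paths, s3://, gs://, etc. through the same API,
    # so the identical code path works for local and remote storage.
    with fsspec.open(path, "r") as f:
        return f.read()


def write_schema_text(path: str, contents: str) -> None:
    with fsspec.open(path, "w") as f:
        f.write(contents)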

`merlin.core` submodule should probably be called `merlin.utils`

Once the worker submodule moves to IO, everything that's left in merlin.core is really utils, so we should call it utils instead. utils isn't great (since every utils package eventually turns into a junk drawer), but it's still clearer than core, which is a great name for the library but a terrible name for a submodule within a library of the same name.

[FEA] / [QST] Dataset to support random splitting & splitting based on value of some column

I would like to raise this for broader consideration -- I am not sure what the answer here is, though I find myself hoping for this functionality more and more as I work with Datasets in NVTabular.

You might want to preprocess your data to a Dataset, maybe apply some NVTabular ops, and only split it down the road. You might want to do this to streamline your pipeline and give yourself the ability to experiment faster. Or you might want to see how the model responds to training on different splits of data.

With certain preprocessing techniques you might want to avoid this, to prevent leakage from the training split into the validation split, but those scenarios are likely a small minority.

There are ways to work around this (splitting the data before creating a Dataset, or calling to_ddf, modifying the ddf, and creating a new Dataset from the output), but this is quite cumbersome.

Random splitting could work by passing the fraction of data to retain in one of the splits, say 0.7 or 0.2.

Splitting based on value might work by passing in the column to use and the values to retain in one split versus the other.

Something like this (screenshot of a tentative API omitted).

(the take here -- all names are very tentative -- might be a nice piece of related functionality, and could be very useful for experimenting).
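
A hedged sketch of the to_ddf workaround described above (the file path and the "day" column are hypothetical; random_split comes from dask, not from a proposed Merlin API):

from merlin.io import Dataset

ds = Dataset("interactions.parquet")
ddf = ds.to_ddf()

# Random split: keep 70% of the rows in one Dataset and 30% in the other
train_ddf, valid_ddf = ddf.random_split([0.7, 0.3], random_state=42)
train_ds = Dataset(train_ddf)
valid_ds = Dataset(valid_ddf)

# Value-based split on a hypothetical "day" column
train_ds = Dataset(ddf[ddf["day"] < 7])
valid_ds = Dataset(ddf[ddf["day"] >= 7])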

[DOC] Docstring coverage in Core

Modules missing docstrings:

  • core
  • dag
  • io
  • schema
Name Total Miss Cover Cover%
core/dispatch.py 44 22 22 50%
core/utils.py 13 9 4 31%
core/worker.py 3 1 2 67%
dag/base_operator.py 9 4 5 56%
dag/graph.py 4 3 1 25%
dag/node.py 12 11 1 8%
dag/selector.py 3 1 2 67%
dag/ops/concat_columns.py 4 3 1 25%
dag/ops/identity.py 2 1 1 50%
dag/ops/selection.py 4 4 0 0%
dag/ops/subset_columns.py 2 1 1 50%
dag/ops/subtraction.py 4 4 0 0%
io/avro.py 6 5 1 17%
io/csv.py 4 3 1 25%
io/dataframe_engine.py 4 3 1 25%
io/dataframe_iter.py 1 1 0 0%
io/dataset.py 14 2 12 86%
io/dataset_engine.py 7 5 2 29%
io/hugectr.py 1 1 0 0%
io/parquet.py 22 16 6 27%
io/shuffle.py 2 1 1 50%
io/writer.py 13 13 0 0%
io/writer_factory.py 1 1 0 0%
schema/schema.py 15 2 13 87%
schema/io/schema_bp.py 51 11 40 78%
TOTAL 286 128 158 55.2%
(5 of 30 files omitted due to complete coverage)

Enable TagSet to be used in Schema selection methods

Enable TagSet to be used in Schema selection methods.

tags = TagSet(["a", "b"])
schema.select_by_tag(tags)

This is motivated by schema selection functionality in the Merlin Models PyTorch API, where it would be convenient to be able to use a dedicated type for multiple tags.

fsspec v22.7.1 breaks Dataset.to_parquet

It looks like the v22.7.1 release of fsspec breaks the Dataset.to_parquet functionality. to_parquet writes out a Parquet dataset that can't be read back in, causing errors like:

E   pyarrow.lib.ArrowInvalid: Error creating dataset. Could not read schema from '/tmp/pytest-of-jenkins/pytest-9/test_dask_dataset_from_datafra4/part_0.parquet': Could not open Parquet input source '/tmp/pytest-of-jenkins/pytest-9/test_dask_dataset_from_datafra4/part_0.parquet': Parquet file size is 0 bytes. Is this a 'parquet' file?

Note that installing fsspec v22.5.0 works, as does using Dataset.to_ddf().to_parquet(...) (bypassing our custom to_parquet code).
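
A hedged sketch of the second workaround (paths are hypothetical):

from merlin.io import Dataset

ds = Dataset("/data/train/", engine="parquet")

# Bypass the custom Dataset.to_parquet path and write through dask directly
ds.to_ddf().to_parquet("/data/train_rewritten/")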

[FEA] Create an easy way to generate a dict of tensors - a standard way to move array data across frameworks

When we want to trace a PyTorch model, we call torch.jit.trace(model, train_dict, strict=True), where train_dict is a dictionary of torch tensors. In the PyTorch documentation, this corresponds to the example_inputs argument.

Currently we get the dict of tensors as follows, but I don't think this is the pattern we want users to follow:

dataset = Dataset(train_paths[0])
trainer.train_dataset_or_path = dataset
loader = trainer.get_train_dataloader()
train_dict = next(iter(loader))

Based on discussions with Karl, it looks like this is related to Columns and MerlinArray. We need a standard solution for this.
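
A hedged sketch of the kind of helper this issue asks for; the function name is hypothetical, and it assumes numeric (non-list) columns so that torch.as_tensor applies directly:

import torch
from merlin.io import Dataset


def head_as_torch_dict(dataset: Dataset, n: int = 1024) -> dict:
    """Return the first n rows of a Dataset as a dict of torch tensors."""
    df = dataset.to_ddf().head(n)
    if hasattr(df, "to_pandas"):  # cudf -> pandas, for simplicity
        df = df.to_pandas()
    return {col: torch.as_tensor(df[col].to_numpy()) for col in df.columns}


# train_dict = head_as_torch_dict(Dataset(train_paths[0]))
# traced_model = torch.jit.trace(model, train_dict, strict=True)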

[BUG] `repartition` on `Dataset` removes tags from schema


Reproducer code:

import numpy as np
import cudf
import nvtabular as nvt
from merlin.schema.tags import Tags

purchases = cudf.DataFrame(
    data={'user_id': [0, 1, 2, 2],
          'price': [125.04, 23.07, 101.2, 2.34],
          'color': ['blue', 'blue', 'red', 'yellow'],
          'model': ['deluxe', 'compact', 'regular', 'regular']
})

out = ['price'] >> nvt.ops.AddMetadata(tags=[Tags.TARGET])

out += ['price'] >> nvt.ops.AddTags(tags=[Tags.CONTINUOUS])
out += ['user_id'] >> nvt.ops.TagAsUserID()
out += ['color', 'model'] >> nvt.ops.TagAsItemFeatures()
out += ['color', 'model'] >> nvt.ops.AddTags(tags=[Tags.CATEGORICAL])

ds = nvt.Dataset(purchases)
wf = nvt.Workflow(out)

ds_out = wf.fit_transform(ds)
ds_out.schema

ds_out = ds_out.repartition(5)

ds_out.schema
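
A hedged workaround sketch, assuming the Dataset's schema attribute can simply be re-assigned after repartitioning (an illustration, not a guaranteed contract):

# Save the tagged schema before repartitioning and re-attach it afterwards
schema_with_tags = ds_out.schema
ds_out = ds_out.repartition(5)
ds_out.schema = schema_with_tags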

BUG: unsupported operand type(s) for -: 'TransformWorkflow' and 'list'

Currently this notebook gives an error when we pull the latest changes from the dev branch. Although we don't know the root cause, it might be due to extra columns that are in the NVT workflow's output_schema but that the TF4Rec model does not use as inputs. One way to remove these extra columns from the NVT workflow could be to adjust the selector when the graph is constructed, using the - operator. However, I get an error when I do the following:

torch_op = workflow.input_schema.column_names >> (TransformWorkflow(workflow) - ["session_id", "day_first"]) >> PredictPyTorch(
    traced_model, input_schema, output_schema
)

TypeError Traceback (most recent call last)
Cell In[19], line 1
----> 1 torch_op = workflow.input_schema.column_names >> (TransformWorkflow(workflow) - ["session_id"])>> PredictPyTorch(
2 traced_model, input_schema, output_schema
3 )
5 ensemble = Ensemble(torch_op, workflow.input_schema)

TypeError: unsupported operand type(s) for -: 'TransformWorkflow' and 'list'

To reproduce the issue, you can run these notebooks in order and then execute:

torch_op = workflow.input_schema.column_names >> (TransformWorkflow(workflow) - ["session_id", "day_first"]) >> PredictPyTorch(
    traced_model, input_schema, output_schema
)
ensemble = Ensemble(torch_op, workflow.input_schema)
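
A hedged sketch of a possible fix: the - operator is defined on graph nodes rather than on operator instances, so the subtraction likely needs to happen after the TransformWorkflow node is built (an untested suggestion, not a confirmed API contract):

transformed = workflow.input_schema.column_names >> TransformWorkflow(workflow)
torch_op = (transformed - ["session_id", "day_first"]) >> PredictPyTorch(
    traced_model, input_schema, output_schema
)
ensemble = Ensemble(torch_op, workflow.input_schema)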

[Bug] Installing latest branch of Merlin core raises error when importing cuDF

When I pull the latest branch of Merlin core and install it, I cannot import cuDF anymore.

Steps to reproduce:

  • docker pull merlin-training-tensorflow:22.04
  • cd /core/ && git pull origin main && pip install .
  • python
  • import cudf

Error:

File cudf/_lib/gpuarrow.pyx:1, in init cudf._lib.gpuarrow()

ModuleNotFoundError: No module named 'pyarrow._cuda'

[FEA] Provide a way to exclude target columns from selections by tag

Is your feature request related to a problem? Please describe.

I am using NVTabular to preprocess data for the RecSys challenge: the label is the id of the purchased item (a multi-class classification problem).
One of the preprocessing steps is to encode the purchased_item column using the Categorify operator. The operator automatically tags the encoded feature with Tags.CATEGORICAL. However, in our use case, we need to remove this tag so that the label is not used as an input to the model.
We can also think of other use cases where we generate new variables from parent features but we don't want to keep all the parent tags.

Describe the solution you'd like
Similar to the AddMetadata operator, it would be great to have a RemoveMetadata op to remove unwanted tags/properties.

Describe alternatives you've considered
I manually edited the schema file and removed the categorical tag from the purchased_item feature.
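
A hedged sketch of how the exclusion can be approximated today by name, using Schema methods that appear elsewhere in this tracker (select_by_tag and without); workflow.output_schema and the purchased_item column come from the description above:

from merlin.schema.tags import Tags

# Select the categorical inputs, then drop the encoded target column by name
categorical = workflow.output_schema.select_by_tag(Tags.CATEGORICAL)
model_inputs = categorical.without(["purchased_item"])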

dask read_parquet changes disable parquet file statistics reading in default case

Changes introduced in dask PR dask/dask#8992 break or change the default-case logic in NVTabular for reading Parquet file statistics, effectively disabling it.

This introduced a significant slowdown (10-15 minutes) at startup (and possibly other problems) in our use case with KerasSequenceLoader and the large Criteo dataset, because the entire dataset was being read in to determine the number of rows.

A workaround is to pin the dask version, or to explicitly pass gather_statistics=True (or calculate_divisions=True, to avoid a dask deprecation warning) to the Dataset constructor, but it would be good for the default behavior to remain as before.
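
A hedged sketch of the constructor-argument workaround (the dataset path is hypothetical; the keyword is forwarded down to dask's read_parquet):

from merlin.io import Dataset

# Restore the previous behavior of reading row-group statistics up front
ds = Dataset("/data/criteo/", engine="parquet", calculate_divisions=True)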

[BUG] runtime error: "ValueError: high is out of bounds for int32"

Bug description

When training any of the recommender system models with NVIDIA Merlin, the call
...
history = model.fit( train_data, epochs=3, batch_size=512, pre=predict_last )
...
raises a runtime error: "ValueError: high is out of bounds for int32"

Steps/Code to reproduce bug

  1. I solved it like this:
    in the file C:\Users\Xavier\AppData\Roaming\Python\Python310\site-packages\merlin\core\dispatch.py, line 778, change
    from:
    seeds = random_state.randint(0, 2 ** 32, size=global_size)
    to:
    seeds = random_state.randint(0, 2 ** 32, size=global_size, dtype='int64')  # add dtype='int64'

Expected behavior

After setting the dtype on line 778, all models trained normally.

Environment details

  • Merlin version: (23.5.0)
  • Platform: Windows, Lenovo GPU NVIDIA GeForce GTX 1650
  • Python version: 3.10.0
  • PyTorch version (GPU?):
  • Tensorflow version (GPU?): 2.10.0

Additional context

with pip install protobuf==3.20.0

Error of running Merlin example on Google cloud platform

Hi, I am trying to run the Merlin example notebook 03-model-inference-hugectr.ipynb on Vertex AI.

My environment is:

Google Cloud Platform, Vertex AI, Cloud Shell

I am running the notebook from Cloud Shell, but I get an error when I run:

export.export_ensemble(
 model_name=MODEL_NAME,
 workflow_path=local_workflow_path,
 saved_model_path=local_saved_model_path,
 output_path=local_ensemble_path,
 categorical_columns=categorical_columns,
 continuous_columns=continuous_columns,
 label_columns=label_columns,
 num_slots=NUM_SLOTS,
 max_nnz=MAX_NNZ,
 num_outputs=NUM_OUTPUTS,
 embedding_vector_size=EMBEDDING_VECTOR_SIZE,
 max_batch_size=MAX_BATCH_SIZE,
 model_repository_path=model_repository_path
)

The error:

   File '/home/.local/lib/python3.9/site-packages/merlin/dag/selector.py', line 151, in __bool__
   AttributeError: 'ColumnSelector' object has no attribute 'all'

It is the file https://github.com/NVIDIA-Merlin/core/blob/main/merlin/dag/selector.py

Could anyone let me know how to work around it?

Thanks.

HAS_GPU does not reflect CUDA_VISIBLE_DEVICES settings

We check whether GPUs are available in the following line:

HAS_GPU = nvml.device_get_count() > 0

If my machine has GPUs but I want to run in CPU-only mode by setting CUDA_VISIBLE_DEVICES, the Merlin library still sets HAS_GPU=True.

Example:
export CUDA_VISIBLE_DEVICES='';python -c 'from dask.distributed.diagnostics import nvml; print(nvml.device_get_count())'

This returns 2 on a machine with two GPUs, even though CUDA_VISIBLE_DEVICES is set to an empty string.
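
A hedged sketch of a check that honors an empty CUDA_VISIBLE_DEVICES, using pynvml directly (an illustration of the idea, not the fix adopted by merlin-core):

import os

import pynvml


def has_visible_gpu() -> bool:
    # Treat CUDA_VISIBLE_DEVICES='' as an explicit request for CPU-only mode
    visible = os.environ.get("CUDA_VISIBLE_DEVICES")
    if visible is not None and visible.strip() == "":
        return False
    try:
        pynvml.nvmlInit()
        return pynvml.nvmlDeviceGetCount() > 0
    except pynvml.NVMLError:
        return False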

[BUG] Dask-CUDA does not work with Merlin/NVTabular

As pointed out by @oliverholworthy in #274 (comment), cuda.is_available() is used in merlin.core.compat to check for CUDA support. Unfortunately, this is a known problem for dask-cuda.

This most likely means that Merlin/NVTabular has not worked properly with Dask-CUDA for more than six months now. For example, the following code will produce an OOM error on 32GB V100s:

import time
from merlin.core.utils import Distributed
if __name__ == "__main__":
    with Distributed(rmm_pool_size="24GiB"):
        time.sleep(30)

You will also see an error if you don't import any merlin/nvt code, but use the offending cuda.is_available() command:

import time
from numba import cuda # This is fine
cuda.is_available() # This is NOT
from dask_cuda import LocalCUDACluster
if __name__ == "__main__":
    with LocalCUDACluster(rmm_pool_size="24GiB") as cluster:
        time.sleep(30)

Meanwhile, the code works fine if you don't use the offending command or import code that also imports merlin.core.compat:

import time
from dask_cuda import LocalCUDACluster
if __name__ == "__main__":
    with LocalCUDACluster(rmm_pool_size="24GiB") as cluster:
        time.sleep(30)

[FEA] Add a feature to remove Tags from a column in schema

When we apply Categorify to a target column, Tags.CATEGORICAL is added to the schema. For an internal project we need to remove this tag, and we currently don't have functionality for this.

We could create the functionality below, which updates the schema:

schema[column_name].remove_tag(tag=[Tags.TAG_NAME])

[FEA] Add a more meaningful error message when the dataset folder is wrong

Is your feature request related to a problem? Please describe.

When we set DATA_FOLDER incorrectly in our NVT workflow, we get the following IndexError, but we should get a more meaningful error like: "the dataset cannot be found in the ... folder".

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
File <timed exec>:15, in <module>

File /models/merlin/models/utils/example_utils.py:10, in workflow_fit_transform(outputs, train_path, valid_path, output_path, workflow_name)
      7 """fits and transforms datasets applying NVT workflow"""
      8 workflow = nvt.Workflow(outputs)
---> 10 train_dataset = nvt.Dataset(train_path)
     11 valid_dataset = nvt.Dataset(valid_path)
     13 workflow.fit(train_dataset)

File /core/merlin/io/dataset.py:304, in Dataset.__init__(self, path_or_source, engine, npartitions, part_size, part_mem_fraction, storage_options, dtypes, client, cpu, base_dataset, schema, **kwargs)
    302 if isinstance(engine, str):
    303     if engine == "parquet":
--> 304         self.engine = ParquetDatasetEngine(
    305             paths, part_size, storage_options=storage_options, cpu=self.cpu, **kwargs
    306         )
    307     elif engine == "csv":
    308         self.engine = CSVDatasetEngine(
    309             paths, part_size, storage_options=storage_options, cpu=self.cpu, **kwargs
    310         )

File /core/merlin/io/parquet.py:297, in ParquetDatasetEngine.__init__(self, paths, part_size, storage_options, row_groups_per_part, legacy, batch_size, cpu, **kwargs)
    292 self.dataset_kwargs = self.read_parquet_kwargs.pop("dataset", {})
    294 if row_groups_per_part is None:
    295     self._real_meta, rg_byte_size_0 = run_on_worker(
    296         _sample_row_group,
--> 297         self._path0,
    298         self.fs,
    299         cpu=self.cpu,
    300         memory_usage=True,
    301         **self.read_parquet_kwargs,
    302     )
    303     row_groups_per_part = self.part_size / rg_byte_size_0
    304     if row_groups_per_part < 1.0:

File /core/merlin/io/parquet.py:322, in ParquetDatasetEngine._path0(self)
    319 @property
    320 @functools.lru_cache(1)
    321 def _path0(self):
--> 322     return next(self._dataset.get_fragments()).path

File /core/merlin/io/parquet.py:352, in ParquetDatasetEngine._dataset(self)
    349     dataset = pa_ds.dataset(paths, filesystem=fs)
    350 else:
    351     # This is a directory or a single file
--> 352     dataset = pa_ds.dataset(paths[0], filesystem=fs)
    353 return dataset

IndexError: list index out of range

[FEA] Add a compute_with_dask utility for downstream Merlin libraries

Proposal

I propose that we add a single compute_dask_object (or compute_with_dask) utility to merlin.core.utils, and use that utility for all Dask computation within Merlin. The purpose of this utility would be to check the global_dask_client() utility, and utilize the appropriate default client/scheduler for Merlin (and not the default for dask/distributed).

More Background

While starting to look into merlin-models#339, I noticed that there is at least one place where Merlin-models uses a bare compute() statement to compute a Dask collection. I suspect that this is also done in several other places across the Merlin ecosystem.

When there is no global Dask client in the current Python context, using compute() will typically result in execution with Dask's "multi-threaded" scheduler. This may be fine for CPU-backed data, but it will result in many Python threads thrashing the same GPU (device 0) when the data is GPU-backed.

For compute operations in NVTabular (which only operate on Delayed Dask objects), the merlin.core.utils.global_dask_client utility is used to query the current Dask client. If this function returns None, the convention is to use the "synchronous" scheduler (compute(scheduler="synchronous")); otherwise the distributed client is used. I propose that this same convention be used everywhere in Merlin (besides special cases where scheduler="synchronous" can be hard-coded).

Note that these changes are also required for Merlin's Serial and Distributed context managers to work correctly.

Proposed Implementation

import dask
from dask.base import is_dask_collection
from dask.delayed import Delayed

from merlin.core.utils import global_dask_client


def compute_dask_object(dask_obj):
    """Compute a Dask collection using Merlin's dask-client settings."""

    # Check the global client
    dask_client = global_dask_client()

    if is_dask_collection(dask_obj) or isinstance(dask_obj, Delayed):
        # Compute a single Dask collection
        # (use the distributed client, or fall back to the "synchronous" scheduler)
        scheduler = dask_client.get if dask_client else "synchronous"
        return dask_obj.compute(scheduler=scheduler)
    elif isinstance(dask_obj, list):
        # Maybe check that all elements of the list are collections or Delayed?
        # Compute the entire list at once:
        if dask_client:
            return [r.result() for r in dask_client.compute(dask_obj)]
        else:
            return dask.compute(dask_obj, scheduler="synchronous")[0]
    else:
        raise ValueError(f"Cannot compute object of type {type(dask_obj)}")
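
A hedged usage sketch for the proposed utility (the parquet path is hypothetical):

import dask.dataframe as dd

ddf = dd.read_parquet("/data/interactions/")

# Computed with Merlin's client/scheduler convention rather than dask's default
n_rows = compute_dask_object(ddf.shape[0])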

[BUG] Merlin has AttributeError: 'ColumnSelector' object has no attribute 'all' for an example notebook

Bug description

/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:427: UserWarning: Loading workflow generated with nvtabular version 0+unknown - but we are running nvtabular 23.02.00. This might cause issues
  warnings.warn(
/usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:427: UserWarning: Loading workflow generated with cudf version 22.02.00a+309.gdad51a548e - but we are running cudf 22.08.00a+304.g6ca81bbc78.dirty. This might cause issues
  warnings.warn(
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[17], line 1
----> 1 export.export_ensemble(
      2     model_name=MODEL_NAME,
      3     workflow_path=local_workflow_path,
      4     saved_model_path=local_saved_model_path,
      5     output_path=local_ensemble_path,
      6     categorical_columns=categorical_columns,
      7     continuous_columns=continuous_columns,
      8     label_columns=label_columns,
      9     num_slots=NUM_SLOTS,
     10     max_nnz=MAX_NNZ,
     11     num_outputs=NUM_OUTPUTS,
     12     embedding_vector_size=EMBEDDING_VECTOR_SIZE,
     13     max_batch_size=MAX_BATCH_SIZE,
     14     model_repository_path=model_repository_path
     15 )

File /home/jupyter/mluser/git/aml-merlin-on-vertex-ai/example/../src/serving/export.py:107, in export_ensemble(model_name, workflow_path, saved_model_path, output_path, categorical_columns, continuous_columns, label_columns, num_slots, max_nnz, num_outputs, embedding_vector_size, max_batch_size, model_repository_path)
    104 hugectr_params['embedding_vector_size'] = embedding_vector_size
    105 hugectr_params['n_outputs'] = num_outputs
--> 107 export_hugectr_ensemble(
    108     workflow=workflow,
    109     hugectr_model_path=saved_model_path,
    110     hugectr_params=hugectr_params,
    111     name=model_name,
    112     output_path=output_path,
    113     label_columns=label_columns,
    114     cats=categorical_columns,
    115     conts=continuous_columns,
    116     max_batch_size=max_batch_size,
    117 )
    119 hugectr_backend_config = create_hugectr_backend_config(
    120     model_path=os.path.join(output_path, model_name, '1'),
    121     max_batch_size=max_batch_size,
    122     deployed_device_list=[0],
    123     model_repository_path=model_repository_path)
    125 with open(os.path.join(output_path, HUGECTR_CONFIG_FILENAME), 'w') as f:

File /usr/local/lib/python3.8/dist-packages/nvtabular/inference/triton/ensemble.py:245, in export_hugectr_ensemble(workflow, hugectr_model_path, hugectr_params, name, output_path, version, max_batch_size, nvtabular_backend, cats, conts, label_columns)
    242 if not cats and not conts:
    243     raise ValueError("Either cats or conts has to have a value.")
--> 245 workflow = workflow.remove_inputs(labels)
    247 # generate the nvtabular triton model
    248 preprocessing_path = os.path.join(output_path, name + "_nvt")

File /usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:160, in Workflow.remove_inputs(self, input_cols)
    140 def remove_inputs(self, input_cols) -> "Workflow":
    141     """Removes input columns from the workflow.
    142 
    143     This is useful for the case of inference where you might need to remove label columns
   (...)
    158     merlin.dag.Graph.remove_inputs
    159     """
--> 160     self.graph.remove_inputs(input_cols)
    161     return self

File /usr/local/lib/python3.8/dist-packages/merlin/dag/graph.py:173, in Graph.remove_inputs(self, to_remove)
    171 node, columns_to_remove = nodes_to_process.popleft()
    172 if node.input_schema and len(node.input_schema):
--> 173     output_columns_to_remove = node.remove_inputs(columns_to_remove)
    175     for child in node.children:
    176         nodes_to_process.append(
    177             (child, list(set(to_remove + output_columns_to_remove)))
    178         )

File /usr/local/lib/python3.8/dist-packages/merlin/dag/node.py:425, in Node.remove_inputs(self, input_cols)
    411 def remove_inputs(self, input_cols: List[str]) -> List[str]:
    412     """
    413     Remove input columns and all output columns that depend on them.
    414 
   (...)
    423         The output columns that were removed
    424     """
--> 425     removed_outputs = _derived_output_cols(input_cols, self.column_mapping)
    427     self.input_schema = self.input_schema.without(input_cols)
    428     self.output_schema = self.output_schema.without(removed_outputs)

File /usr/local/lib/python3.8/dist-packages/merlin/dag/node.py:484, in Node.column_mapping(self)
    482 @property
    483 def column_mapping(self):
--> 484     selector = self.selector or ColumnSelector(self.input_schema.column_names)
    485     return self.op.column_mapping(selector)

File /usr/local/lib/python3.8/dist-packages/merlin/dag/selector.py:151, in ColumnSelector.__bool__(self)
    150 def __bool__(self):
--> 151     return bool(self.all or self._names or self.subgroups or self.tags)

AttributeError: 'ColumnSelector' object has no attribute 'all'

Steps/Code to reproduce bug

  1. Run this https://github.com/GoogleCloudPlatform/nvidia-merlin-on-vertex-ai/blob/main/03-model-inference-hugectr.ipynb

  2. At the cell with

    export.export_ensemble(
    model_name=MODEL_NAME,
    workflow_path=local_workflow_path,
    saved_model_path=local_saved_model_path,
    output_path=local_ensemble_path,
    categorical_columns=categorical_columns,
    continuous_columns=continuous_columns,
    label_columns=label_columns,
    num_slots=NUM_SLOTS,
    max_nnz=MAX_NNZ,
    num_outputs=NUM_OUTPUTS,
    embedding_vector_size=EMBEDDING_VECTOR_SIZE,
    max_batch_size=MAX_BATCH_SIZE,
    model_repository_path=model_repository_path
    )

  3. Got the above error

Expected behavior

Environment details

  • Merlin version: 1.9.1
  • Platform: Ubuntu 20.04.5 LTS
  • Python version: Python 3.8.10 (default, Nov 14 2022, 12:59:47) [GCC 9.4.0] on linux
  • PyTorch version (GPU?): 2.0.0 (support GPU)

Additional context

Merlin image: nvcr.io/nvidia/merlin/merlin-pytorch:23.02

Distributor ID: Ubuntu
Description:    Ubuntu 20.04.5 LTS
Release:        20.04
Codename:       focal

merlin                                1.9.1
merlin-core                           23.2.1
merlin-dataloader                     0.0.3
merlin-models                         23.2.0
merlin-systems                        23.2.0

nvidia-cublas-cu11                    11.10.3.66
nvidia-cuda-cupti-cu11                11.7.101
nvidia-cuda-nvrtc-cu11                11.7.99
nvidia-cuda-runtime-cu11              11.7.99
nvidia-cudnn-cu11                     8.5.0.96
nvidia-cufft-cu11                     10.9.0.58
nvidia-curand-cu11                    10.2.10.91
nvidia-cusolver-cu11                  11.4.0.1
nvidia-cusparse-cu11                  11.7.4.91
nvidia-nccl-cu11                      2.14.3
nvidia-nvtx-cu11                      11.7.91
nvidia-pyindex                        1.0.9
nvtabular                             23.2.0
torch                                 2.0.0

[Bug] Using numba to detect gpu availability breaks Dask-CUDA worker pinning

While attempting to benchmark NVIDIA-Merlin/NVTabular#1687, I discovered that the dask-criteo benchmark does not work with the latest version of NVTabular/Merlin-core.

As far as I can tell, the problem is that #98 added the following logic to detect GPU availability: HAS_GPU = len(cuda.gpus.lst) > 0. This logic works just fine within a local process, but breaks Dask-CUDA device pinning when it is included in a top-level import (or is performed in the global context of the program). In other words, code like this shouldn't be executed by an import statement, like from merlin.core.compat import HAS_GPU.

The problem becomes apparent in a simple (Merlin-free) reproducer:

# reproducer.py
from dask_cuda import LocalCUDACluster
from numba import cuda # This is fine

HAS_GPU = len(cuda.gpus.lst) > 0  # This is not fine

if __name__ == "__main__":
    cluster = LocalCUDACluster()

If you execute python ./reproducer.py, you will see warnings like:

/.../distributed/distributed/comm/ucx.py:67: UserWarning: Worker with process ID 49507 should have a CUDA context assigned to device 1, but instead the CUDA context is on device 0. This is often the result of a CUDA-enabled library calling a CUDA runtime function before Dask-CUDA can spawn worker processes. Please make sure any such function calls don't happen at import time or in the global scope of a program.
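
A hedged sketch of the kind of lazy, call-time check that avoids touching CUDA at import time (an illustration of the idea, not necessarily the fix merlin-core adopted):

from functools import lru_cache


@lru_cache(maxsize=1)
def has_gpu() -> bool:
    # Import and query numba only when the answer is first needed, so merely
    # importing this module never initializes a CUDA context.
    from numba import cuda

    try:
        return len(cuda.gpus.lst) > 0
    except Exception:  # no driver, no devices, or CUDA otherwise unavailable
        return False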
