
torchrec's Introduction

TorchRec (Beta Release)

Docs

TorchRec is a PyTorch domain library built to provide common sparsity & parallelism primitives needed for large-scale recommender systems (RecSys). It allows authors to train models with large embedding tables sharded across many GPUs.

TorchRec contains:

  • Parallelism primitives that enable easy authoring of large, performant multi-device/multi-node models using hybrid data-parallelism/model-parallelism.
  • The TorchRec sharder can shard embedding tables with a variety of strategies: data-parallel, table-wise, row-wise, table-wise-row-wise, column-wise, and table-wise-column-wise sharding.
  • The TorchRec planner can automatically generate optimized sharding plans for models.
  • Pipelined training overlaps dataloading device transfer (copy to GPU), inter-device communications (input_dist), and computation (forward, backward) for increased performance.
  • Optimized kernels for RecSys powered by FBGEMM.
  • Quantization support for reduced precision training and inference.
  • Common modules for RecSys.
  • Production-proven model architectures for RecSys.
  • RecSys datasets (Criteo click logs and MovieLens).
  • Examples of end-to-end training, such as the DLRM event prediction model trained on the Criteo click logs dataset.
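
To make the pieces above concrete, here is a minimal sketch of the two core building blocks: an EmbeddingBagCollection queried with a KeyedJaggedTensor. Table names and sizes are illustrative only.

import torch
import torchrec

# One embedding table, looked up by the "product" feature (sizes are illustrative).
ebc = torchrec.EmbeddingBagCollection(
    device=torch.device("cpu"),
    tables=[
        torchrec.EmbeddingBagConfig(
            name="product_table",
            embedding_dim=16,
            num_embeddings=4096,
            feature_names=["product"],
        ),
    ],
)

# Two "bags" of ids for the "product" feature: [101, 202] and [303].
features = torchrec.KeyedJaggedTensor(
    keys=["product"],
    values=torch.tensor([101, 202, 303]),
    lengths=torch.tensor([2, 1]),
)

pooled = ebc(features)          # KeyedTensor of pooled embeddings
print(pooled["product"].shape)  # torch.Size([2, 16])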

Installation

TorchRec requires Python >= 3.8 and CUDA >= 11.8 (CUDA is highly recommended for performance but not required). The example below shows how to install with Python 3.8 and CUDA 12.1. This setup assumes you have conda installed.

Binaries

Experimental binaries for Linux are available for Python 3.8, 3.9, 3.10, 3.11, and 3.12 (experimental), built for CPU, CUDA 11.8, and CUDA 12.1. They can be installed via pip wheels from download.pytorch.org and PyPI (PyPI carries CUDA 12.1 only).

Below we show installations for CUDA 12.1 as an example. For CPU or CUDA 11.8, swap "cu121" for "cpu" or "cu118".

Installations

Nightly

pip install torch --index-url https://download.pytorch.org/whl/nightly/cu121
pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/nightly/cu121
pip install torchmetrics==1.0.3
pip install torchrec --index-url https://download.pytorch.org/whl/nightly/cu121

Stable via pytorch.org

pip install torch --index-url https://download.pytorch.org/whl/cu121
pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/cu121
pip install torchmetrics==1.0.3
pip install torchrec --index-url https://download.pytorch.org/whl/cu121

Stable via PyPI (only for CUDA 12.1)

pip install torch
pip install fbgemm-gpu
pip install torchrec
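
Whichever variant you install, a quick sanity check is to import the three packages in one interpreter session (a minimal check only; the test_installation.py step in the from-source instructions below exercises distributed training as well):

import torch
import fbgemm_gpu  # noqa: F401 -- loads the shared library that registers the fbgemm ops
import torchrec    # noqa: F401

print("torch", torch.__version__, "| CUDA available:", torch.cuda.is_available())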

Colab example: introduction + install

See our Colab notebook for an introduction to TorchRec, which includes a runnable installation. - Tutorial Source - Open in Google Colab

From Source

We are currently iterating on the setup experience. For now, we provide manual instructions on how to build from source. The example below shows how to install with CUDA 12.1. This setup assumes you have conda installed.

  1. Install pytorch. See pytorch documentation.

    CUDA 12.1
    
    pip install torch --index-url https://download.pytorch.org/whl/nightly/cu121
    
    CUDA 11.8
    
    pip install torch --index-url https://download.pytorch.org/whl/nightly/cu118
    
    CPU
    
    pip install torch --index-url https://download.pytorch.org/whl/nightly/cpu
    
  2. Clone TorchRec.

    git clone --recursive https://github.com/pytorch/torchrec
    cd torchrec
    
  3. Install FBGEMM.

    CUDA 12.1
    
    pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/nightly/cu121
    
    CUDA 11.8
    
    pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/nightly/cu118
    
    CPU
    
    pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/nightly/cpu
    
  4. Install other requirements.

    pip install -r requirements.txt
    
  5. Install TorchRec.

    python setup.py install develop
    
  6. Test the installation (use torchx-nightly for 3.11; for 3.12, torchx currently doesn't work).

    GPU mode
    
    torchx run -s local_cwd dist.ddp -j 1x2 --gpu 2 --script test_installation.py
    
    CPU Mode
    
    torchx run -s local_cwd dist.ddp -j 1x2 --script test_installation.py -- --cpu_only
    

    See TorchX for more information on launching distributed and remote jobs.

  7. If you want to run a more complex example, please take a look at the torchrec DLRM example.

Contributing

Pyre and linting

Before landing, please make sure that pyre and linting look okay. To run our linters, install pre-commit:

pip install pre-commit

and run it on your changes.

For Pyre, you will need to

cat .pyre_configuration
pip install pyre-check-nightly==<VERSION FROM CONFIG>
pyre check

We will also check for these issues in our GitHub actions.

License

TorchRec is BSD licensed, as found in the LICENSE file.

torchrec's People

Contributors

842974287, colin2328, dstaay-fb, ezyang, facebook-github-bot, fegin, ge0405, gnahzg, henrylhtsang, iamzainhuda, jianyuh, jiayisuse, joshuadeng, lequytra, levythu, osalpekar, paulzhang12, renfeichen-fb, rkindi, s4ayub, samiwilf, sarckk, troygarden, wanchaol, xing-liu, xw285cornell, yazhigao, ylgh, zhuzilin, zyan0


torchrec's Issues

Question about EmbeddingBoundsCheck

Hi! I encountered logs like the following when modifying the two tower retrieval example to fit our use case:

EmbeddingBoundsCheck: (at least one) Out of bounds access for batch: 26, table: 6, bag element: 0, idx: 10, num_rows: 10, indices_start: 1466, indices_end: 1467, T: 12, B: 64, b_t: 410. Setting idx to zero.

This log appears when running TrainPipelineSparseDist.progress() but doesn't show up for all iterations. It's also always about table 6. Currently I'm only running in a single-GPU, single-host environment.

Some other sample logs:

EmbeddingBoundsCheck: (at least one) Out of bounds access for batch: 54, table: 6, bag element: 0, idx: 10, num_rows: 10, indices_start: 1413, indices_end: 1414, T: 12, B: 64, b_t: 438. Setting idx to zero.

EmbeddingBoundsCheck: (at least one) Out of bounds access for batch: 44, table: 6, bag element: 0, idx: 10, num_rows: 10, indices_start: 1364, indices_end: 1365, T: 12, B: 64, b_t: 428. Setting idx to zero.

Could the team help shed some light on what this log means? I tried modifying an EmbeddingBagConfig's num_embeddings to be significantly smaller than the max possible index in the KeyedJaggedTensor, and it throws an error right away. I also couldn't find where this log line comes from in the torchrec source code.
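
One way to spot-check for this locally is to compare the maximum id per feature in the input KeyedJaggedTensor against each table's num_embeddings; a minimal sketch (feature names and table sizes below are illustrative):

import torch
import torchrec

# num_embeddings per feature, taken from the EmbeddingBagConfigs (illustrative values).
table_sizes = {"product": 4096, "user": 10}

kjt = torchrec.KeyedJaggedTensor(
    keys=["product", "user"],
    values=torch.tensor([101, 202, 7, 12]),
    lengths=torch.tensor([1, 1, 1, 1]),
)

for key, jt in kjt.to_dict().items():
    max_id = int(jt.values().max())
    if max_id >= table_sizes[key]:
        print(f"{key}: max id {max_id} >= num_embeddings {table_sizes[key]}")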

Many thanks in advance! Great job on writing up those examples by the way :)

Torchscript tracing / scripting best practices

Hi there! I'm trying to deploy a torchrec model with torchscript and I wonder what's the best practice for it.

From my experience, when tracing the model, we cannot get a scripted model that supports dynamic batch size. The reason is that lengths_per_key is of type List[int], which will be regarded as a constant during tracing. Therefore, a workaround I made is to pass a torch.Tensor as length_per_key and change this:

values_list = torch.split(values, length_per_key)

into:

        # Note here `length_per_key` is a Tensor
        offset_per_key = torch.cumsum(length_per_key, dim=0)
        values_list = torch.tensor_split(values, offset_per_key)

I wonder whether it's a good idea to support Tensor type length_per_key and if there is a better way to support dynamic batch size.
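
For reference, a small self-contained illustration of the Tensor-based split (note that torch.split takes chunk lengths while torch.tensor_split takes boundary indices, so the final cumulative sum can be dropped):

import torch

values = torch.arange(10)
length_per_key = torch.tensor([3, 4, 3])  # lengths carried as a Tensor instead of List[int]

# Convert lengths to split boundaries; the last entry (the total length) is not needed.
offset_per_key = torch.cumsum(length_per_key, dim=0)
values_list = torch.tensor_split(values, offset_per_key[:-1])

print([v.tolist() for v in values_list])  # [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]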

Thank you for your time on this issue :)

Quantized Communications

We want quantized comms to be a first class feature in TorchRec. Additionally, the current (internal) strategy is incompatible with an internal library feature that we need (lazily creating output_dist).

This new API design will also let us open source this functionality.

The new design will directly give the sharder the qcomms_config, which will be plumbed down to the comm module level (PooledAllToAll, etc.).

sharders = [
    EmbeddingBagCollectionSharder(
        qcomms_config=QCommsConfig(
            forward_precision=CommType.FP16,
            backward_precision=CommType.BF16,
        ),
    ),
]

sharded_model = DistributedModelParallel(model, sharders)

This feature will be strictly opt-in for now, and is fully backward compatible with the auto_quantize wrapper.

We've seen initial success from this strategy, and have a working prototype. Traces show the correct encoder/decoder steps and a decreased comms time. Simultaneously, we're also running production models using this new API to ensure no regressions.

Release plans
We plan to support only FP16/BF16 as quantization schemes for the first OSS release, and will work on expanding the settings.
Initially we will mark this as a prototype feature, with the caveat that quantized comms is very model-dependent, and OSS users may not see throughput gains.

TorchRec Sharding Composability

Desired behavior

We want to make TorchRec sharding composable with other sharding/parallelism techniques. In practice, this means that after applying TorchRec sharding, model characteristics remain the same (e.g. state_dict() doesn't change) and we don't affect the non-sharded parts of a model, e.g.

m = Model(device="meta")
orig_keys = list(m.state_dict().keys())
m.ebc = torchrec.shard_embedding_bag_collection(m.ebc, ....)
sharded_keys = list(m.ebc.state_dict().keys())

assert orig_keys == sharded_keys # at all ranks, even for table-wise sharded tables

Observation for nn.Module

  • state_dict() - recursive call via state_dict()
  • load_state_dict() : recursive call via local load() call, which calls _load_from_state_dict
  • named_parameters()/named_buffers() : calls named_modules
  • named_modules() : recursive call

As we don't want to modify the state_dict/named_parameters etc. impls of non-sharded modules (including the top-level module), we have to ensure that _modules is consistent

Motivations

  • The current implementation (of named_parameters()/named_buffers()/named_modules()) is consistent with nn.Module defaults
    • This logic is tricky to get right
  • Removes the need for custom code (e.g. tricks in DMP and ShardedModules to keep module FQNs consistent)
  • Currently state_dict does not contain the state of all tensors
    • if a table is sharded table-wise, then only that rank's state_dict sees it
  • Easier integration with other distributed solutions (e.g. FSDP for DHEN)
    • We need to share similar semantics for what is returned by the nn.Module APIs, and this will make things consistent
      • We can also do this within DMP nn.Module overrides, but that route is much heavier and bug-prone

Proposed APIs

Use the trick employed by torch.fx.GraphModule of registering empty nn.Modules for handling state, and have the implementation of forward() call into a different module hierarchy.
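
As a self-contained illustration of that trick (generic nn.Module code, not TorchRec's actual implementation): state lives in a registered nn.ModuleDict, while forward() routes through a plain dict that state_dict() and named_modules() never see.

import torch
import torch.nn as nn

class StateVsForward(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        # Registered: shows up in state_dict() / named_parameters().
        self.embedding_bags = nn.ModuleDict({"t1": nn.EmbeddingBag(10, 4)})
        # Not registered: a plain dict, used only to drive forward().
        self._lookups = {"t1": self.embedding_bags["t1"]}

    def forward(self, ids: torch.Tensor, offsets: torch.Tensor) -> torch.Tensor:
        return self._lookups["t1"](ids, offsets)

m = StateVsForward()
print(list(m.state_dict().keys()))  # ['embedding_bags.t1.weight']
print(m(torch.tensor([1, 2, 3]), torch.tensor([0, 2])).shape)  # torch.Size([2, 4])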

new APIs

def shard_embedding_modules(model, sharders, plan: EmbeddingShardingPlan):
   # This function will replace all child ebc/fused_ebc modules with sharded
   # variants based on the plan and init meta tensors.

   # It returns the unsharded modules and their replacements;
   # this is useful for identifying which modules are already sharded.
   return sharded_parameter_names

def dlrm_parallelize(model,
   sharders=[EmbeddingBagCollectionSharder, FusedEmbeddingBagCollectionSharder, ...],
   embedding_plan: Optional[EmbeddingShardingPlan] = None):

   # This style of sharding will shard embedding modules with the torchrec sharders
   # and everything else in a distributed data parallel fashion.

   if embedding_plan is None:
      embedding_plan = EmbeddingPlanner(sharders, topology, ...)
      
   sharded_embedding_modules = shard_embedding_modules(model, plan)
   
   # DDP can be done in two ways
   # with DDP wrapper:
   # This will rely on DDP being composable
   
   model = DistributedDataParallel(model, 
               params_and_buffers_to_ignore=[get_params_and_buffers(sharded_embedding_modules)]
           )      
   modules = model.modules()
   
   # or replace underlying module tensors with DataParallelTensor
   for name, param in model.named_parameters():
       if param not in get_params_and_buffers(sharded_embedding_modules):
          model.register_parameter(name, DistributedParallelTensor(param))

   
# similar wrapper can be done with FSDP, or just do it in model definition

class ShardedEmbeddingBagCollection(nn.Module):
    def __init__(self):
        # modules to handle state
        self.embedding_bags: nn.ModuleDict = nn.ModuleDict()
        # these are the sharded modules; a plain Dict is not registered as sub-modules
        self._lookups: Dict[str, nn.Module] = {}
        # Add parameters/buffers from self._lookups to self.embedding_bags

    def forward(self, kjt):
        # unchanged impl

Model Authoring

embedding_bag_configs = [
   EmbeddingBagConfig(
       name="t1", embedding_dim=4, num_embeddings=10, feature_names=["f1"]
   ),
   EmbeddingBagConfig(
       name="t2", embedding_dim=4, num_embeddings=10, feature_names=["f2"]
   ),
   EmbeddingBagConfig(
       name="t3", embedding_dim=4, num_embeddings=10, feature_names=["f3"]
   )
]
model = DLRMModel(ebc=EmbeddingBagCollection(embedding_bag_configs))
model = fuse_embedding_optimizer(model, ...) # recursively replaces model.ebc with FusedEBC

# using wrappers to parallelize
dlrm_parallelize(model)

# or explicitly do it
shard_embedding_modules(model,
   plan={"embedding_bag_configs": {
       "t1": ParameterSharding(sharding_type=ROW_WISE, placement=DEVICE, ranks=...),
       "t2": ParameterSharding(sharding_type=ROW_WISE, placement=UVM_CACHING, ranks=...),
   }})
   

print(model.named_parameters())
>>> [overarch..., linear,..., embedding_bag_configs.t1.weights: ShardedTensor]

opt = torch.optim.SGD(model.parameters(), lr=.02)

# if we do not use FusedEBC, then we expect that these ShardedTensors have 
# a grad field (or maybe these are ShardedParameters), and optimizers will
# naturally work on top of them
# If we have FusedEBC, their grads will be None, and opt.step will be a no-op

print(model.state_dict()["embedding_bag_configs.t1.weight"])
>>> ShardedTensor(rank0: (tensor, size, offset), rank1: (tensor, size, offset))

# If t1 is placed on rank 1, rank 0's state dict will still see the ShardedTensor
# but its local shards will be empty.

An error occurred while building with CMake.

Failed to build torchrec from source ...

My ENV:

➜  ~ lsb_release -a
No LSB modules are available.
Distributor ID:	Ubuntu
Description:	Ubuntu 20.04.4 LTS
Release:	20.04
Codename:	focal
➜  ~ uname -r
5.13.0-30-generic
➜  ~ python --version
Python 3.8.10
➜  ~ gcc --version
gcc (GCC) 10.3.1 20210627
Copyright (C) 2020 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

➜  ~ pip --version
pip 22.0.3 from ~/.local/lib/python3.8/site-packages/pip (python 3.8)
➜  ~ 
➜  torchrec git:(main) ✗ python setup.py build
['setup.py', 'build']
args:  Namespace(TORCH_CUDA_ARCH_LIST='7.5', cpu_only=False, package_name='torchrec', skip_fbgemm=False)
unknown:  ['build']
name:  torchrec
-- torchrec building version: 0.1.0
Installing fbgemm_gpu
TORCH_CUDA_ARCH_LIST:  7.5
cpu_only: False
-------------------------------------------------------------
The project is built using scikit-build
-------------------------------------------------------------
  File "~/.local/lib/python3.8/site-packages/skbuild/setuptools_wrap.py", line 595, in setup
    cmkr.make(make_args, env=env)
  File "~/.local/lib/python3.8/site-packages/skbuild/cmaker.py", line 609, in make
    raise SKBuildError(
An error occurred while building with CMake.
  Command:
    cmake --build . --target install --config Release --
  Source directory:
    ....../torchrec/third_party/fbgemm/fbgemm_gpu
  Working directory:
    ....../torchrec/third_party/fbgemm/fbgemm_gpu/_skbuild/linux-x86_64-3.8/cmake-build
Please see CMake's output for more information.
Traceback (most recent call last):
  File "setup.py", line 166, in <module>
    main(sys.argv[1:])
  File "setup.py", line 110, in main
    out = check_output(
  File "/usr/lib/python3.8/subprocess.py", line 415, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/usr/bin/python', 'setup.py', 'build', '-DTORCH_CUDA_ARCH_LIST=7.5']' returned non-zero exit status 1.

How to process the Criteo Kaggle Ad/ Criteo Terabyte dataset in order to run the example?

I am trying to run the example with the Criteo Kaggle Ad / Criteo Terabyte dataset. It seems that processed binary files are needed if I want to use the InMemoryBinaryCriteoIterDataPipe dataloader.

I am wondering how to process the .txt files of Criteo Kaggle Ad / Criteo Terabyte in order to run the example. Or could you provide the code for data processing?
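
For reference, a hedged sketch of one way to turn a day of the raw TSV into dense/sparse/label numpy arrays (the raw Criteo format is 1 label, 13 integer features, and 26 hex categorical features per line). The output file names and dtypes expected by InMemoryBinaryCriteoIterDataPipe should be confirmed against its docstring, so treat the names and dtypes below as placeholders:

import numpy as np

def tsv_day_to_npys(in_path: str, out_prefix: str) -> None:
    """Parse one day of the raw Criteo TSV into three .npy arrays."""
    labels, dense, sparse = [], [], []
    with open(in_path) as f:
        for line in f:
            cols = line.rstrip("\n").split("\t")
            labels.append(int(cols[0]))
            # 13 integer features; empty fields become 0 here (a simplification).
            dense.append([int(c) if c else 0 for c in cols[1:14]])
            # 26 hex categorical features; empty fields become 0 here.
            sparse.append([int(c, 16) if c else 0 for c in cols[14:40]])
    np.save(f"{out_prefix}_labels.npy", np.array(labels, dtype=np.int32).reshape(-1, 1))
    np.save(f"{out_prefix}_dense.npy", np.array(dense, dtype=np.float32))
    # int64 avoids overflowing on 32-bit hex ids; downcast if the dataloader expects int32.
    np.save(f"{out_prefix}_sparse.npy", np.array(sparse, dtype=np.int64))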

Loading pretrained Embeddings into an EmbeddingBagCollection

Hello! I am curious if there are best practices for loading in embeddings that need to be sharded across multiple GPUs?

As an example

  • I have a large corpus of user embeddings published by another system
  • I'd like these to be loaded once and sharded across GPUs for training

Thank you for any guidance!
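
A minimal sketch of one approach, assuming the embeddings fit on the host: copy the pretrained matrix into the unsharded EmbeddingBagCollection's state dict before wrapping the model with DistributedModelParallel (table/feature names are illustrative; the exact key can be confirmed by printing ebc.state_dict().keys()). For an already-sharded model, each rank would instead write into the local shards of the corresponding ShardedTensor.

import torch
import torchrec

# Stand-in for a [num_embeddings, embedding_dim] matrix published by another system.
pretrained = torch.randn(4096, 64)

ebc = torchrec.EmbeddingBagCollection(
    device=torch.device("cpu"),
    tables=[
        torchrec.EmbeddingBagConfig(
            name="user_table",
            embedding_dim=64,
            num_embeddings=4096,
            feature_names=["user"],
        ),
    ],
)

# Copy in place; state_dict() tensors share storage with the module parameters.
ebc.state_dict()["embedding_bags.user_table.weight"].copy_(pretrained)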

Import fails (fbgemm_gpu_py.so is required)

Hardware

  • Macbook with M1Max

Steps to reproduce

  • Install Conda (4.12.0)
  • Create and activate conda environment with Python 3.9
  • Install CPU version of the library
    • conda install pytorch cpuonly -c pytorch-nightly
    • pip install torchrec-nightly-cpu
  • Run import torchrec in a Python console

Error

Python 3.9.12 (main, Apr  5 2022, 01:53:17) 
[Clang 12.0.0 ] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import torch
>>> import torchrec
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/__init__.py", line 8, in <module>
    import torchrec.distributed  # noqa
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/__init__.py", line 39, in <module>
    from torchrec.distributed.model_parallel import DistributedModelParallel  # noqa
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 19, in <module>
    from torchrec.distributed.embeddingbag import (
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 27, in <module>
    from torchrec.distributed.embedding_sharding import (
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/embedding_sharding.py", line 14, in <module>
    from torchrec.distributed.dist_data import (
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 15, in <module>
    from torchrec.distributed.comm_ops import (
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torchrec/distributed/comm_ops.py", line 27, in <module>
    import fbgemm_gpu  # @manual # noqa
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/fbgemm_gpu/__init__.py", line 12, in <module>
    torch.ops.load_library(os.path.join(os.path.dirname(__file__), "fbgemm_gpu_py.so"))
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/torch/_ops.py", line 244, in load_library
    ctypes.CDLL(path)
  File "/opt/miniconda3/envs/torchrec_test/lib/python3.9/ctypes/__init__.py", line 382, in __init__
    self._handle = _dlopen(self._name, mode)
OSError: dlopen(/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/fbgemm_gpu/fbgemm_gpu_py.so, 0x0006): tried: '/opt/miniconda3/envs/torchrec_test/lib/python3.9/site-packages/fbgemm_gpu/fbgemm_gpu_py.so' (not a mach-o file)

fbgemm_gpu_py.so not found

Hi team, I'm trying to use torchrec-nightly with torch 1.12 and CUDA 11.2. But when I import torchrec, I get the following:

>>> import torchrec
File fbgemm_gpu_py.so not found

A similar issue was reported on the DLRM issue tracker facebookresearch/dlrm#256. Any ideas?

RuntimeError: table-wise sharding on a single EmbeddingBag is not supported yet

I saw this error message when I was trying the EmbeddingBagSharder. I read the code and found that table-wise sharding is not supported in ShardedEmbeddingBag. I believe this is a bug because, although table-wise sharding is not supported, the planner still generates a plan that uses table-wise sharding. The cause might be that EmbeddingBagSharder inherits from BaseEmbeddingSharder without overriding the sharding_types() function, and table-wise sharding is supported in BaseEmbeddingSharder.sharding_types(). A possible fix is to override the EmbeddingBagSharder.sharding_types() function and exclude table-wise sharding. This is my first time submitting an issue to torchrec; I would appreciate any help confirming this case.

You can find the colab here for the code to reproduce this error.

OSError: undefined symbol: _ZN2at6detail20computeStorageNbytesEN3c108ArrayRefIlEES3_mm

Hi,

I followed the setup instructions all the way through for the torchrec environment, but came across an OSError related to fbgemm:

Traceback (most recent call last):
  File "test_installation.py", line 16, in <module>
    from torchrec import EmbeddingBagCollection, KeyedJaggedTensor
  File "/home/shenghao/torchrec/torchrec/__init__.py", line 8, in <module>
    import torchrec.distributed  # noqa
  File "/home/shenghao/torchrec/torchrec/distributed/__init__.py", line 36, in <module>
    from torchrec.distributed.model_parallel import DistributedModelParallel  # noqa
  File "/home/shenghao/torchrec/torchrec/distributed/model_parallel.py", line 20, in <module>
    from torchrec.distributed.embedding import EmbeddingCollectionSharder
  File "/home/shenghao/torchrec/torchrec/distributed/embedding.py", line 17, in <module>
    from torchrec.distributed.embedding_sharding import (
  File "/home/shenghao/torchrec/torchrec/distributed/embedding_sharding.py", line 15, in <module>
    from torchrec.distributed.dist_data import (
  File "/home/shenghao/torchrec/torchrec/distributed/dist_data.py", line 15, in <module>
    from torchrec.distributed.comm_ops import (
  File "/home/shenghao/torchrec/torchrec/distributed/comm_ops.py", line 27, in <module>
    import fbgemm_gpu  # @manual  # noqa
  File "/home/shenghao/anaconda3/envs/torchrec/lib/python3.8/site-packages/fbgemm_gpu/__init__.py", line 12, in <module>
    torch.ops.load_library(os.path.join(os.path.dirname(__file__), "fbgemm_gpu_py.so"))
  File "/home/shenghao/anaconda3/envs/torchrec/lib/python3.8/site-packages/torch/_ops.py", line 220, in load_library
    ctypes.CDLL(path)
  File "/home/shenghao/anaconda3/envs/torchrec/lib/python3.8/ctypes/__init__.py", line 373, in __init__
    self._handle = _dlopen(self._name, mode)
OSError: /home/shenghao/anaconda3/envs/torchrec/lib/python3.8/site-packages/fbgemm_gpu/fbgemm_gpu_py.so: undefined symbol: _ZN2at6detail20computeStorageNbytesEN3c108ArrayRefIlEES3_mm

That happened when I tried to run test_installation.py.
In addition, my CUDA version is 11.6, so I changed all CUDA path-related settings from 11.3 to 11.6.

NCCL runtime error using DistributedModelParallel

Hello,

I'm trying to train a TorchRec model in a single node with two Nvidia V100 GPUs.

(base) jupyter@ctr-model-gpu:~$ nvidia-smi
Wed May 11 23:22:16 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.57.02    Driver Version: 470.57.02    CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  Tesla V100-SXM2...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   37C    P0    48W / 300W |      0MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  Off  | 00000000:00:05.0 Off |                    0 |
| N/A   37C    P0    49W / 300W |      0MiB / 16160MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+
(base) jupyter@ctr-model-gpu:~$ python -c "import torch; print(torch.version.cuda)"
11.3

My TorchRec version:

(base) jupyter@ctr-model-gpu:~/hft/ctr-model$ pip freeze | grep torchrec
torchrec==0.1.0

Below is the script I'm trying to run. I've replaced the model with a very simple EBC to rule out issues with my model's architecture.

import json
import os

import torch
import torchrec
import torch.distributed as dist
import torch.multiprocessing as mp
from sklearn.metrics import roc_auc_score
from torch.utils.data import DataLoader
from torchrec.distributed import DistributedModelParallel as DMP
from tqdm import tqdm

import datasets
import models
from config_parser import ConfigParser


def main(rank: int, config: ConfigParser) -> None:
    # initialize process group
    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=2,
        rank=rank,
    )
    # model
    model = torchrec.EmbeddingBagCollection(
        device=torch.device("meta"),
        tables=[
            torchrec.EmbeddingBagConfig(
                name="product_table",
                embedding_dim=64,
                num_embeddings=4096,
                feature_names=["product"],
                pooling=torchrec.PoolingType.SUM,
            )
        ]
    )
    dmp_model = DMP(
        # module=config.init_object("model", models).to(torch.device("meta")),
        module=model,
        device=torch.device(f"cuda:{rank}")
    )

if __name__ == "__main__":
    # instantiate config parser
    config = ConfigParser()
    
    # distributed config
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    mp.spawn(main, nprocs=2, args=(config,))

When trying to run this code, I get the following error. I used export NCCL_DEBUG=INFO to get NCCL's logs.

Traceback (most recent call last):
  File "main_dist.py", line 165, in <module>
    mp.spawn(main, nprocs=2, args=(config,))
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/jupyter/hft/ctr-model/main_dist.py", line 29, in main
    module=model,
UnboundLocalError: local variable 'model' referenced before assignment

(base) jupyter@ctr-model-gpu:~/hft/ctr-model$ python main_dist.py 
Log directory 'runs/test-torchrec-dist' already exists. Overwrite? [y / n] y
ctr-model-gpu:30732:30732 [0] NCCL INFO Bootstrap : Using ens6:10.138.0.12<0>
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/Plugin: Failed to find ncclCollNetPlugin_v4 symbol.
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Tx CPU start: -2
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Rx CPU start: -2
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Flow placement enabled.
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : queue skip: 0
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Using [0]ens6:10.138.0.12<0>
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket plugin initialized
ctr-model-gpu:30732:30732 [0] NCCL INFO Using network FastSocket
NCCL version 2.10.3+cuda11.3
ctr-model-gpu:30733:30733 [0] NCCL INFO Bootstrap : Using ens6:10.138.0.12<0>
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/Plugin: Failed to find ncclCollNetPlugin_v4 symbol.
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Tx CPU start: -2
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Rx CPU start: -2
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Flow placement enabled.
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : queue skip: 0
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Using [0]ens6:10.138.0.12<0>
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket plugin initialized
ctr-model-gpu:30733:30733 [0] NCCL INFO Using network FastSocket


ctr-model-gpu:30733:30847 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device 40
ctr-model-gpu:30732:30846 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 40
ctr-model-gpu:30733:30847 [0] NCCL INFO init.cc:904 -> 5
ctr-model-gpu:30732:30846 [0] NCCL INFO init.cc:904 -> 5
ctr-model-gpu:30733:30847 [0] NCCL INFO group.cc:72 -> 5 [Async thread]
ctr-model-gpu:30732:30846 [0] NCCL INFO group.cc:72 -> 5 [Async thread]
Traceback (most recent call last):
  File "main_dist.py", line 166, in <module>
    mp.spawn(main, nprocs=2, args=(config,))
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/jupyter/hft/ctr-model/main_dist.py", line 42, in main
    device=torch.device(f"cuda:{rank}")
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/model_parallel.py", line 187, in __init__
    plan = planner.collective_plan(module, sharders, pg)
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/planner/planners.py", line 188, in collective_plan
    sharders,
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/collective_utils.py", line 60, in invoke_on_rank_and_broadcast_result
    dist.broadcast_object_list(object_list, rank, group=pg)
  File "/opt/conda/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 1869, in broadcast_object_list
    broadcast(object_sizes_tensor, src=src, group=group)
  File "/opt/conda/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 1187, in broadcast
    work = default_pg.broadcast([tensor], opts)
RuntimeError: NCCL error in: /opt/conda/conda-bld/pytorch_1646755953518/work/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:1169, invalid usage, NCCL version 21.0.3
ncclInvalidUsage: This usually reflects invalid usage of NCCL library (such as too many async ops, too many collectives at once, mixing streams in a group, etc).

Reading the NCCL logs, I noticed these two lines:

ctr-model-gpu:30733:30847 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device 40
ctr-model-gpu:30732:30846 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 40

Could this be the cause of the issue? If so, how do I solve it? I ran another regular PyTorch script with DistributedDataParallel and didn't have any issues with NCCL; the script ran fine.
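
Those warnings usually mean that both spawned ranks defaulted to the same CUDA device. A minimal sketch of the usual fix, adapted from the script above, is to pin each process to its own GPU before the process group is created:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def main(rank: int, world_size: int = 2) -> None:
    # Pin this process to its own GPU *before* any NCCL collective runs.
    torch.cuda.set_device(rank)
    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=world_size,
        rank=rank,
    )
    # ... build the EmbeddingBagCollection / DistributedModelParallel here,
    # passing device=torch.device(f"cuda:{rank}") as in the original script.


if __name__ == "__main__":
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    mp.spawn(main, nprocs=2)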

Retrieval task and bruteForce prediction

Hello,

I am wondering how to migrate my model from tensorflow_recommenders to PyTorch TorchRec, but I couldn't find the equivalent of the Retrieval tasks and BruteForce predictions in the documentation.

Are there functions provided for that, or should one code them from scratch?

If it already exists and I didn't see it, sorry for that.

Thank you for your help :)
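
For reference, a minimal from-scratch sketch of brute-force top-k retrieval in plain PyTorch (the function name is ours, not a TorchRec API):

import torch

def brute_force_topk(query_emb: torch.Tensor, candidate_emb: torch.Tensor, k: int = 10):
    """Score every candidate against every query by dot product and return the top-k ids."""
    scores = query_emb @ candidate_emb.t()             # [num_queries, num_candidates]
    top_scores, top_ids = torch.topk(scores, k, dim=1)
    return top_scores, top_ids

queries = torch.randn(4, 64)        # e.g. user embeddings from the query tower
candidates = torch.randn(1000, 64)  # e.g. item embeddings from the candidate tower
scores, ids = brute_force_topk(queries, candidates)
print(ids.shape)  # torch.Size([4, 10])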

Multi-GPUs: CUDA error: initialization error

Hi, thanks for your awesome work! The lib is amazing and I am really into this project.
However, I ran into some problems when trying to run the example that trains the DLRM model.
Basically, I prepared the conda env as in the README and that went well.

conda create -n torchrec python=3.8
conda activate torchrec
conda install pytorch cudatoolkit=11.3 -c pytorch-nightly
pip install torchrec-nightly
git clone https://github.com/pytorch/torchrec.git
cd torchrec/examples/dlrm

Then I used the command line you provide to run DLRM.

torchx run -s local_cwd dist.ddp -j 1x2 --script dlrm_main.py

Then errors occurred as follows:

dlrm_main/0 [1]:frame #58: <unknown function> + 0x17cd15 (0x55ad313f2d15 in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [1]:frame #59: _PyEval_EvalFrameDefault + 0x4c0 (0x55ad3144ce60 in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [1]:frame #60: _PyEval_EvalCodeWithName + 0x260 (0x55ad31442600 in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [0]:
dlrm_main/0 [0]:Evaluating test set: 0it [00:00, ?it/s][1]:frame #61: _PyFunction_Vectorcall + 0x534 (0x55ad31443b64 in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [1]:frame #62: <unknown function> + 0x166b2e (0x55ad313dcb2e in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [1]:frame #63: _PyEval_EvalFrameDefault + 0x4f83 (0x55ad31451923 in /home/jingqi/anaconda3/envs/rec/bin/python)
dlrm_main/0 [1]:
dlrm_main/0 [1]:terminate called after throwing an instance of 'c10::CUDAError'
dlrm_main/0 [1]:  what():  CUDA error: initialization error
dlrm_main/0 [1]:CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
dlrm_main/0 [1]:For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
dlrm_main/0 [1]:Exception raised from insert_events at /opt/conda/conda-bld/pytorch_1649229055957/work/c10/cuda/CUDACachingAllocator.cpp:1420 (most recent call first):
dlrm_main/0 [1]:frame #0: c10::Error::Error(c10::SourceLocation, std::string) + 0x4d (0x7f7c9179734d in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libc10.so)
dlrm_main/0 [1]:frame #1: <unknown function> + 0x1e50b (0x7f7cbef7b50b in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libc10_cuda.so)
dlrm_main/0 [1]:frame #2: c10::cuda::CUDACachingAllocator::raw_delete(void*) + 0x23a (0x7f7cbef8049a in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libc10_cuda.so)
dlrm_main/0 [1]:frame #3: <unknown function> + 0x2f82e8 (0x7f7ccffa62e8 in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libtorch_python.so)
dlrm_main/0 [1]:frame #4: c10::TensorImpl::release_resources() + 0x175 (0x7f7c9177da15 in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libc10.so)
dlrm_main/0 [1]:frame #5: <unknown function> + 0x1e53e9 (0x7f7ccfe933e9 in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libtorch_python.so)
dlrm_main/0 [1]:frame #6: <unknown function> + 0x4d6e7c (0x7f7cd0184e7c in /home/jingqi/anaconda3/envs/rec/lib/python3.8/site-packages/torch/lib/libtorch_python.so)


My environment has 4 GPUs, each with 23 GB of GPU memory, and the server has 256 GB of memory.
I tried decreasing the batch size but it still failed, so I guess the error does not come from a memory limit.

Do you have any ideas or suggestions for better use?

How to run sharded DLRM inference on multi-process CPU environment?

Hi, I appreciate your work on this project.

I want to test simple sharded DLRM inference in a CPU environment.
But I'm not sure how to run inference across multiple processes.

Setup commands are

conda create -n torchrec python=3.9
conda activate torchrec
conda install pytorch cpuonly -c pytorch-nightly
pip install torchrec-nightly-cpu

First, I'm wondering how to separate the master and worker processes.
I want the master to send an inference request to the worker and the worker to wait for a request.
Then, the worker and the master look up their allocated shards and the master gathers the output.
Could you give me some example code to separate the master and worker processes?

Second, errors occurred when running inference on CW and RW shards with multiple processes. Is there something wrong with my code or command?

Error message on CW sharding:

============================================================
dlrm_sharding.py FAILED
------------------------------------------------------------
Failures:
[1]:
  time      : 2022-04-17_08:47:12
  host      : node10
  rank      : 1 (local_rank: 1)
  exitcode  : 1 (pid: 277843)
  error_file: /tmp/torchelastic__lprh9al/none_qcpp0gvj/attempt_0/1/error.json
  traceback : Traceback (most recent call last):
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
      return f(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
      return func(*args, **kwargs)
    File "/home/jiin/2022/torchrec/dlrm_sharding.py", line 152, in main
      logits = model.forward(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 248, in forward
      return self._dmp_wrapped_module(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 989, in forward
      output = self.module(*inputs, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 398, in forward
      embedded_sparse = self.sparse_arch(sparse_features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 101, in forward
      sparse: Dict[str, torch.Tensor] = sparse_features.to_dict()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 234, in __getattr__
      res = LazyAwaitable._wait_async(self)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 193, in _wait_async
      obj._result = obj.wait()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 120, in wait
      ret: W = self._wait_impl()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 213, in _wait_impl
      embeddings = [w.wait() for w in self._awaitables]
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 213, in <listcomp>
      embeddings = [w.wait() for w in self._awaitables]
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 122, in wait
      ret = callback(ret)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/fbgemm_gpu/permute_pooled_embedding_modules.py", line 66, in forward
      result = torch.ops.fbgemm.permute_pooled_embs_auto_grad(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/_ops.py", line 191, in __getattr__
      op = torch._C._jit_get_operation(qualified_op_name)
  RuntimeError: No such operator fbgemm::permute_pooled_embs_auto_grad
  
------------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2022-04-17_08:47:12
  host      : node10
  rank      : 0 (local_rank: 0)
  exitcode  : 1 (pid: 277842)
  error_file: /tmp/torchelastic__lprh9al/none_qcpp0gvj/attempt_0/0/error.json
  traceback : Traceback (most recent call last):
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
      return f(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
      return func(*args, **kwargs)
    File "/home/jiin/2022/torchrec/dlrm_sharding.py", line 152, in main
      logits = model.forward(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 248, in forward
      return self._dmp_wrapped_module(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 989, in forward
      output = self.module(*inputs, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 398, in forward
      embedded_sparse = self.sparse_arch(sparse_features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 101, in forward
      sparse: Dict[str, torch.Tensor] = sparse_features.to_dict()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 234, in __getattr__
      res = LazyAwaitable._wait_async(self)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 193, in _wait_async
      obj._result = obj.wait()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 120, in wait
      ret: W = self._wait_impl()
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 213, in _wait_impl
      embeddings = [w.wait() for w in self._awaitables]
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 213, in <listcomp>
      embeddings = [w.wait() for w in self._awaitables]
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 122, in wait
      ret = callback(ret)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/fbgemm_gpu/permute_pooled_embedding_modules.py", line 66, in forward
      result = torch.ops.fbgemm.permute_pooled_embs_auto_grad(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/_ops.py", line 191, in __getattr__
      op = torch._C._jit_get_operation(qualified_op_name)
  RuntimeError: No such operator fbgemm::permute_pooled_embs_auto_grad
  
============================================================

Error message on RW sharding:

============================================================
dlrm_sharding.py FAILED
------------------------------------------------------------
Failures:
[1]:
  time      : 2022-04-17_08:48:09
  host      : node10
  rank      : 1 (local_rank: 1)
  exitcode  : 1 (pid: 277932)
  error_file: /tmp/torchelastic_jckpsnqi/none_y6iove8k/attempt_0/1/error.json
  traceback : Traceback (most recent call last):
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
      return f(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
      return func(*args, **kwargs)
    File "/home/jiin/2022/torchrec/dlrm_sharding.py", line 152, in main
      logits = model.forward(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 248, in forward
      return self._dmp_wrapped_module(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 989, in forward
      output = self.module(*inputs, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 398, in forward
      embedded_sparse = self.sparse_arch(sparse_features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 97, in forward
      sparse_features: KeyedTensor = self.embedding_bag_collection(features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 422, in forward
      return self.compute_and_output_dist(ctx, dist_input)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 392, in compute_and_output_dist
      awaitables=[
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 393, in <listcomp>
      dist(lookup(features))
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/sharding/rw_sharding.py", line 341, in forward
      return self._dist(local_embs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 792, in forward
      tensor_awaitable = reduce_scatter_pooled(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/comm_ops.py", line 413, in reduce_scatter_pooled
      ReduceScatter_Req.apply(group, myreq, rsi, *inputs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/comm_ops.py", line 866, in forward
      req = dist.reduce_scatter(output, list(inputs), group=pg, async_op=True)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 2454, in reduce_scatter
      work = group.reduce_scatter([output], [input_list], opts)
  RuntimeError: ProcessGroupGloo does not support reduce_scatter
  
------------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2022-04-17_08:48:09
  host      : node10
  rank      : 0 (local_rank: 0)
  exitcode  : 1 (pid: 277931)
  error_file: /tmp/torchelastic_jckpsnqi/none_y6iove8k/attempt_0/0/error.json
  traceback : Traceback (most recent call last):
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
      return f(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
      return func(*args, **kwargs)
    File "/home/jiin/2022/torchrec/dlrm_sharding.py", line 152, in main
      logits = model.forward(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 248, in forward
      return self._dmp_wrapped_module(*args, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 989, in forward
      output = self.module(*inputs, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 398, in forward
      embedded_sparse = self.sparse_arch(sparse_features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/models/dlrm.py", line 97, in forward
      sparse_features: KeyedTensor = self.embedding_bag_collection(features)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/types.py", line 422, in forward
      return self.compute_and_output_dist(ctx, dist_input)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 392, in compute_and_output_dist
      awaitables=[
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/embeddingbag.py", line 393, in <listcomp>
      dist(lookup(features))
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/sharding/rw_sharding.py", line 341, in forward
      return self._dist(local_embs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1129, in _call_impl
      return forward_call(*input, **kwargs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 792, in forward
      tensor_awaitable = reduce_scatter_pooled(
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/comm_ops.py", line 413, in reduce_scatter_pooled
      ReduceScatter_Req.apply(group, myreq, rsi, *inputs)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torchrec/distributed/comm_ops.py", line 866, in forward
      req = dist.reduce_scatter(output, list(inputs), group=pg, async_op=True)
    File "/home/jiin/anaconda3/envs/torchrec/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 2454, in reduce_scatter
      work = group.reduce_scatter([output], [input_list], opts)
  RuntimeError: ProcessGroupGloo does not support reduce_scatter
  
============================================================

Here are my code and command.

Code:

# Imports added for completeness (module paths as of torchrec 0.2 -- adjust if they have moved).
# B, D, and E used below are the batch size, embedding dimension, and number of embeddings,
# defined elsewhere in the script.
from typing import List, cast

import torch
import torch.distributed as dist
import torchrec
import torchrec.models.dlrm
from torchrec.distributed.embedding_types import EmbeddingComputeKernel
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.types import ModuleSharder, ShardingType

class RWSharder(EmbeddingBagCollectionSharder):
    def sharding_types(self, compute_device_type: str) -> List[str]:
        return [ShardingType.ROW_WISE.value]

    def compute_kernels(
        self, sharding_type: str, compute_device_type: str
    ) -> List[str]:
        return [EmbeddingComputeKernel.DENSE.value]

class CWSharder(EmbeddingBagCollectionSharder):
    def sharding_types(self, compute_device_type: str) -> List[str]:
        return [ShardingType.COLUMN_WISE.value]

    def compute_kernels(
        self, sharding_type: str, compute_device_type: str
    ) -> List[str]:
        return [EmbeddingComputeKernel.DENSE.value]

@torch.no_grad()
def main():

    dist.init_process_group(
        backend="gloo"
    )

    ebc = torchrec.EmbeddingBagCollection(
        device="cpu",
        tables=[
            torchrec.EmbeddingBagConfig(
                name="product_table",
                embedding_dim=D,
                num_embeddings=E,
                feature_names=["product"],
                pooling=torchrec.PoolingType.SUM,
            ),
            torchrec.EmbeddingBagConfig(
                name="user_table",
                embedding_dim=D,
                num_embeddings=E,
                feature_names=["user"],
                pooling=torchrec.PoolingType.SUM,
            )
        ]
    )

    dlrm = torchrec.models.dlrm.DLRM(
        embedding_bag_collection=ebc,
        dense_in_features=100,
        dense_arch_layer_sizes=[128, D],
        over_arch_layer_sizes=[D, 32, 1],
    )

    sharders = [cast(ModuleSharder[torch.nn.Module], RWSharder())]

    model = torchrec.distributed.DistributedModelParallel(
                dlrm, 
                device=torch.device("cpu"), 
                sharders=sharders,
            )

    # features
    dense_features = torch.ones((B, 100))
    sparse_features = torchrec.KeyedJaggedTensor(
        keys = ["product", "user"],
        values = torch.tensor([101, 202, 303, 404, 505, 606, 707]),
        lengths = torch.tensor([2, 1, 1, 1, 1, 1]),
    )

    logits = model.forward(
        dense_features=dense_features,
        sparse_features=sparse_features,
    )
    print(logits)

if __name__ == "__main__":
    main()

Command:

torchrun --nproc_per_node 2 dlrm_sharding.py
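
For what it's worth, the failure can be reproduced without TorchRec at all. A minimal sketch (assuming two processes launched with torchrun, as above) that hits the same limitation of the gloo backend:

import torch
import torch.distributed as dist

# gloo is the practical CPU backend here, but it does not implement reduce_scatter,
# which the row-wise sharded EmbeddingBagCollection uses to combine pooled embeddings.
dist.init_process_group(backend="gloo")
world_size = dist.get_world_size()

output = torch.zeros(4)
inputs = [torch.ones(4) for _ in range(world_size)]
dist.reduce_scatter(output, inputs)  # RuntimeError: ProcessGroupGloo does not support reduce_scatter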

[Feature Request]: Support UVM for embedding

The feature, motivation and pitch

Embedding tables can be very large and require many GPUs to store. Keeping everything in GPU memory over a high-speed interconnect is still the fastest option, but the cost is high. Offloading large embeddings to CPU memory is a cost-effective way to run large models. In addition, we can take advantage of the long-tail distribution of the input data and only offload infrequently accessed embeddings to CPU to minimize the performance impact.
UVM provides all the necessary functionality. With application-managed access counts of embedding rows and UVM hints (cuMemAdvise), it can achieve very good performance. We are able to train MLPerf DLRM in 28 minutes on a single A100, compared to 4 minutes on a DGX A100 (8xA100) machine.

What will be in the PR if accepted

UVM tensor

A UVM tensor that uses cudaMallocManaged() to allocate memory. From PyTorch's point of view it operates like a normal GPU tensor.

UVM hints

Python wrappers over CudaMemPrefetch() and CudaMemAdvise() to set hints on the underlying memory of a UVM tensor.

Primary PyTorch function definitions

torch::Tensor UvmTensor(at::IntArrayRef shape, int gpu_id);
void CudaMemPrefetch(torch::Tensor uvm_tensor, int64_t offset, int64_t numel, int device);
void CudaMemAdvise(torch::Tensor uvm_tensor, int64_t offset, int64_t numel, std::string advise_str, int device);

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
  m.def("UvmTensor", &UvmTensor, "", py::arg("shape"), py::arg("gpu_id") = 0);
  m.def(
      "cuda_mem_prefetch", &CudaMemPrefetch, "",
      py::arg("uvm_tensor"), py::arg("offset"), py::arg("numel"), py::arg("device") = 0);
  m.def(
      "cuda_mem_advise", &CudaMemAdvise, "",
      py::arg("uvm_tensor"), py::arg("offset"), py::arg("numel"), py::arg("advise_str"), py::arg("device") = 0);
}
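
A hypothetical usage sketch of the proposed bindings (the module name uvm_ext and the advise string are illustrative assumptions; none of this is an existing TorchRec API):

import uvm_ext  # hypothetical name for the TORCH_EXTENSION_NAME module built from the C++ above

# Allocate a managed (UVM) embedding table; pages migrate between host and device on demand.
table = uvm_ext.UvmTensor([10_000_000, 128], gpu_id=0)

# Tell the driver where a hot range of rows should preferably live, then prefetch it
# before the next lookup (offset/numel are in elements of the flat storage).
hot_elems = 1_000_000 * 128
uvm_ext.cuda_mem_advise(table, offset=0, numel=hot_elems, advise_str="preferred_location", device=0)
uvm_ext.cuda_mem_prefetch(table, offset=0, numel=hot_elems, device=0)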

Examples

MLPerf DLRM plus other variants.

`torchrec` wheels don't follow the regular PyTorch scheme

I'm the author of a tool that helps install PyTorch distributions with pip. As such, I'm monitoring the custom indices set up by PyTorch, and torchrec seems to go against the conventions of all the other packages.

Looking at https://download.pytorch.org/whl/nightly/cpu/ I currently get the following

Screenshot from 2022-05-16 08-23-33

The entry should be just torchrec. Things like the channel (nightly) or the computation backend (cpu, cu113, ...) should be expressed through the index URL. Note that torch_nightly and torchaudio_nightly are not the scheme to adopt here; if you follow those links, the latest entry is from 2019 and only seems to be there for BC.

Speaking of dates, following the torchrec_nightly link shows

Screenshot from 2022-05-16 08-32-30

The naming and date scheme differs from other packages. For example, yesterday's torch nightlies for CPU look like

Screenshot from 2022-05-16 08-35-10

It seems torchrec is, for now, a pure Python package and thus does not need to be compiled against multiple computation backends and platforms. The nightly wheel should therefore look like

torchrec-0.1.0.dev20220516-py3-none-any.whl

and be hosted for all backends. torchdata has the same setup:

AttributeError: '_OpNamespace' object has no attribute 'permute_2D_sparse_data'

dependencies:
fbgemm-gpu-nightly-cpu==2022.6.28
torchrec==0.2.0
torch==1.12.0

It is a bit hard for me to disentangle the code to give an easy repro, but this seems to be less a problem with the code and more with the dependencies?

E           raise AttributeError(f"'_OpNamespace' object has no attribute '{op_name}'") from e
E       AttributeError: '_OpNamespace' object has no attribute 'permute_2D_sparse_data'
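
A small diagnostic sketch (an assumption about the cause, not a confirmed fix): the torch.ops.fbgemm.* operators are only registered once fbgemm_gpu has been imported successfully, and a torch / fbgemm-gpu build mismatch typically surfaces as exactly this kind of missing-op AttributeError:

import torch

try:
    import fbgemm_gpu  # importing this is what registers the torch.ops.fbgemm.* operators
except Exception as e:
    print("fbgemm_gpu failed to import:", e)
else:
    # If the import succeeded but the op is still missing, the installed fbgemm-gpu
    # build likely does not match the installed torch build.
    print(hasattr(torch.ops.fbgemm, "permute_2D_sparse_data"))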

ValueError: All tables in a EmbeddingCollection are required to have same embedding dimension.

Hello,

I noticed there's a possible bug in the EmbeddingCollection API. The documentation lists this example:

e1_config = EmbeddingConfig(
    name="t1", embedding_dim=2, num_embeddings=10, feature_names=["f1"]
)
e2_config = EmbeddingConfig(
    name="t2", embedding_dim=3, num_embeddings=10, feature_names=["f2"]
)

ec = EmbeddingCollection(tables=[e1_config, e2_config])

#     0       1        2  <-- batch
# 0   [0,1] None    [2]
# 1   [3]    [4]    [5,6,7]
# ^
# feature

features = KeyedJaggedTensor.from_offsets_sync(
    keys=["f1", "f2"],
    values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7]),
    offsets=torch.tensor([0, 2, 2, 3, 4, 5, 8]),
)
feature_embeddings = ec(features)
print(feature_embeddings['f2'].values())
tensor([[-0.2050,  0.5478,  0.6054],
[ 0.7352,  0.3210, -3.0399],
[ 0.1279, -0.1756, -0.4130],
[ 0.7519, -0.4341, -0.0499],
[ 0.9329, -1.0697, -0.8095]], grad_fn=<EmbeddingBackward>)

However, when trying to run this code, I get the following error:

ValueError: All tables in a EmbeddingCollection are required to have same embedding dimension.

Why does this happen? As I see it, EmbeddingCollection should be able to have different embedding dimensions for each feature.

Criteo dataset availability

Is there another source for the criteo data by day?

The links at http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_0.gz seem unavailable.

Gather Sharded Embedding on multi-GPUs to CPU Memory

I am trying to gather sharded embeddings to rank 0 but hit a CUDA OOM. I am not sure if I am using the correct method; any suggestions?

def get_user_embeddings(self):
    sd = self.model.state_dict()
    # support multi-GPUs
    user_emb = _collate_sharded_tensor(sd['model.user_embedding.embeddings.t_users.weight'], (self.num_users, self.factors))

def _collate_sharded_tensor(sharded_tensor, size):
    rank = int(os.environ["LOCAL_RANK"])
    out_tensor = torch.empty(size, device=torch.device("cpu")) if rank == 0 else None
    sharded_tensor.gather(0, out_tensor)
    return out_tensor

2022-05-26 14:54:30 Traceback (most recent call last):
2022-05-26 14:54:30   File "main_torchrec.py", line 431, in <module>
2022-05-26 14:54:30     main()
2022-05-26 14:54:30   File "main_torchrec.py", line 329, in main
2022-05-26 14:54:30     train(args, train_pipeline, train_dataloader, rank, world_size, evaluator)
2022-05-26 14:54:30   File "main_torchrec.py", line 138, in train
2022-05-26 14:54:30     item_emb = evaluator.model.get_item_embeddings()
2022-05-26 14:54:30   File "/tmp/src/torchrec_cf/utils.py", line 54, in get_item_embeddings
2022-05-26 14:54:30     item_emb = _collate_sharded_tensor(sd['model.item_embedding.embeddings.t_items.weight'], (self.num_items, self.factors))
2022-05-26 14:54:30   File "/tmp/src/torchrec_cf/utils.py", line 64, in _collate_sharded_tensor
2022-05-26 14:54:30     sharded_tensor.gather(0, out_tensor)
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/distributed/_shard/sharded_tensor/api.py", line 285, in gather
2022-05-26 14:54:30     dist.all_gather_object(
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/distributed/distributed_c10d.py", line 1668, in all_gather_object
2022-05-26 14:54:30     object_list[i] = _tensor_to_object(tensor, tensor_size)
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/distributed/distributed_c10d.py", line 1565, in _tensor_to_object
2022-05-26 14:54:30     return _unpickler(io.BytesIO(buf)).load()
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/storage.py", line 181, in _load_from_bytes
2022-05-26 14:54:30     return torch.load(io.BytesIO(b))
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/serialization.py", line 713, in load
2022-05-26 14:54:30     return _legacy_load(opened_file, map_location, pickle_module, **pickle_load_args)
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/serialization.py", line 930, in _legacy_load
2022-05-26 14:54:30     result = unpickler.load()
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/serialization.py", line 876, in persistent_load
2022-05-26 14:54:30     wrap_storage=restore_location(obj, location),
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/serialization.py", line 176, in default_restore_location
2022-05-26 14:54:30     result = fn(storage, location)
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/serialization.py", line 156, in _cuda_deserialize
2022-05-26 14:54:30     return storage_type(obj.nbytes())
2022-05-26 14:54:30   File "/usr/local/lib/python3.8/dist-packages/torch/cuda/__init__.py", line 661, in _lazy_new
2022-05-26 14:54:30     return super(_CudaBase, cls).__new__(cls, *args, **kwargs)
2022-05-26 14:54:30 RuntimeError: CUDA out of memory. Tried to allocate 1.96 GiB (GPU 0; 79.35 GiB total capacity; 0 bytes already allocated; 718.19 MiB free; 0 bytes reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF
2022-05-26 14:54:30 
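
One possible workaround (a sketch under the assumption that the OOM comes from all_gather_object deserializing the CUDA shards back onto GPU 0, as the traceback suggests) is to move each rank's local shards to CPU before gathering them as plain objects:

import torch
import torch.distributed as dist


def gather_sharded_to_cpu(sharded_tensor, size, rank):
    # Copy the local shards to host memory first so no CUDA storage is pickled.
    local = [
        (shard.tensor.detach().cpu(), list(shard.metadata.shard_offsets))
        for shard in sharded_tensor.local_shards()
    ]
    gathered = [None] * dist.get_world_size()
    dist.all_gather_object(gathered, local)
    if rank != 0:
        return None
    # Reassemble the full table on CPU from the (shard, offsets) pairs of every rank.
    out = torch.empty(size, device="cpu")
    for shards in gathered:
        for cpu_shard, (row_off, col_off) in shards:
            rows, cols = cpu_shard.shape
            out[row_off:row_off + rows, col_off:col_off + cols] = cpu_shard
    return out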

RuntimeError: No such operator fbgemm::jagged_to_padded_dense error on google colab

Splitting
<pandas.core.groupby.generic.DataFrameGroupBy object at 0x7fbb025db450>
Sampling negative items

RuntimeError Traceback (most recent call last)
/usr/local/lib/python3.7/dist-packages/torch/_ops.py in __getattr__(self, op_name)
197 try:
--> 198 op, overload_names = torch._C._jit_get_operation(qualified_op_name)
199 except RuntimeError as e:

RuntimeError: No such operator fbgemm::jagged_to_padded_dense

The above exception was the direct cause of the following exception:

AttributeError Traceback (most recent call last)
13 frames
/usr/local/lib/python3.7/dist-packages/torch/_ops.py in __getattr__(self, op_name)
200 # Turn this into AttributeError so getattr(obj, key, default)
201 # works (this is called by TorchScript with origin)
--> 202 raise AttributeError(f"'_OpNamespace' object has no attribute '{op_name}'") from e
203
204 # let the script frontend know that op is identical to the builtin op

AttributeError: '_OpNamespace' object has no attribute 'jagged_to_padded_dense'

Multi-GPUs: Partial Ranks Hang for a long time after other ranks finished

Hi All,

I am running a multi-GPU job on a single node. The job always hangs at the final epoch: 3 out of 8 ranks hang for a long time until they time out and throw the exception below. It looks like the finished ranks just exit and release all their resources while the remaining ranks are waiting for something. Any idea why?

Epoch 9, Rank 7, world_size 8: 35batch [00:03, 10.54batch/s]
Epoch 9, Rank 2, world_size 8: 35batch [00:03, 10.54batch/s]
Epoch 9, Rank 5, world_size 8: 35batch [00:03, 10.51batch/s]
Epoch 9, Rank 0, world_size 8: 35batch [00:03, 10.53batch/s]
Epoch 9, Rank 3, world_size 8: 35batch [00:03, 10.47batch/s]
Epoch 9, Rank 6, world_size 8: 16batch [00:17, 11.32batch/s][E ProcessGroupNCCL.cpp:717] [Rank 4] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=4190, OpType=ALLTOALL_BASE, Timeout(ms)=1800000) ran for 1803657 milliseconds before timing out.
[E ProcessGroupNCCL.cpp:717] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=4188, OpType=ALLTOALL_BASE, Timeout(ms)=1800000) ran for 1803676 milliseconds before timing out.
Epoch 9, Rank 4, world_size 8: 26batch [30:06, 69.48s/batch]
Traceback (most recent call last):
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 300, in <module>
    main()
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 253, in main
    train(args, train_pipeline, train_dataloader, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 103, in train
    _train(train_pipeline, train_iterator, next_iterator, epoch, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 86, in _train
    train_pipeline.progress(combined_iterator)
  File "/GitSource/collaborative_filtering/src/torchrec_pipeline.py", line 69, in progress
    (losses, output) = cast(Tuple[torch.Tensor, Out], self._model(batch_i))
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 245, in forward
    return self._dmp_wrapped_module(*args, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 240, in forward
    loss = self.model.contrastive_loss(
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 153, in contrastive_loss
    pos_items: Tensor = self.item_embedding(pos_features)["items"].values()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 415, in forward
    dist_input = self.input_dist(ctx, *input, **kwargs).wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding.py", line 325, in input_dist
    tensor_awaitable = tensor_awaitable.wait()  # finish lengths all2all
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding_sharding.py", line 112, in _wait_impl
    id_list_features_awaitable=self._id_list_features_awaitable.wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 384, in _wait_impl
    ret = KJTAllToAllIndicesAwaitable(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 175, in __init__
    self._values_awaitable: dist.Work = dist.all_to_all_single(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 2612, in all_to_all_single
    work = group.alltoall_base(
RuntimeError: NCCL communicator was aborted on rank 4.  Original reason for failure was: [Rank 4] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=4190, OpType=ALLTOALL_BASE, Timeout(ms)=1800000) ran for 1803657 milliseconds before timing out.
Epoch 9, Rank 1, world_size 8: 26batch [30:06, 69.49s/batch]
Traceback (most recent call last):
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 300, in <module>
    main()
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 253, in main
    train(args, train_pipeline, train_dataloader, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 103, in train
    _train(train_pipeline, train_iterator, next_iterator, epoch, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 86, in _train
    train_pipeline.progress(combined_iterator)
  File "/GitSource/collaborative_filtering/src/torchrec_pipeline.py", line 69, in progress
    (losses, output) = cast(Tuple[torch.Tensor, Out], self._model(batch_i))
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 245, in forward
    return self._dmp_wrapped_module(*args, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 240, in forward
    loss = self.model.contrastive_loss(
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 152, in contrastive_loss
    users: Tensor = self.user_embedding(user_features)["users"].values()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 415, in forward
    dist_input = self.input_dist(ctx, *input, **kwargs).wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding.py", line 325, in input_dist
    tensor_awaitable = tensor_awaitable.wait()  # finish lengths all2all
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding_sharding.py", line 112, in _wait_impl
    id_list_features_awaitable=self._id_list_features_awaitable.wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 384, in _wait_impl
    ret = KJTAllToAllIndicesAwaitable(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 168, in __init__
    out_values = torch.empty(
TypeError: empty(): argument 'size' must be tuple of ints, but found element of type int at pos 1
[E ProcessGroupNCCL.cpp:717] [Rank 6] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=4190, OpType=ALLTOALL_BASE, Timeout(ms)=1800000) ran for 1804588 milliseconds before timing out.
Epoch 9, Rank 6, world_size 8: 17batch [30:06, 106.25s/batch]
Traceback (most recent call last):
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 300, in <module>
    main()
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 253, in main
    train(args, train_pipeline, train_dataloader, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 103, in train
    _train(train_pipeline, train_iterator, next_iterator, epoch, rank, world_size)
  File "/GitSource/collaborative_filtering/src/main_torchrec.py", line 86, in _train
    train_pipeline.progress(combined_iterator)
  File "/GitSource/collaborative_filtering/src/torchrec_pipeline.py", line 69, in progress
    (losses, output) = cast(Tuple[torch.Tensor, Out], self._model(batch_i))
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 245, in forward
    return self._dmp_wrapped_module(*args, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 240, in forward
    loss = self.model.contrastive_loss(
  File "/GitSource/collaborative_filtering/src/torchrec_model.py", line 153, in contrastive_loss
    pos_items: Tensor = self.item_embedding(pos_features)["items"].values()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1111, in _call_impl
    return forward_call(*input, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 415, in forward
    dist_input = self.input_dist(ctx, *input, **kwargs).wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding.py", line 325, in input_dist
    tensor_awaitable = tensor_awaitable.wait()  # finish lengths all2all
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/embedding_sharding.py", line 112, in _wait_impl
    id_list_features_awaitable=self._id_list_features_awaitable.wait()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/types.py", line 117, in wait
    ret: W = self._wait_impl()
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 384, in _wait_impl
    ret = KJTAllToAllIndicesAwaitable(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/distributed/dist_data.py", line 175, in __init__
    self._values_awaitable: dist.Work = dist.all_to_all_single(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 2612, in all_to_all_single
    work = group.alltoall_base(
RuntimeError: NCCL communicator was aborted on rank 6.  Original reason for failure was: [Rank 6] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=4190, OpType=ALLTOALL_BASE, Timeout(ms)=1800000) ran for 1804588 milliseconds before timing out.
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 1 (pid: 2115) of binary: /conda_envs/torchrec_gpu_py39/bin/python
Traceback (most recent call last):
  File "/conda_envs/torchrec_gpu_py39/bin/torchrun", line 33, in <module>
    sys.exit(load_entry_point('torch==1.12.0.dev20220307', 'console_scripts', 'torchrun')())
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
    return f(*args, **kwargs)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/run.py", line 761, in main
    run(args)
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/run.py", line 752, in run
    elastic_launch(
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 131, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 245, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError: 
============================================================
main_torchrec.py FAILED
------------------------------------------------------------
Failures:
[1]:
  time      : 2022-03-30_06:10:10
  host      : 4fdbdf95bfdd4ade81a50fb8ea6f3c8c-master-0
  rank      : 4 (local_rank: 4)
  exitcode  : 1 (pid: 2118)
  error_file: <N/A>
  traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
[2]:
  time      : 2022-03-30_06:10:10
  host      : 4fdbdf95bfdd4ade81a50fb8ea6f3c8c-master-0
  rank      : 6 (local_rank: 6)
  exitcode  : 1 (pid: 2120)
  error_file: <N/A>
  traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
------------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2022-03-30_06:10:10
  host      : 4fdbdf95bfdd4ade81a50fb8ea6f3c8c-master-0
  rank      : 1 (local_rank: 1)
  exitcode  : 1 (pid: 2115)
  error_file: <N/A>
  traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
============================================================

My code snippet, using DistributedModelParallel and TrainPipelineSparseDist:

    # ProcessGroup
    pg = dist.GroupMember.WORLD
    assert pg is not None, "Process group is not initialized"
    # sharders and planner
    sharding_types = [
        # use ROW_WISE for multi-GPUs
        ShardingType.ROW_WISE.value if dist.get_world_size(pg) > 1 else ShardingType.TABLE_WISE.value,
    ]
    compute_kernels = [
        # EmbeddingComputeKernel.SPARSE.value,
        EmbeddingComputeKernel.BATCHED_FUSED.value,
    ]
    constraints = {
            "t_users": ParameterConstraints(sharding_types=sharding_types, compute_kernels=compute_kernels),
            "t_items": ParameterConstraints(sharding_types=sharding_types, compute_kernels=compute_kernels),
        }

    fused_params = {
        "learning_rate": args.learning_rate,
    }
    sharders = [EmbeddingCollectionSharder(fused_params=fused_params)]

    env = ShardingEnv.from_process_group(pg)
    planner = EmbeddingShardingPlanner(
        topology=Topology(
            world_size=env.world_size, compute_device=device.type,
        ),
        constraints=constraints,
    )
    if pg is not None:
        plan = planner.collective_plan(train_model, sharders, pg)
    else:
        plan = planner.plan(train_model, sharders)

    model = DistributedModelParallel(
        module=train_model,
        sharders=cast(List[ModuleSharder[nn.Module]], sharders),
        plan=plan,
        env=env,
        device=device,
    )
    print("model.plan: ", model.plan)
    print("model: ", model)

    # optimizer = KeyedOptimizerWrapper(
    #     dict(model.named_parameters()),
    #     lambda params: torch.optim.SGD(params, lr=args.learning_rate),
    # )

    train_pipeline = TrainPipelineSparseDist(
        model,
        model.fused_optimizer,
        device,
    )

[Feature request] better overview and examples of smaller scale models

Request on behalf of @adamlerer (adding it here to be able to reference it).

  1. The readme contains no information about how to actually use torchrec, nor a description of the problem it is trying to solve. Instead it gives a list of advanced engineering features that aren't related to the high-level task
    https://github.com/pytorch/torchrec
  2. Next I click through to the docs, which give an API description but still no explanation of what problem it's trying to solve or how to actually use it
  3. Next I get to the tutorial, which kind of tells me how to use it. But do I really need to set up distributed torch and stuff for that?

The docs should lead with something like: "Here's a simple setup to train a recommender system that you can get running in an hour and that will give you good results. And don't worry, we've built lots of fancy features so it will scale up to huge data sizes."
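
A minimal single-process sketch of the kind of example the request is asking for (table size, dimension, and feature name are made up for illustration; no distributed setup is needed here):

import torch
import torchrec

# One small embedding table, looked up and sum-pooled per example.
ebc = torchrec.EmbeddingBagCollection(
    device="cpu",
    tables=[
        torchrec.EmbeddingBagConfig(
            name="product_table",
            embedding_dim=16,
            num_embeddings=1000,
            feature_names=["product"],
            pooling=torchrec.PoolingType.SUM,
        )
    ],
)

# Batch of 3 examples with 2, 1, and 1 "product" ids respectively.
features = torchrec.KeyedJaggedTensor.from_lengths_sync(
    keys=["product"],
    values=torch.tensor([11, 22, 33, 44]),
    lengths=torch.tensor([2, 1, 1]),
)

pooled = ebc(features)          # KeyedTensor of pooled embeddings
print(pooled["product"].shape)  # torch.Size([3, 16])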

Fail to run under CPU mode with error of 'unexpected keyword argument 'static_graph''

Hi,

I tried to run the TorchRec DLRM example on my x86 CPU platform (Cooperlake). My PyTorch is 1.11, and I installed the latest TorchRec master from source, but it fails with the errors below when I run the DLRM example. Could you help me with this?

Hardware

Cooperlake 8380

Steps to reproduce

1. Create and activate a conda environment with Python 3.9.

2. Install the CPU version of the library from the latest master:

   python setup.py install --cpu_only

3. Run the DLRM example:

   torchx run -s local_cwd dist.ddp -j 1x2 --script dlrm_main.py

Error

dlrm_main/0 [0]:Traceback (most recent call last):
dlrm_main/0 [1]:Traceback (most recent call last):
dlrm_main/0 [0]: File "/home/neo/workspace/projects/torchrec/torchrec/examples/dlrm/dlrm_main.py", line 550, in <module>
dlrm_main/0 [1]: File "/home/neo/workspace/projects/torchrec/torchrec/examples/dlrm/dlrm_main.py", line 550, in <module>
dlrm_main/0 [0]: main(sys.argv[1:])
dlrm_main/0 [1]: main(sys.argv[1:])
dlrm_main/0 [0]: File "/home/neo/workspace/projects/torchrec/torchrec/examples/dlrm/dlrm_main.py", line 528, in main
dlrm_main/0 [1]: File "/home/neo/workspace/projects/torchrec/torchrec/examples/dlrm/dlrm_main.py", line 528, in main
dlrm_main/0 [0]: model = DistributedModelParallel(
dlrm_main/0 [1]: model = DistributedModelParallel(
dlrm_main/0 [0]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 224, in __init__
dlrm_main/0 [1]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 224, in __init__
dlrm_main/0 [0]: self.init_data_parallel()
dlrm_main/0 [1]: self.init_data_parallel()
dlrm_main/0 [0]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 259, in init_data_parallel
dlrm_main/0 [1]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 259, in init_data_parallel
dlrm_main/0 [0]: self._data_parallel_wrapper.wrap(self, self._env, self.device)
dlrm_main/0 [1]: self._data_parallel_wrapper.wrap(self, self._env, self.device)
dlrm_main/0 [0]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 104, in wrap
dlrm_main/0 [1]: File "/home/neo/anaconda3/lib/python3.9/site-packages/torchrec-0.1.0-py3.9.egg/torchrec/distributed/model_parallel.py", line 104, in wrap
dlrm_main/0 [0]: DistributedDataParallel(
dlrm_main/0 [1]: DistributedDataParallel(
dlrm_main/0 [0]:TypeError: __init__() got an unexpected keyword argument 'static_graph'
dlrm_main/0 [1]:TypeError: __init__() got an unexpected keyword argument 'static_graph'
dlrm_main/0 ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 7786) of binary: /home/neo/anaconda3/bin/python
dlrm_main/0 Traceback (most recent call last):
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/runpy.py", line 197, in _run_module_as_main
dlrm_main/0 return _run_code(code, main_globals, None,
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/runpy.py", line 87, in _run_code
dlrm_main/0 exec(code, run_globals)
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/run.py", line 728, in <module>
dlrm_main/0 main()
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
dlrm_main/0 return f(*args, **kwargs)
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/run.py", line 724, in main
dlrm_main/0 run(args)
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/run.py", line 715, in run
dlrm_main/0 elastic_launch(
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 131, in __call__
dlrm_main/0 return launch_agent(self._config, self._entrypoint, list(args))
dlrm_main/0 File "/home/neo/anaconda3/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 245, in launch_agent
dlrm_main/0 raise ChildFailedError(
dlrm_main/0 torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
dlrm_main/0 ============================================================
dlrm_main/0 dlrm_main.py FAILED
dlrm_main/0 ------------------------------------------------------------
dlrm_main/0 Failures:
dlrm_main/0 [1]:
dlrm_main/0 time : *****
dlrm_main/0 host : *****
dlrm_main/0 rank : 1 (local_rank: 1)
dlrm_main/0 exitcode : 1 (pid: 7787)
dlrm_main/0 error_file: <N/A>
dlrm_main/0 traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
dlrm_main/0 ------------------------------------------------------------
dlrm_main/0 Root Cause (first observed failure):
dlrm_main/0 [0]:
dlrm_main/0 time : *****
dlrm_main/0 host : *****
dlrm_main/0 rank : 0 (local_rank: 0)
dlrm_main/0 exitcode : 1 (pid: 7786)
dlrm_main/0 error_file: <N/A>
dlrm_main/0 traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
dlrm_main/0 ============================================================
torchx 2022-04-14 11:28:54 INFO Job finished: FAILED
torchx 2022-04-14 11:28:54 ERROR AppStatus:
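
A quick diagnostic sketch (an assumption: the failure points at a torch build that predates DistributedDataParallel's static_graph argument, which TorchRec's data-parallel wrapper passes in, so checking the installed version and signature is a cheap first step):

import inspect

import torch
from torch.nn.parallel import DistributedDataParallel

print(torch.__version__)
# True only if the installed DDP accepts the static_graph keyword.
print("static_graph" in inspect.signature(DistributedDataParallel.__init__).parameters)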

ValueError: optimizer got an empty parameter list

Hi all,

I am trying to run a simple model based on TorchRec, but model.named_parameters() on the DistributedModelParallel instance returns an empty parameter list. Could you please help take a look?

ValueError: optimizer got an empty parameter list

model debug info and exception message:

model:  DistributedModelParallel(
  (_dmp_wrapped_module): CFTorchRecModelTrain(
    (model): CFTorchRecModel(
      (user_embedding): ShardedEmbeddingCollection(
        (_input_dists): ModuleList()
        (_lookups): ModuleList(
          (0): GroupedEmbeddingsLookup(
            (_emb_modules): ModuleList(
              (0): BatchedFusedEmbedding(
                (_emb_module): SplitTableBatchedEmbeddingBagsCodegen()
              )
            )
          )
        )
        (_output_dists): ModuleList(
          (0): RwSequenceEmbeddingDist(
            (_dist): SequenceEmbeddingAllToAll()
          )
        )
      )
      (item_embedding): ShardedEmbeddingCollection(
        (_input_dists): ModuleList()
        (_lookups): ModuleList(
          (0): GroupedEmbeddingsLookup(
            (_emb_modules): ModuleList(
              (0): BatchedFusedEmbedding(
                (_emb_module): SplitTableBatchedEmbeddingBagsCodegen()
              )
            )
          )
        )
        (_output_dists): ModuleList(
          (0): RwSequenceEmbeddingDist(
            (_dist): SequenceEmbeddingAllToAll()
          )
        )
      )
      (cos): CosineSimilarity()
    )
  )
)

model.plan:  {'model.user_embedding': {'t_users': ParameterSharding(sharding_type='row_wise', compute_kernel='batched_fused', ranks=[0], sharding_spec=EnumerableShardingSpec(shards=[ShardMetadata(shard_offsets=[0, 0], shard_sizes=[7000000, 32], placement=rank:0/cuda:0)]))}, 'model.item_embedding': {'t_items': ParameterSharding(sharding_type='row_wise', compute_kernel='batched_fused', ranks=[0], sharding_spec=EnumerableShardingSpec(shards=[ShardMetadata(shard_offsets=[0, 0], shard_sizes=[32941588, 32], placement=rank:0/cuda:0)]))}}

dict: model.named_parameters():  {}

Traceback (most recent call last):
  File "/vc_data/users/GitSource/deeprank/collaborative_filtering/src/main_torchrec.py", line 284, in <module>
    main()
  File "/vc_data/users/GitSource/deeprank/collaborative_filtering/src/main_torchrec.py", line 184, in main
    optimizer = KeyedOptimizerWrapper(
  File "/vc_data/users/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torchrec/optim/keyed.py", line 339, in __init__
    self._optimizer: optim.Optimizer = optim_factory(list(params.values()))
  File "/vc_data/users/GitSource/deeprank/collaborative_filtering/src/main_torchrec.py", line 186, in <lambda>
    lambda params: torch.optim.SGD(params, lr=args.learning_rate),
  File "/vc_data/users/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/optim/sgd.py", line 105, in __init__
    super(SGD, self).__init__(params, defaults)
  File "/vc_data/users/conda_envs/torchrec_gpu_py39/lib/python3.9/site-packages/torch/optim/optimizer.py", line 49, in __init__
    raise ValueError("optimizer got an empty parameter list")
ValueError: optimizer got an empty parameter list

Code Snippet
torchrec_model.py

from typing import Tuple

import torch
import torch.nn as nn
from torch import Tensor
from torchrec.modules.embedding_configs import EmbeddingConfig
from torchrec.modules.embedding_modules import EmbeddingCollection
from torchrec.sparse.jagged_tensor import KeyedJaggedTensor

# `Batch` (used by CFTorchRecModelTrain below) is our own dataclass holding
# users / pos_items / neg_items tensors; its definition is not shown here.


class CFTorchRecModel(nn.Module):
    """model define"""
    def __init__(
        self,
        num_users: int,
        num_items: int,
        factors: int,
    ) -> None:
        super().__init__()
        user_eb_configs = [
            EmbeddingConfig(
                name="t_users",
                embedding_dim=factors,
                num_embeddings=num_users,
                feature_names=["users"],
            ),
        ]
        item_eb_configs = [
            EmbeddingConfig(
                name="t_items",
                embedding_dim=factors,
                num_embeddings=num_items,
                feature_names=["items"],
            )
        ]
        self.user_embedding: EmbeddingCollection = EmbeddingCollection(
            tables=user_eb_configs, device=torch.device("meta")
        )
        self.item_embedding: EmbeddingCollection = EmbeddingCollection(
            tables=item_eb_configs, device=torch.device("meta")
        )
        self.cos = nn.CosineSimilarity(dim=1, eps=1e-6)

    def forward(self, users: Tensor, items: Tensor) -> torch.Tensor:
        """forward"""
        user_features = KeyedJaggedTensor(
            keys=["users"],
            values=users.reshape(1, -1),
            offsets=torch.tensor([i for i in range(users.shape[0] + 1)]),
        )
        user_embs = self.user_embedding(user_features)["users"].values()

        item_features = KeyedJaggedTensor(
            keys=["items"],
            values=items.reshape(1, -1),
            offsets=torch.tensor([i for i in range(items.shape[0] + 1)]),
        )
        item_embs = self.item_embedding(item_features)["items"].values()

        scores = torch.sum(user_embs * item_embs, dim=1)
        return torch.sigmoid(scores)

    def bpr_loss(self, users: Tensor, pos: Tensor, neg: Tensor, cosine_loss_multiplier: float):
        """bpr loss"""

        user_features = KeyedJaggedTensor(
            keys=["users"],
            values=users.reshape(1, -1),
            offsets=users.new_ones(users.shape[0] + 1).cumsum(0) - 1  # torch.tensor([i for i in range(users.shape[0] + 1)]),
        )
        user_embs: Tensor = self.user_embedding(user_features)["users"].values()

        pos_features = KeyedJaggedTensor(
            keys=["items"],
            values=pos.reshape(1, -1),
            offsets=users.new_ones(pos.shape[0] + 1).cumsum(0) - 1 # torch.tensor([i for i in range(pos.shape[0] + 1)]),
        )
        pos_embs: Tensor = self.item_embedding(pos_features)["items"].values()

        neg_features = KeyedJaggedTensor(
            keys=["items"],
            values=neg.reshape(1, -1),
            offsets=users.new_ones(neg.shape[0] + 1).cumsum(0) - 1 # torch.tensor([i for i in range(neg.shape[0] + 1)]),
        )
        neg_embs: Tensor = self.item_embedding(neg_features)["items"].values()
        pos_scores = self.cos(user_embs, pos_embs)
        neg_scores = self.cos(user_embs, neg_embs)
        loss = torch.mean(nn.functional.softplus(cosine_loss_multiplier * (neg_scores - pos_scores)))
        return loss


class CFTorchRecModelTrain(nn.Module):
    """nn.Module to wrap CFTorchRecModel to use with train_pipeline."""
    def __init__(
        self,
        num_users: int,
        num_items: int,
        factors: int,
        cosine_loss_multiplier: float,
    ) -> None:
        super().__init__()
        self.model = CFTorchRecModel(num_users, num_items, factors)
        # self.model = BprMF(num_users, num_items, factors)
        self.cosine_loss_multiplier = cosine_loss_multiplier

    def forward(
        self, batch: Batch
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
        """forward, compute loss"""
        loss = self.model.bpr_loss(batch.users, batch.pos_items, batch.neg_items, self.cosine_loss_multiplier)

        return loss, (loss.detach(), batch.users.detach())

main.py

    train_model = CFTorchRecModelTrain(num_users, num_items, args.factors, args.cosine_loss_multiplier)

    # sharders and planner
    sharding_types = [
        ShardingType.DATA_PARALLEL.value,
        # ShardingType.TABLE_WISE.value,
        ShardingType.ROW_WISE.value,
    ]
    constraints = {
            "t_users": ParameterConstraints(sharding_types=sharding_types),
            "t_items": ParameterConstraints(sharding_types=sharding_types),
        }

    sharders = [EmbeddingCollectionSharder()]
    pg = dist.GroupMember.WORLD
    assert pg is not None, "Process group is not initialized"
    env = ShardingEnv.from_process_group(pg)
    planner = EmbeddingShardingPlanner(
        topology=Topology(
            world_size=env.world_size, compute_device=device.type,
        ),
        constraints=constraints,
    )
    if pg is not None:
        plan = planner.collective_plan(train_model, sharders, pg)
    else:
        plan = planner.plan(train_model, sharders)

    model = DistributedModelParallel(
        module=train_model,
        sharders=sharders,
        plan=plan,
        env=env,
        device=device,
    )
    print("model: ", model)
    print("model.plan: ", model.plan)
    # print("model.named_children(): ", dict(model.named_children()))
    print("dict: model.named_parameters(): ", dict(model.named_parameters()))
    optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        lambda params: torch.optim.SGD(params, lr=args.learning_rate),
    )
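
A possible way around this (a sketch under the assumption that the empty list is expected with the batched_fused compute kernel, whose embedding weights are updated during the backward pass and exposed via model.fused_optimizer rather than through named_parameters()):

import torch
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper

# Embedding updates are already handled by the fused optimizer attached to the
# sharded modules; only wrap whatever dense parameters remain (none in this model).
dense_params = dict(model.named_parameters())
if dense_params:
    optimizer = CombinedOptimizer(
        [
            model.fused_optimizer,
            KeyedOptimizerWrapper(
                dense_params,
                lambda params: torch.optim.SGD(params, lr=args.learning_rate),
            ),
        ]
    )
else:
    optimizer = model.fused_optimizer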

FBGEMM_gpu issue from both pip install (stable and nightly) and install from source

Hi! I'm trying to install TorchRec and am currently having an issue with the FBGEMM dependency. I tried pip installing both the stable and nightly builds following the instructions at https://github.com/pytorch/torchrec#installations, and both give me the error
OSError: /home/user/anaconda3/envs/torchrec/lib/python3.10/site-packages/fbgemm_gpu/fbgemm_gpu_py.so: undefined symbol: _ZNR5torch7Library4_defEON3c1014FunctionSchemaEPNS1_12OperatorNameE.

I also tried building from source with python setup.py install develop, and it gave this error:

Installed /home/ec2-user/anaconda3/envs/torchrec/lib/python3.10/site-packages/torchrec-0.2.0-py3.10.egg
Processing dependencies for torchrec==0.2.0
Searching for fbgemm-gpu
Reading https://pypi.org/simple/fbgemm-gpu/
No local packages or working download links found for fbgemm-gpu
error: Could not find suitable distribution for Requirement.parse('fbgemm-gpu')

Any help/suggestion on how to fix this? Thank you so much in advance!

[Documentation] `EmbeddingBagCollectionConfig` is in the documentation, but it does not actually exist

state_dict loop test fails on single gpu DLRM model

Hi,

I am trying to create a single gpu version of the DLRM model like in this example: https://github.com/pytorch/torchrec/blob/main/examples/inference/dlrm_predict_single_gpu.py

When I extract the state_dict from the model and try to load it back into the model I get a key mismatch.
This simple example reproduces the problem:

from dataclasses import dataclass
from typing import List

import fbgemm_gpu  # nopycln: import
import torch
from torchrec.datasets.criteo import DEFAULT_CAT_NAMES, DEFAULT_INT_NAMES
from torchrec.inference.modules import quantize_embeddings
from torchrec.models.dlrm import DLRM
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.modules.embedding_modules import EmbeddingBagCollection


def create_default_model_config():
    @dataclass
    class DLRMModelConfig:
        dense_arch_layer_sizes: List[int]
        dense_in_features: int
        embedding_dim: int
        id_list_features_keys: List[str]
        num_embeddings_per_feature: List[int]
        over_arch_layer_sizes: List[int]
    return DLRMModelConfig(
        dense_arch_layer_sizes=[32, 16, 8],
        dense_in_features=len(DEFAULT_INT_NAMES),
        embedding_dim=8,
        id_list_features_keys=DEFAULT_CAT_NAMES,
        num_embeddings_per_feature=len(DEFAULT_CAT_NAMES)
        * [
            3,
        ],
        over_arch_layer_sizes=[32, 32, 16, 1],
    )


class DLRMFactory(type):
    def __new__(cls, model_config=None):

        # If we do not provide a model config we use the default one compatible with the Criteo dataset
        if not model_config:
            model_config = create_default_model_config()

        default_cuda_rank = 0
        device = torch.device("cuda", default_cuda_rank)
        torch.cuda.set_device(device)

        eb_configs = [
            EmbeddingBagConfig(
                name=f"t_{feature_name}",
                embedding_dim=model_config.embedding_dim,
                num_embeddings=model_config.num_embeddings_per_feature[feature_idx],
                feature_names=[feature_name],
            )
            for feature_idx, feature_name in enumerate(
                model_config.id_list_features_keys
            )
        ]
        # Creates an EmbeddingBagCollection without allocating any memory
        ebc = EmbeddingBagCollection(tables=eb_configs, device=device)

        module = DLRM(
            embedding_bag_collection=ebc,
            dense_in_features=model_config.dense_in_features,
            dense_arch_layer_sizes=model_config.dense_arch_layer_sizes,
            over_arch_layer_sizes=model_config.over_arch_layer_sizes,
            dense_device=device,
        )

        module = quantize_embeddings(module, dtype=torch.qint8, inplace=True)

        return module


if __name__ == "__main__":

    model = DLRMFactory()

    state_dict = model.state_dict()

    model.load_state_dict(state_dict)

Expected result: the state_dict is loaded back without errors.
Encountered error:

Traceback (most recent call last):
  File "/home/ubuntu/serve/examples/torchrec_dlrm/dlrm_factory_repro.py", line 84, in <module>
    model.load_state_dict(state_dict)
  File "/home/ubuntu/miniconda3/envs/dlrm_test/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1604, in load_state_dict
    raise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format(
RuntimeError: Error(s) in loading state_dict for DLRM:
        Missing key(s) in state_dict: "sparse_arch.embedding_bag_collection.embedding_bags.0.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.0.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.0.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.0.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.0.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.0.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.0.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.0.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.0.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.0.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.1.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.1.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.1.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.1.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.1.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.1.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.1.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.1.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.1.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.1.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.2.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.2.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.2.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.2.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.2.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.2.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.2.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.2.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.2.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.2.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.3.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.3.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.3.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.3.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.3.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.3.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.3.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.3.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.3.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.3.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.4.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.4.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.4.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.4.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.4.index_remappings_array_offsets", 
"sparse_arch.embedding_bag_collection.embedding_bags.4.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.4.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.4.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.4.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.4.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.5.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.5.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.5.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.5.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.5.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.5.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.5.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.5.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.5.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.5.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.6.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.6.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.6.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.6.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.6.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.6.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.6.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.6.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.6.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.6.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.7.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.7.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.7.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.7.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.7.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.7.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.7.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.7.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.7.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.7.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.8.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.8.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.8.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.8.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.8.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.8.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.8.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.8.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.8.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.8.lxu_cache_weights", 
"sparse_arch.embedding_bag_collection.embedding_bags.9.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.9.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.9.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.9.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.9.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.9.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.9.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.9.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.9.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.9.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.10.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.10.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.10.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.10.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.10.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.10.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.10.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.10.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.10.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.10.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.11.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.11.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.11.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.11.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.11.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.11.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.11.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.11.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.11.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.11.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.12.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.12.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.12.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.12.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.12.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.12.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.12.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.12.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.12.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.12.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.13.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.13.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.13.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.13.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.13.index_remappings_array_offsets", 
"sparse_arch.embedding_bag_collection.embedding_bags.13.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.13.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.13.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.13.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.13.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.14.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.14.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.14.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.14.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.14.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.14.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.14.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.14.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.14.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.14.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.15.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.15.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.15.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.15.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.15.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.15.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.15.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.15.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.15.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.15.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.16.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.16.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.16.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.16.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.16.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.16.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.16.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.16.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.16.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.16.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.17.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.17.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.17.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.17.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.17.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.17.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.17.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.17.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.17.table_wise_cache_miss", 
"sparse_arch.embedding_bag_collection.embedding_bags.17.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.18.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.18.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.18.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.18.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.18.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.18.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.18.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.18.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.18.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.18.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.19.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.19.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.19.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.19.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.19.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.19.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.19.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.19.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.19.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.19.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.20.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.20.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.20.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.20.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.20.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.20.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.20.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.20.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.20.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.20.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.21.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.21.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.21.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.21.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.21.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.21.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.21.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.21.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.21.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.21.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.22.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.22.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.22.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.22.weights_tys", 
"sparse_arch.embedding_bag_collection.embedding_bags.22.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.22.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.22.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.22.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.22.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.22.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.23.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.23.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.23.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.23.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.23.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.23.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.23.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.23.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.23.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.23.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.24.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.24.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.24.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.24.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.24.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.24.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.24.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.24.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.24.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.24.lxu_cache_weights", "sparse_arch.embedding_bag_collection.embedding_bags.25.D_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.25.rows_per_table", "sparse_arch.embedding_bag_collection.embedding_bags.25.bounds_check_warning", "sparse_arch.embedding_bag_collection.embedding_bags.25.weights_tys", "sparse_arch.embedding_bag_collection.embedding_bags.25.index_remappings_array_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.25.index_remappings_array", "sparse_arch.embedding_bag_collection.embedding_bags.25.index_remapping_hash_table_offsets", "sparse_arch.embedding_bag_collection.embedding_bags.25.index_remapping_hash_table", "sparse_arch.embedding_bag_collection.embedding_bags.25.table_wise_cache_miss", "sparse_arch.embedding_bag_collection.embedding_bags.25.lxu_cache_weights".
        Unexpected key(s) in state_dict: "sparse_arch.embedding_bag_collection.embedding_bags.t_cat_0.weight" through "sparse_arch.embedding_bag_collection.embedding_bags.t_cat_25.weight" (one weight key per table, t_cat_0 through t_cat_25).

Am I expecting the wrong behavior?

[Installation] Why is the typing package needed as a requirement?

Torchrec requires Python >= 3.7.
Per the typing PyPI release notes, typing is a backport of the standard-library typing module for Python versions older than 3.5.

TorchRec installs typing explicitly on Python 3.7.13 (my env), from https://github.com/pytorch/torchrec/blob/main/requirements.txt#L38

This conflicts with a follow-up installation of TorchX:

[root@~/test #]pip3 install git+https://github.com/pytorch/torchx.git
Collecting git+https://github.com/pytorch/torchx.git
  Cloning https://github.com/pytorch/torchx.git to /tmp/pip-req-build-3_y9qhan
  Installing build dependencies ... error
  ERROR: Command errored out with exit status 1:
   command: /usr/bin/python3.7 /usr/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-z8szfdlm/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'setuptools>=40.8.0' wheel
       cwd: None
  Complete output (42 lines):
  Traceback (most recent call last):
    File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
      "__main__", mod_spec)
    File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
      exec(code, run_globals)
    File "/usr/lib/python3.7/site-packages/pip/__main__.py", line 26, in <module>
      sys.exit(_main())
    File "/usr/lib/python3.7/site-packages/pip/_internal/cli/main.py", line 73, in main
      command = create_command(cmd_name, isolated=("--isolated" in cmd_args))
    File "/usr/lib/python3.7/site-packages/pip/_internal/commands/__init__.py", line 104, in create_command
      module = importlib.import_module(module_path)
    File "/usr/lib/python3.7/importlib/__init__.py", line 127, in import_module
      return _bootstrap._gcd_import(name[level:], package, level)
    File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
    File "<frozen importlib._bootstrap>", line 983, in _find_and_load
    File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
    File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
    File "<frozen importlib._bootstrap_external>", line 728, in exec_module
    File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
    File "/usr/lib/python3.7/site-packages/pip/_internal/commands/install.py", line 17, in <module>
      from pip._internal.cli.req_command import RequirementCommand, with_cleanup
    File "/usr/lib/python3.7/site-packages/pip/_internal/cli/req_command.py", line 16, in <module>
      from pip._internal.index.collector import LinkCollector
    File "/usr/lib/python3.7/site-packages/pip/_internal/index/collector.py", line 14, in <module>
      from pip._vendor import html5lib, requests
    File "/usr/lib/python3.7/site-packages/pip/_vendor/requests/__init__.py", line 125, in <module>
      from . import utils
    File "/usr/lib/python3.7/site-packages/pip/_vendor/requests/utils.py", line 25, in <module>
      from . import certs
    File "/usr/lib/python3.7/site-packages/pip/_vendor/requests/certs.py", line 15, in <module>
      from pip._vendor.certifi import where
    File "/usr/lib/python3.7/site-packages/pip/_vendor/certifi/__init__.py", line 1, in <module>
      from .core import contents, where
    File "/usr/lib/python3.7/site-packages/pip/_vendor/certifi/core.py", line 12, in <module>
      from importlib.resources import path as get_path, read_text
    File "/usr/lib/python3.7/importlib/resources.py", line 11, in <module>
      from typing import Iterable, Iterator, Optional, Set, Union   # noqa: F401
    File "/usr/lib/python3.7/site-packages/typing.py", line 1359, in <module>
      class Callable(extra=collections_abc.Callable, metaclass=CallableMeta):
    File "/usr/lib/python3.7/site-packages/typing.py", line 1007, in __new__
      self._abc_registry = extra._abc_registry
  AttributeError: type object 'Callable' has no attribute '_abc_registry'

I have to remove typing with pip uninstall before I can continue installing TorchX.

Can we just remove typing from the requirements, since Python 3.7+ is required?

torchrec inference build error

Hi,
I am trying to build torchrec/torchrec/inference and I get a "/usr/bin/ld: final link failed: Bad value" error with libboost_filesystem.a(operations.o):

/usr/bin/ld: /usr/lib/x86_64-linux-gnu/libboost_filesystem.a(operations.o): relocation R_X86_64_PC32 against symbol '_ZN5boost10filesystem16filesystem_errorC1ERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNS0_4pathESC_NS_6system10error_codeE' can not be used when making a shared object; recompile with -fPIC
/usr/bin/ld: final link failed: Bad value
collect2: error: ld returned 1 exit status

I got a similar error with the folly library too, which was fixed after rebuilding folly (with gcc/g++ 9+) with -fPIC. I installed libboost using this command line:
./build/fbcode_builder/getdeps.py install-system-deps --recursive

Docker image: pytorch/pytorch:1.11.0-cuda11.3-cudnn8-devel
Machine: NVIDIA Tesla V100 GPU

Could you kindly help me debug this error?

Thank you
Bhavya

How to correctly run example/dlrm with cpu in multiple nodes?

Hi all! I tried to run example/dlrm on multiple nodes after fixing the permute_pooled_embs_auto_grad problem with #160, but it gets stuck at

torchrec.distributed.model_parallel.DistributedModelParallel.__init__ →
init_data_parallel →
self._data_parallel_wrapper.wrap (which is in DefaultDataParallelWrapper) →
DistributedDataParallel.__init__ →
dist._verify_params_across_processes

without any progress (the CPUs are just waiting and there is no network traffic).

Could someone help me to see what causes this? Thanks in advance! @liangluofb

My torchrec commit version is b72ed51545d0682547e903f4a921f8d3082ef888, and torch version is 1.12.0.dev20220312.
Both torchrec and the fbgemm it uses (see #160) were built from source.

How to reproduce?
I used two nodes (node0 with IP 10.0.0.1 and node1 with IP 10.0.0.2), then ran the commands below:

#At node0
torchrun --nnodes 2 --nproc_per_node 1 --node_rank 0 --master_addr=10.0.0.1 --master_port=1234 dlrm_main.py --pin_memory --batch_size 8192 --epochs 1 --num_embeddings_per_feature "22760543,39060,17295,7424,20265,3,7122,1543,63,13022467,3067956,405282,10,2209,11938,155,4,976,14,29277564,40790948,18718810,590152,12973,108,36" --embedding_dim 128 --dense_arch_layer_sizes "512,256,128" --over_arch_layer_sizes "1024,1024,512,256,1" --learning_rate 5.0

#At node1
torchrun --nnodes 2 --nproc_per_node 1 --node_rank 1 --master_addr=10.0.0.1 --master_port=1234 dlrm_main.py --pin_memory --batch_size 8192 --epochs 1 --num_embeddings_per_feature "22760543,39060,17295,7424,20265,3,7122,1543,63,13022467,3067956,405282,10,2209,11938,155,4,976,14,29277564,40790948,18718810,590152,12973,108,36" --embedding_dim 128 --dense_arch_layer_sizes "512,256,128" --over_arch_layer_sizes "1024,1024,512,256,1" --learning_rate 5.0

Query: single gpu support for very large embedding table

Hi there,
I am wondering whether torchrec provides any support or features for RecSys models whose embedding tables exceed a single GPU's memory (not by going multi-GPU, but by using host memory). I looked through the documentation but could not find any material on this.

If this is not supported, what is the plan for supporting this feature?
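
For reference, embedding tables larger than a single GPU's memory can be kept in host memory via the UVM compute kernels (this is what examples/sharding/uvm.ipynb, mentioned in another issue below, demonstrates). A minimal single-process sketch, under my assumptions about the planner API and with placeholder names such as "large_table":

import os

import torch
import torch.distributed as dist
import torchrec
from torchrec.distributed.embedding_types import EmbeddingComputeKernel
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.planner import EmbeddingShardingPlanner, Topology
from torchrec.distributed.planner.types import ParameterConstraints

# single-rank process group so DistributedModelParallel can be constructed
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group(backend="nccl", rank=0, world_size=1)

ebc = torchrec.EmbeddingBagCollection(
    device=torch.device("meta"),
    tables=[
        torchrec.EmbeddingBagConfig(
            name="large_table",                 # placeholder name
            embedding_dim=128,
            num_embeddings=40_000_000,          # ~20 GB of fp32 weights
            feature_names=["large_feature"],
        )
    ],
)

planner = EmbeddingShardingPlanner(
    topology=Topology(world_size=1, compute_device="cuda"),
    constraints={
        # keep the table's weights in unified (host) memory with a GPU-side cache
        "large_table": ParameterConstraints(
            compute_kernels=[EmbeddingComputeKernel.FUSED_UVM_CACHING.value]
        ),
    },
)
plan = planner.plan(ebc, [EmbeddingBagCollectionSharder()])

model = torchrec.distributed.DistributedModelParallel(
    ebc, device=torch.device("cuda"), plan=plan
)
print(model.plan)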

How bad is it to work on CPU for torchrec?

I am a student working on a project building an MVP that will need a recommender system. Unfortunately I do not personally have an Nvidia GPU, and I'm running on my ThinkPad T495s.

How bad is it to work on CPU? I imagine I will need to use smaller datasets.

If not, I may try out the free trials from Google or AWS.

edit: My Computer specs:

(screenshot of machine specs attached to the original issue)
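
For what it's worth (my sketch, not an official answer): the un-sharded modules run fine on CPU, just more slowly and with table sizes bounded by your RAM, so smaller datasets and embedding tables are the practical limit. A minimal CPU-only EmbeddingBagCollection forward pass, with made-up table and feature names:

import torch
import torchrec
from torchrec.sparse.jagged_tensor import KeyedJaggedTensor

ebc = torchrec.EmbeddingBagCollection(
    device=torch.device("cpu"),
    tables=[
        torchrec.EmbeddingBagConfig(
            name="product_table",            # placeholder name
            embedding_dim=16,
            num_embeddings=4096,
            feature_names=["product"],
        )
    ],
)

# two samples: sample 0 looks up ids [101, 202], sample 1 looks up id [303]
features = KeyedJaggedTensor.from_lengths_sync(
    keys=["product"],
    values=torch.tensor([101, 202, 303]),
    lengths=torch.tensor([2, 1]),
)

pooled = ebc(features)                       # KeyedTensor of pooled embeddings
print(pooled["product"].shape)               # torch.Size([2, 16])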

Can't install torchrec-nightly on fresh conda environment

Hello,

I'm unable to install torchrec-nightly with conda in a brand-new environment. I followed the installation guide in the documentation, i.e., ran the following commands:

conda install pytorch cudatoolkit=11.3 -c pytorch-nightly
pip install torchrec-nightly

However, when I import torchrec, I get the following error:

(torchrec) jupyter@ctr-model-gpu:~$ python
Python 3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:18) 
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import torchrec
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/__init__.py", line 8, in <module>
    import torchrec.distributed  # noqa
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/__init__.py", line 39, in <module>
    from torchrec.distributed.model_parallel import DistributedModelParallel  # noqa
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/model_parallel.py", line 20, in <module>
    from torchrec.distributed.embeddingbag import (
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/embeddingbag.py", line 27, in <module>
    from torchrec.distributed.embedding_sharding import (
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/embedding_sharding.py", line 14, in <module>
    from torchrec.distributed.dist_data import (
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/dist_data.py", line 15, in <module>
    from torchrec.distributed.comm_ops import (
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torchrec/distributed/comm_ops.py", line 27, in <module>
    import fbgemm_gpu  # @manual # noqa
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/fbgemm_gpu/__init__.py", line 12, in <module>
    torch.ops.load_library(os.path.join(os.path.dirname(__file__), "fbgemm_gpu_py.so"))
  File "/opt/conda/envs/torchrec/lib/python3.8/site-packages/torch/_ops.py", line 255, in load_library
    ctypes.CDLL(path)
  File "/opt/conda/envs/torchrec/lib/python3.8/ctypes/__init__.py", line 373, in __init__
    self._handle = _dlopen(self._name, mode)
OSError: /opt/conda/envs/torchrec/lib/python3.8/site-packages/fbgemm_gpu/fbgemm_gpu_py.so: undefined symbol: _ZN3c1021getCustomClassTypeMapEv

Any idea on what might be happening?
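
A minimal compatibility check I would try (my suggestion, not from the thread): this kind of undefined symbol usually means the installed fbgemm_gpu wheel was built against a different torch than the one in the environment, so verify that both come from the same channel (nightly vs. stable) and CUDA version:

import torch

print(torch.__version__, torch.version.cuda)

# raises the same undefined-symbol OSError if the fbgemm_gpu build
# does not match the installed torch
import fbgemm_gpu

print(fbgemm_gpu.__file__)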

Error when using horovod

When I use horovod to run DeepFM, it errors out:

  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 451, in <module>
    main(args)
  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 380, in main
    model = init_model(device=device)
  File "/home/maer/zhipeng.li/project/torch_rec_demo/torchrec_ctr/run_horovod_deepfm.py", line 187, in init_model
    dmp_model = DistributedModelParallel(
  File "/root/conda/lib/python3.9/site-packages/torchrec/distributed/model_parallel.py", line 211, in __init__
    assert pg is not None, "Process group is not initialized"

The code:

import argparse
import os
from distutils.version import LooseVersion


import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from filelock import FileLock
from torchvision import datasets, transforms
import torchrec
import horovod
import horovod.torch as hvd

from torchrec import EmbeddingBagCollection
from torchrec.datasets.utils import Batch
from torchrec.distributed import TrainPipelineSparseDist
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.model_parallel import DistributedModelParallel
from torchrec.distributed.types import ModuleSharder
from torchrec.models.dlrm import DLRM, DLRMV2, DLRMTrain
from torchrec.models.deepfm import DeepFM,SimpleDeepFMNN
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper

from typing import cast, Iterator, List, Optional, Tuple


# Training settings
def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=42, metavar='S',
                        help='random seed (default: 42)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                        help='use fp16 compression during allreduce')
    parser.add_argument('--use-mixed-precision', action='store_true', default=False,
                        help='use mixed precision for training')
    parser.add_argument('--use-adasum', action='store_true', default=False,
                        help='use adasum algorithm to do reduction')
    parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
                        help='apply gradient predivide factor in optimizer (default: 1.0)')
    parser.add_argument('--data-dir',
                        help='location of the training dataset in the local filesystem (will be downloaded if needed)')
    # DMP config 
    parser.add_argument(
        "--num_embeddings",
        type=int,
        default=100_000,
        help="max_ind_size. The number of embeddings in each embedding table. Defaults"
        " to 100_000 if num_embeddings_per_feature is not supplied.",
    )
    parser.add_argument(
        "--num_embeddings_per_feature",
        type=str,
        default=None,
        help="Comma separated max_ind_size per sparse feature. The number of embeddings"
        " in each embedding table. 26 values are expected for the Criteo dataset.",
    )
    parser.add_argument(
        "--embedding_dim",
        type=int,
        default=64,
        help="Size of each embedding.",
    )
    
    # Arguments when not run through horovodrun
    parser.add_argument('--num-proc', type=int)
    parser.add_argument('--hosts', help='hosts to run on in notation: hostname:slots[,host2:slots[,...]]')
    parser.add_argument('--communication', help='collaborative communication to use: gloo, mpi')
    return parser.parse_args(argv)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
    
    
    
class CTRModelTrain(nn.Module):
    """
        nn.Module to wrap DLRM model to use with train_pipeline.
        dlrm_model = DLRMTrain(dlrm_module)
    """

    def __init__(
        self,
        ctr_module: SimpleDeepFMNN,
    ) -> None:
        super().__init__()
        self.model = ctr_module
        self.loss_fn: nn.Module = nn.BCEWithLogitsLoss()

    def forward(
        self, batch: Batch
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor]]:
        """
        Args:
            batch: batch used with criteo and random data from torchrec.datasets
        Returns:
            Tuple[loss, Tuple[loss, logits, labels]]
        """
        logits = self.model(batch.dense_features, batch.sparse_features)
        logits = logits.squeeze()
        loss = self.loss_fn(logits, batch.labels.float())

        return loss, (loss.detach(), logits.detach(), batch.labels.detach())


def FunctionName(args,backend):
    from data.ctr_dataloader import get_dataloader, STAGES
    # TODO add CriteoIterDataPipe support and add random_dataloader arg
    train_dataloader = get_dataloader(args, backend, "train")
    val_dataloader = get_dataloader(args, backend, "val")
    test_dataloader = get_dataloader(args, backend, "test")

def init_model(device=torch.device("cpu")):
    print("start init the DMP for embedding")
    
    # 1.the embedding for EmbeddingBag
    from pyre_extensions import none_throws
    eb_configs = [
        EmbeddingBagConfig(
            name=f"t_{feature_name}",
            embedding_dim=args.embedding_dim,
            num_embeddings=none_throws(args.num_embeddings_per_feature)[feature_idx]
            if args.num_embeddings is None
            else args.num_embeddings,
            feature_names=[feature_name],
        )
        for feature_idx, feature_name in enumerate(torchrec.datasets.criteo.DEFAULT_CAT_NAMES)
    ]

    ebc =  EmbeddingBagCollection(
        tables=eb_configs,
        device=torch.device("meta") 
    )
    # 2.build the model  
    deepfm_model = SimpleDeepFMNN(num_dense_features=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES), 
                                 embedding_bag_collection=ebc, hidden_layer_size=1024, deep_fm_dimension=16 )
    train_model = CTRModelTrain(deepfm_model)
    
    #print the model for train
    print("the train model struct:",train_model)
    
    # 3.config and build dmp model
    from fbgemm_gpu.split_embedding_configs import EmbOptimType as OptimType
    fused_params = {
        "learning_rate": args.lr,
        "optimizer": OptimType.EXACT_ROWWISE_ADAGRAD
        # if args.adagrad
        # else OptimType.EXACT_SGD,
    }
    sharders = [
        EmbeddingBagCollectionSharder(fused_params=fused_params),
    ]
    
    dmp_model = DistributedModelParallel(
        module=train_model,
        init_data_parallel=False,
        device=device,
        sharders=cast(List[ModuleSharder[nn.Module]], sharders),
    )
    
    return dmp_model
 
#the  dmp  combine  optimizer
def optimizer_with_params(model:DistributedModelParallel ):
    def optimizer_by_params():
        if args.adagrad:
            return lambda params: torch.optim.Adagrad(params, lr=args.lr)
        else:
            return lambda params: torch.optim.SGD(params, lr=args.lr)

    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        optimizer_with_params(),
    )
    optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])
    return optimizer


def main(args):
    print("start main ....")
    def train_mixed_precision(epoch, scaler):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            with torch.cuda.amp.autocast():
                output = model(data)
                loss = F.nll_loss(output, target)

            scaler.scale(loss).backward()
            # Make sure all async allreduces are done
            optimizer.synchronize()
            # In-place unscaling of all gradients before weights update
            scaler.unscale_(optimizer)
            with optimizer.skip_synchronize():
                scaler.step(optimizer)
            # Update scaler in case of overflow/underflow
            scaler.update()

            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tLoss Scale: {}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                           100. * batch_idx / len(train_loader), loss.item(), scaler.get_scale()))

    def train_epoch(epoch):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % args.log_interval == 0:
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                           100. * batch_idx / len(train_loader), loss.item()))

    def metric_average(val, name):
        tensor = torch.tensor(val)
        avg_tensor = hvd.allreduce(tensor, name=name)
        return avg_tensor.item()

    def test():
        model.eval()
        test_loss = 0.
        test_accuracy = 0.
        for data, target in test_loader:
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, size_average=False).item()
            # get the index of the max log-probability
            pred = output.data.max(1, keepdim=True)[1]
            test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()

        # Horovod: use test_sampler to determine the number of examples in
        # this worker's partition.
        test_loss /= len(test_sampler)
        test_accuracy /= len(test_sampler)

        # Horovod: average metric values across workers.
        test_loss = metric_average(test_loss, 'avg_loss')
        test_accuracy = metric_average(test_accuracy, 'avg_accuracy')

        # Horovod: print output only on first rank.
        if hvd.rank() == 0:
            print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
                test_loss, 100. * test_accuracy))

    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(args.seed)
    print("the args.num_proc:",str(args.num_proc)," rank:"+str(hvd.rank())," size:"+str(hvd.size())+" !!!")
    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)
    else:
        if args.use_mixed_precision:
            raise ValueError("Mixed precision is only supported with cuda enabled.")

    if (args.use_mixed_precision and LooseVersion(torch.__version__)
            < LooseVersion('1.6.0')):
        raise ValueError("""Mixed precision is using torch.cuda.amp.autocast(),
                            which requires torch >= 1.6.0""")

    # Horovod: limit # of CPU threads to be used per worker.
    torch.set_num_threads(1)

    kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
    # issues with Infiniband implementations that are not fork-safe
    if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
            mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
        kwargs['multiprocessing_context'] = 'forkserver'

    data_dir = args.data_dir or './data'
    with FileLock(os.path.expanduser("~/.horovod_lock")):
        # train_dataset = \
        #     datasets.MNIST(data_dir, train=True, download=True,
        #                    transform=transforms.Compose([
        #                        transforms.ToTensor(),
        #                        transforms.Normalize((0.1307,), (0.3081,))
        #                    ]))
        train_dataset=torchrec.datasets.random.RandomRecDataset(
            keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
            batch_size=args.batch_size,
            hash_size=args.num_embeddings,
            hash_sizes=args.num_embeddings_per_feature
            if hasattr(args, "num_embeddings_per_feature")
            else None,
            manual_seed=args.seed if hasattr(args, "seed") else None,
            ids_per_feature=1,
            num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
            num_generated_batches=1000,
        ),

    # Horovod: use DistributedSampler to partition the training data.
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    # train_loader = torch.utils.data.DataLoader(
    #     train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,batch_size=args.batch_size, sampler=train_sampler, **kwargs
        
    )

    # test_dataset = \
    #     datasets.MNIST(data_dir, train=False, transform=transforms.Compose([
    #         transforms.ToTensor(),
    #         transforms.Normalize((0.1307,), (0.3081,))
    #     ]))
    test_dataset=torchrec.datasets.random.RandomRecDataset(
            keys=torchrec.datasets.criteo.DEFAULT_CAT_NAMES,
            batch_size=args.batch_size,
            hash_size=args.num_embeddings,
            hash_sizes=args.num_embeddings_per_feature
            if hasattr(args, "num_embeddings_per_feature")
            else None,
            manual_seed=args.seed if hasattr(args, "seed") else None,
            ids_per_feature=1,
            num_dense=len(torchrec.datasets.criteo.DEFAULT_INT_NAMES),
        ),
    # Horovod: use DistributedSampler to partition the test data.
    test_sampler = torch.utils.data.distributed.DistributedSampler(
        test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
                                              sampler=test_sampler, **kwargs)
    
    if args.cuda and torch.cuda.is_available():
        device: torch.device = torch.device(f"cuda:{hvd.rank()}")
    else:
        device: torch.device = torch.device("cpu")
    #model = Net()
    model = init_model(device=device)
    
    #print the plan for DMP model
    print("print the plan for dmp module:")
    print(model.plan)
    

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: scale learning rate by lr_scaler.
    # optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
    #                       momentum=args.momentum)
    optimizer = optim.Adagrad(model.parameters(), lr=args.lr * lr_scaler)
    

    dense_optimizer = KeyedOptimizerWrapper(
        dict(model.named_parameters()),
        optimizer,
    )
    combine_optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(combine_optimizer, root_rank=0)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(combine_optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average,
                                         gradient_predivide_factor=args.gradient_predivide_factor)

    if args.use_mixed_precision:
        # Initialize scaler in global scale
        scaler = torch.cuda.amp.GradScaler()

    for epoch in range(1, args.epochs + 1):
        if args.use_mixed_precision:
            train_mixed_precision(epoch, scaler)
        else:
            train_epoch(epoch)
        # Keep test in full precision since computation is relatively light.
        test()


if __name__ == '__main__':
    import sys
    args = parse_args(sys.argv[1:])
    args.cuda = not args.no_cuda and torch.cuda.is_available()
    if args.num_proc:
        # run training through horovod.run
        print('Running training through horovod.run')
        horovod.run(main,
                    args=(args,),
                    np=args.num_proc,
                    hosts=args.hosts,
                    use_gloo=args.communication == 'gloo',
                    use_mpi=args.communication == 'mpi')
    else:
        # this is running via horovodrun
        main(args)
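
One possible workaround sketch (my assumption, not a confirmed fix): the assertion at model_parallel.py line 211 fires because DistributedModelParallel needs a torch.distributed process group, and hvd.init() does not create one. A default group can be initialized explicitly from horovod's rank/size before building the DMP model:

import os

import torch
import torch.distributed as dist
import horovod.torch as hvd

hvd.init()
# placeholder address/port; on multiple hosts, use the real master address
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group(
    backend="nccl" if torch.cuda.is_available() else "gloo",
    rank=hvd.rank(),
    world_size=hvd.size(),
)
# DistributedModelParallel(...) can now find the default process group.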

Unified Authoring of Overlapped Optimizers, and per-parameter optimizer settings

Context and Problem:

Optimizer overlap/fusion is a technique where, upon calling backward(), the optimizer step for each parameter is applied as soon as its gradient is computed. The benefits of this are twofold:

  • Model training efficiency: we don't need to store gradients and look them up again later; each update can be applied as soon as the gradient arrives.
  • Memory efficiency: gradients can be released as the computation graph is traversed, so they don't all need to be held in memory simultaneously for opt.step().

These two properties are critical in a distributed, production setting. As such, optimizer overlap has taken different forms across many parallelization domains such as TorchRec, DDP, and FSDP.

A new use case is composing these different parallelization strategies, each of which uses, in spirit, some form of optimizer overlap (the implementations differ: TorchRec uses fused CUDA kernels, while DDP/FSDP may apply the step explicitly but overlap gradient-bucket reductions with the previous bucket's parameter updates). One problem we face is that model authors express this functionality differently across domains.

We propose a unified way of registering optimizers over model parameters across distributed settings, while keeping behavioral consistency between the single-host and parallelized variants.

The main expected behavior is that after backward() is called, the parameter is updated and its gradient is set to None (and released from memory).

For TorchRec

In TorchRec's case, we currently have a limitation in expressing which parameters should use which optimizers. In the normal unsharded world you could do:

opt1 = SGD(ebc.embedding_bags["table_0"].parameters())
opt2 = Adam(ebc.embedding_bags["table_1"].parameters())

But ShardedEmbeddingBagCollection, and even FusedEmbeddingBagCollection, only allow you to specify one optimizer per module.

Instead, we will now read which parameters should use which optimizer settings from the metadata attached by apply_overlapped_optimizer (see the pseudocode below). The implementation detail is that, in addition to grouping on sharding_type/pooling_type/data_type/etc., we will also group on "fused_params" to create separate lookup kernels for different parameter groups (if they have different optimizers).

Some future implications of this are that we will be able to get rid of fused_params as an input to EBCSharder, and fully get rid of compute_kernel.

Proposed pseudocode

model = SparseNN(
   sparse_arch=EmbeddingBagCollection,
   over_arch=Linear(10,10)
)
 
apply_overlapped_optimizer(torch.optim.SGD, model.sparse_arch["table_0"].parameters(), lr=.02)
apply_overlapped_optimizer(torch.optim.Adam, model.sparse_arch["table_1"].parameters(), lr=.04)

# chosen to be similar to torch.optim.SGD(model.sparse_arch.parameters(), lr)
 
apply_overlapped_optimizer(torch.optim.Adagrad, model.over_arch.parameters(), lr=.006)
 
model.sparse_arch = shard_embedding_modules(model.sparse_arch, [EmbeddingBagCollectionSharder()])
model.over_arch = DistributedDataParallel(model.over_arch)
 
optimizers = get_optimizers(model)
torch.save(optimizers.state_dict())


def apply_overlapped_optimizer(
    optimizer_class: Type[torch.optim.Optimizer],
    params: Iterator[nn.Parameter],
    optimizer_kwargs: Dict[str, Any],
) -> List[torch.optim.Optimizer]:

    ...
    for param in params:
        # attach optimizer property/metadata
        # this is param-level metadata that FSDP/DDP/DMP can use to achieve behavioral consistency
        param._optimizer = optimizer_class
        param._optimizer_kwargs = optimizer_kwargs

        param._overlapped_optimizer = optimizer_class([param], **optimizer_kwargs)
        ...

        def optimizer_hook(*_unused):
            param._overlapped_optimizer.step()
            param.grad = None

        # this hook can be thrown away on the sharded/parallelized module
        param._acc_grad.register_hook(optimizer_hook)

Note that even in an unsharded world, you could get memory benefits from releasing gradients early.

Checkpointing / Serialization best practices

Are there best practices around checkpointing (and more generally, serializing) torchrec models? I'm thinking in particular about an EmbeddingBagCollection that is sharded across multiple devices.
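
Not an authoritative answer, just a starting point for inspection: for an un-sharded EmbeddingBagCollection the state_dict holds ordinary tensors, so torch.save/load_state_dict behave as usual, whereas after wrapping in DistributedModelParallel each rank's state_dict only holds that rank's shards, so checkpoints have to be written and re-loaded per rank (or the shards gathered first). A minimal un-sharded sketch with a hypothetical table name and file path:

import torch
import torchrec

ebc = torchrec.EmbeddingBagCollection(
    tables=[
        torchrec.EmbeddingBagConfig(
            name="product_table",            # placeholder name
            embedding_dim=16,
            num_embeddings=4096,
            feature_names=["product"],
        )
    ],
)

# one "embedding_bags.<table>.weight" entry per table
for name, tensor in ebc.state_dict().items():
    print(name, tuple(tensor.shape))

torch.save(ebc.state_dict(), "ebc_checkpoint.pt")        # hypothetical path
ebc.load_state_dict(torch.load("ebc_checkpoint.pt"))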

libgcc_s.so.1 must be installed for pthread_cancel to work

I was trying to update torchrec to 0.2.0 so that I can separate fbgemm-gpu and fbgemm-cpu (for testing).

I tried with torch==1.12.0 (released on June 28th?) and 1.13.0.dev20220628, and both give me this pretty opaque error. Any advice?

libgcc_s.so.1 must be installed for pthread_cancel to work
[_base._invoke_callbacks:330 - ERROR] exception calling callback for <Future at 0x7f6aeea47460 state=finished raised BrokenProcessPool>
Traceback (most recent call last):
  File "/opt/ee/python/3.8/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/common/config_utils.py", line 36, in _done_callback
    future.result()
  File "/opt/ee/python/3.8/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/opt/ee/python/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
libgcc_s.so.1 must be installed for pthread_cancel to work
Fatal Python error: Aborted

Thread 0x00007f6209ff9700 (most recent call first):
  File "/opt/ee/python/3.8/lib/python3.8/socket.py", line 292 in accept
  File "/opt/ee/python/3.8/lib/python3.8/multiprocessing/connection.py", line 609 in accept
  File "/opt/ee/python/3.8/lib/python3.8/multiprocessing/connection.py", line 463 in accept
  File "/opt/ee/python/3.8/lib/python3.8/multiprocessing/resource_sharer.py", line 142 in _serve
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 870 in run
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007f620b7fc700 (most recent call first):
<no Python frame>

Thread 0x00007f620bffd700 (most recent call first):
<no Python frame>

Thread 0x00007f6220ff8700 (most recent call first):
<no Python frame>

Thread 0x00007f62217f9700 (most recent call first):
<no Python frame>

Thread 0x00007f6777fff700 (most recent call first):
  File "/opt/ee/python/3.8/lib/python3.8/concurrent/futures/thread.py", line 78 in _worker
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 870 in run
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007f677493f700 (most recent call first):
  File "/opt/ee/python/3.8/lib/python3.8/selectors.py", line 468 in select
  File "/opt/ee/python/3.8/lib/python3.8/asyncio/base_events.py", line 1823 in _run_once
  File "/opt/ee/python/3.8/lib/python3.8/asyncio/base_events.py", line 570 in run_forever
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 870 in run
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/opt/ee/python/3.8/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007f6c29c13740 (most recent call first):
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/tensorflow/python/ops/gen_dataset_ops.py", line 2840 in iterator_get_next
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/tensorflow/python/data/ops/iterator_ops.py", line 783 in _next_internal
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/tensorflow/python/data/ops/iterator_ops.py", line 800 in __next__
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/twitter/ml/torch/experimental/datasets/dataset_lib.py", line 105 in _inner
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/utils/data/_utils/fetch.py", line 39 in fetch
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 701 in _next_data
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 663 in __next__
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torchrec/distributed/train_pipeline.py", line 490 in progress
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/custom_training_loop.py", line 24 in step_fn
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/custom_training_loop.py", line 123 in train_and_evaluate
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py", line 115 in train
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/twitter/ml/torch/experimental/distributed/training.py", line 39 in maybe_run_training
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py", line 140 in main
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/absl/app.py", line 251 in _run_main
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/absl/app.py", line 300 in run
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py", line 148 in <module>
/opt/ee/python/3.8/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 15 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
[api._close:697 - WARNING] Sending process 79 closing signal SIGTERM
[api._close:697 - WARNING] Sending process 80 closing signal SIGTERM
[api._close:697 - WARNING] Sending process 81 closing signal SIGTERM
[api._poll:671 - ERROR] failed (exitcode: -6) local_rank: 0 (pid: 78) of binary: /opt/ee/python/3.8/bin/python
[dynamic_rendezvous._close:1128 - INFO] The node 'pytorch-fav-baseline-chief-0_1_0' has closed the rendezvous '79ab6ac2-e83e-4955-87ec-916c3085b103'.
Traceback (most recent call last):
  File "/opt/ee/python/3.8/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/ee/python/3.8/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py", line 148, in <module>
    app.run(main)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/absl/app.py", line 300, in run
    _run_main(main, args)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/absl/app.py", line 251, in _run_main
    sys.exit(main(argv))
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py", line 140, in main
    maybe_run_training(
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/twitter/ml/torch/experimental/distributed/training.py", line 66, in maybe_run_training
    torch.distributed.run.main(cmd)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
    return f(*args, **kwargs)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/distributed/run.py", line 761, in main
    run(args)
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/distributed/run.py", line 752, in run
    elastic_launch(
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 131, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/opt/ee/python/3.8/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 245, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
============================================================
/opt/ee/python/3.8/lib/python3.8/site-packages/large_embeddings/projects/recap/run_native.py FAILED
------------------------------------------------------------
Failures:
  <NO_OTHER_FAILURES>
------------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2022-07-11_08:55:06
  host      : pytorch-fav-baseline-chief-0
  rank      : 0 (local_rank: 0)
  exitcode  : -6 (pid: 78)
  error_file: <N/A>
  traceback : Signal 6 (SIGABRT) received by PID 78
============================================================
libgcc_s.so.1 must be installed for pthread_cancel to work
Fatal Python error: Aborted

Thread 0x00007f3f96992740 (most recent call first):
<no Python frame>
Fatal Python error: Segmentation fault

Thread 0x00007f3f96992740 (most recent call first):
<no Python frame>

How does DistributedModelParallel work?

First off, this is an amazing library! Lots of very cool features, but I'm very curious about DistributedModelParallel.

What exactly does it do/how does it work? I couldn't really find much information in the docstrings/docs. How does it work with data parallel?

AttributeError: '_OpNamespace' object has no attribute 'new_managed_tensor'

Hi team,

I am seeing an AttributeError: '_OpNamespace' object has no attribute 'new_managed_tensor' error when running one of the examples (examples/sharding/uvm.ipynb). It's raised in

uvm_model = torchrec.distributed.DistributedModelParallel(
    ebc,
    device=torch.device("cuda"),
    plan=plan
)

which goes down to
fbgemm_gpu/split_table_batched_embeddings_ops.py:1164

out=torch.ops.fbgemm.new_managed_tensor(
    # pyre-fixme[6]: Expected `Optional[Type[torch._dtype]]`
    #  for 3rd param but got `Type[Type[torch._dtype]]`.
    torch.zeros(1, device=self.current_device, dtype=dtype),
    [split.uvm_size],

One thing that I changed from the example is compute_kernels=[EmbeddingComputeKernel.BATCHED_FUSED_UVM.value] to compute_kernels=[EmbeddingComputeKernel.FUSED_UVM_CACHING.value], but this is only because EmbeddingComputeKernel does not have a BATCHED_FUSED_UVM attribute.

I think this might be a pytorch-related issue but I am wondering if you have seen this error before.

My configurations are
python=3.9
pytorch=1.12.1
cuda=11.3
torchrec=0.2.0
fbgemm_gpu_nightly

Thanks in advance!
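
A quick diagnostic sketch (my suggestion, not from the thread): new_managed_tensor is registered when fbgemm_gpu's shared library loads, so if the import succeeds but the op is still missing, the installed fbgemm_gpu build (for example a CPU-only wheel, or a nightly built against a different torch/CUDA combination than pytorch 1.12.1 + cuda 11.3) likely does not provide it:

import torch
import fbgemm_gpu  # loads fbgemm_gpu_py.so and registers torch.ops.fbgemm.*

print(torch.__version__, torch.version.cuda)
print(fbgemm_gpu.__file__)
print(hasattr(torch.ops.fbgemm, "new_managed_tensor"))   # False reproduces the AttributeError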

NCCL error while instantiating DistributedModelParallel

Hello,

I'm trying to train a TorchRec model on a single node with two Nvidia A100 GPUs.

(torchrec) jupyter@ctr-model-gpu-a100-2:~/hft/ctr-model$ nvidia-smi
Fri May 13 15:04:20 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.57.02    Driver Version: 470.57.02    CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  NVIDIA A100-SXM...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   36C    P0    58W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA A100-SXM...  Off  | 00000000:00:05.0 Off |                    0 |
| N/A   34C    P0    57W / 400W |      0MiB / 40536MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+
(torchrec) jupyter@ctr-model-gpu:~$ python -c "import torch; print(torch.version.cuda)"
11.3

I installed TorchRec and FBGEMM from source. My TorchRec version:

(torchrec) jupyter@ctr-model-gpu:~/hft/ctr-model$ pip freeze | grep torchrec
torchrec==0.1.0

Below is the script I'm trying to run. I've replaced the model with a very simple EBC to rule out issues with my model's architecture.

import json
import os

import torch
import torchrec
import torch.distributed as dist
import torch.multiprocessing as mp
from sklearn.metrics import roc_auc_score
from torch.utils.data import DataLoader
from torchrec.distributed import DistributedModelParallel as DMP
from tqdm import tqdm

import datasets
import models
from config_parser import ConfigParser


def main(rank: int, config: ConfigParser) -> None:
    # initalize process group
    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=2,
        rank=rank,
    )
    # model
    model = torchrec.EmbeddingBagCollection(
        device=torch.device("meta"),
        tables=[
            torchrec.EmbeddingBagConfig(
                name="product_table",
                embedding_dim=64,
                num_embeddings=4096,
                feature_names=["product"],
                pooling=torchrec.PoolingType.SUM,
            )
        ]
    )
    dmp_model = DMP(
        # module=config.init_object("model", models).to(torch.device("meta")),
        module=model,
        device=torch.device(f"cuda:{rank}")
    )

if __name__ == "__main__":
    # instantiate config parser
    config = ConfigParser()
    
    # distributed config
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    mp.spawn(main, nprocs=2, args=(config,))

When trying to run this code, I get the following error. I used export NCCL_DEBUG=INFO to get NCCL's logs.

Traceback (most recent call last):
  File "main_dist.py", line 165, in <module>
    mp.spawn(main, nprocs=2, args=(config,))
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/jupyter/hft/ctr-model/main_dist.py", line 29, in main
    module=model,
UnboundLocalError: local variable 'model' referenced before assignment

(base) jupyter@ctr-model-gpu:~/hft/ctr-model$ python main_dist.py 
Log directory 'runs/test-torchrec-dist' already exists. Overwrite? [y / n] y
ctr-model-gpu:30732:30732 [0] NCCL INFO Bootstrap : Using ens6:10.138.0.12<0>
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/Plugin: Failed to find ncclCollNetPlugin_v4 symbol.
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Tx CPU start: -2
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Rx CPU start: -2
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Flow placement enabled.
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : queue skip: 0
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket : Using [0]ens6:10.138.0.12<0>
ctr-model-gpu:30732:30732 [0] NCCL INFO NET/FastSocket plugin initialized
ctr-model-gpu:30732:30732 [0] NCCL INFO Using network FastSocket
NCCL version 2.10.3+cuda11.3
ctr-model-gpu:30733:30733 [0] NCCL INFO Bootstrap : Using ens6:10.138.0.12<0>
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/Plugin: Failed to find ncclCollNetPlugin_v4 symbol.
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Tx CPU start: -2
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Rx CPU start: -2
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Flow placement enabled.
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : queue skip: 0
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket : Using [0]ens6:10.138.0.12<0>
ctr-model-gpu:30733:30733 [0] NCCL INFO NET/FastSocket plugin initialized
ctr-model-gpu:30733:30733 [0] NCCL INFO Using network FastSocket


ctr-model-gpu:30733:30847 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device 40
ctr-model-gpu:30732:30846 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 40
ctr-model-gpu:30733:30847 [0] NCCL INFO init.cc:904 -> 5
ctr-model-gpu:30732:30846 [0] NCCL INFO init.cc:904 -> 5
ctr-model-gpu:30733:30847 [0] NCCL INFO group.cc:72 -> 5 [Async thread]
ctr-model-gpu:30732:30846 [0] NCCL INFO group.cc:72 -> 5 [Async thread]
Traceback (most recent call last):
  File "main_dist.py", line 166, in <module>
    mp.spawn(main, nprocs=2, args=(config,))
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/jupyter/hft/ctr-model/main_dist.py", line 42, in main
    device=torch.device(f"cuda:{rank}")
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/model_parallel.py", line 187, in __init__
    plan = planner.collective_plan(module, sharders, pg)
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/planner/planners.py", line 188, in collective_plan
    sharders,
  File "/opt/conda/lib/python3.7/site-packages/torchrec/distributed/collective_utils.py", line 60, in invoke_on_rank_and_broadcast_result
    dist.broadcast_object_list(object_list, rank, group=pg)
  File "/opt/conda/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 1869, in broadcast_object_list
    broadcast(object_sizes_tensor, src=src, group=group)
  File "/opt/conda/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 1187, in broadcast
    work = default_pg.broadcast([tensor], opts)
RuntimeError: NCCL error in: /opt/conda/conda-bld/pytorch_1646755953518/work/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:1169, invalid usage, NCCL version 21.0.3
ncclInvalidUsage: This usually reflects invalid usage of NCCL library (such as too many async ops, too many collectives at once, mixing streams in a group, etc).

Reading the NCCL logs, I noticed these two lines:

ctr-model-gpu:30733:30847 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device 40
ctr-model-gpu:30732:30846 [0] init.cc:521 NCCL WARN Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 40

Could this be the cause of the issue? If so, how do I solve it? I ran another regular PyTorch script with DistributedDataParallel and didn't have any issues with NCCL; the script ran fine.
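
For reference, here is a minimal sketch (not from the original issue) of how each spawned rank is usually pinned to its own GPU before the process group is created. It assumes the machine actually exposes two GPUs; the "Duplicate GPU detected" warning is NCCL complaining that two ranks ended up on the same device.

import torch
import torch.distributed as dist


def main(rank: int, world_size: int = 2) -> None:
    # Pin this process to its own GPU *before* any NCCL communicator is created,
    # so rank 0 uses cuda:0 and rank 1 uses cuda:1 instead of both defaulting to cuda:0.
    torch.cuda.set_device(rank)
    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=world_size,
        rank=rank,
    )
    device = torch.device(f"cuda:{rank}")
    # ... build the EmbeddingBagCollection and wrap it in DistributedModelParallel on `device` ...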

`torchrec.inference.modules.quantize_embeddings` does not seem to quantize weights.

I tried to quantize my EBC in 3 different ways. None of them changed the dtype of the weights. Perhaps I'm misunderstanding when quantization happens?

import torch
import torch.quantization as quant
import math
import torchrec
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.modules.embedding_modules import EmbeddingBagCollection

eb_configs = [
    EmbeddingBagConfig(
        name=f"t_{feature_name}",
        embedding_dim=256,
        num_embeddings=1024,
        feature_names=[feature_name],
    )
    for feature_idx, feature_name in enumerate(
        ['feature0', 'feature1',]
    )
]
ebc = EmbeddingBagCollection(tables=eb_configs, device='cuda')

from torchrec.sparse.jagged_tensor import KeyedJaggedTensor
features = KeyedJaggedTensor(
  keys=["feature0", "feature1"],
  values=torch.arange(1024).cuda(),
  lengths=torch.ones(1024).int().cuda(),
)

ebc(features).values()
print({key: value.dtype for key, value in ebc.state_dict().items()})

qebc = quant.quantize_dynamic(ebc, inplace=False)
qebc(features).values()
print({key: value.dtype for key, value in qebc.state_dict().items()})

from torchrec.inference.modules import quantize_embeddings
tr_qebc = quantize_embeddings(ebc, dtype=torch.qint8, inplace=False)
print({key: value.dtype for key, value in tr_qebc.state_dict().items()})

All three prints here output the same thing:

{'embedding_bags.t_feature0.weight': torch.float32, 'embedding_bags.t_feature1.weight': torch.float32}
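
For comparison, here is a sketch (not from the original issue) of passing an explicit quantize_dynamic mapping from the float EmbeddingBagCollection to the quantized module in torchrec.quant.embedding_modules. The qconfig below is my assumption, so treat this as an illustration rather than the canonical API; note also that the quantized module packs its tables for FBGEMM kernels, so its state_dict will not mirror the float module's embedding_bags.*.weight entries.

import torch
import torch.quantization as quant
from torchrec.modules.embedding_modules import EmbeddingBagCollection
from torchrec.quant.embedding_modules import (
    EmbeddingBagCollection as QuantEmbeddingBagCollection,
)

# Explicitly map the float EBC type to its quantized counterpart and request
# int8 weights; the default qconfig_spec of quantize_dynamic does not cover
# the torchrec EBC module type, which may be why the weights stayed float32 above.
qconfig = quant.QConfig(
    activation=quant.PlaceholderObserver.with_args(dtype=torch.float),
    weight=quant.PlaceholderObserver.with_args(dtype=torch.qint8),
)
quantized_ebc = quant.quantize_dynamic(
    ebc,  # the float EmbeddingBagCollection built above
    qconfig_spec={EmbeddingBagCollection: qconfig},
    mapping={EmbeddingBagCollection: QuantEmbeddingBagCollection},
    inplace=False,
)
print(quantized_ebc)  # the embedding bags should now be the torchrec.quant module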

PyTorch Lightning example

Dear TorchRec Team,

Thanks a lot for the awesome library.

I wondered if an example with PyTorch Lightning could be put together to demonstrate how easy it is to get started with both libraries.

Best,
T.C

OSError: libtorch_cuda_cpp.so: cannot open shared object file: No such file or directory

----------- solved ---------

Installing the nightly as of today gives pytorch-1.12.0.dev20220420. This version works.


Original issue:

`import torch; print(torch.__version__)` gives `1.11.0+cu102`

I wonder if the CUDA version is incorrect. However, `conda install pytorch cudatoolkit=11.3 -c pytorch-nightly` gives me either pytorch 1.10.0 (with CUDA 11.3) or pytorch 1.11.0 (with CUDA 10.2, I suppose?). With pytorch==1.10.0 there's another error, `ModuleNotFoundError: No module named 'torch.distributed.fsdp'`, so I guess we have to use pytorch 1.11.0.

Detailed error messages:

>>> import torchrec
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/__init__.py", line 8, in <module>
    import torchrec.distributed  # noqa
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/__init__.py", line 39, in <module>
    from torchrec.distributed.model_parallel import DistributedModelParallel  # noqa
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/model_parallel.py", line 20, in <module>
    from torchrec.distributed.embeddingbag import (
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/embeddingbag.py", line 27, in <module>
    from torchrec.distributed.embedding_sharding import (
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/embedding_sharding.py", line 14, in <module>
    from torchrec.distributed.dist_data import (
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/dist_data.py", line 15, in <module>
    from torchrec.distributed.comm_ops import (
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torchrec/distributed/comm_ops.py", line 27, in <module>
    import fbgemm_gpu  # @manual # noqa
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/fbgemm_gpu/__init__.py", line 12, in <module>
    torch.ops.load_library(os.path.join(os.path.dirname(__file__), "fbgemm_gpu_py.so"))
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/site-packages/torch/_ops.py", line 220, in load_library
    ctypes.CDLL(path)
  File "/home/ubuntu/anaconda3/envs/pytorch_p38/lib/python3.8/ctypes/__init__.py", line 373, in __init__
    self._handle = _dlopen(self._name, mode)
OSError: libtorch_cuda_cpp.so: cannot open shared object file: No such file or directory
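
As a side note (not part of the original report), this kind of missing-shared-object error usually indicates that fbgemm_gpu_py.so was built against a different torch build than the one installed, so a quick sanity check before upgrading is to compare the two. A minimal sketch:

# A minimal compatibility check; the version strings in the comments are only examples.
import torch

print(torch.__version__)   # e.g. 1.11.0+cu102 in the report above
print(torch.version.cuda)  # CUDA toolkit torch was built against
print(torch.__file__)      # the torch install whose lib/ directory fbgemm_gpu loads from

import fbgemm_gpu  # raises the OSError above when the two builds do not match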
