
streaming's Introduction


Fast, accurate streaming of training data from cloud storage



👋 Welcome

We built StreamingDataset to make training on large datasets from cloud storage as fast, cheap, and scalable as possible.

It’s specially designed for multi-node, distributed training for large models—maximizing correctness guarantees, performance, and ease of use. Now, you can efficiently train anywhere, independent of your training data location. Just stream in the data you need, when you need it. To learn more about why we built StreamingDataset, read our announcement blog.

StreamingDataset is compatible with any data type, including images, text, video, and multimodal data.

With support for major cloud storage providers (AWS, OCI, GCS, Azure, Databricks, and any S3-compatible object store such as Cloudflare R2, CoreWeave, Backblaze B2, etc.) and designed as a drop-in replacement for your PyTorch IterableDataset class, StreamingDataset seamlessly integrates into your existing training workflows.

The flow of samples from shards in the cloud to devices in your cluster

🚀 Getting Started

💾 Installation

Streaming can be installed with pip:

pip install mosaicml-streaming

🏁 Quick Start

1. Prepare Your Data

Convert your raw dataset into one of our supported streaming formats:

  • MDS (Mosaic Data Shard) format which can encode and decode any Python object
  • CSV / TSV
  • JSONL
import numpy as np
from PIL import Image
from streaming import MDSWriter

# Local or remote directory in which to store the compressed output files
data_dir = 'path-to-dataset'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'jpeg',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(10000):
        sample = {
            'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
            'class': np.random.randint(10),
        }
        out.write(sample)

2. Upload Your Data to Cloud Storage

Upload your streaming dataset to the cloud storage of your choice (AWS, OCI, or GCP). Below is one example of uploading a directory to an S3 bucket using the AWS CLI.

$ aws s3 cp --recursive path-to-dataset s3://my-bucket/path-to-dataset

3. Build a StreamingDataset and DataLoader

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Remote path where full dataset is persistently stored
remote = 's3://my-bucket/path-to-dataset'

# Local working dir where dataset is cached during operation
local = '/tmp/path-to-dataset'

# Create streaming dataset
dataset = StreamingDataset(local=local, remote=remote, shuffle=True)

# Let's see what is in sample #1337...
sample = dataset[1337]
img = sample['image']
cls = sample['class']

# Create PyTorch DataLoader
dataloader = DataLoader(dataset)

📚 What next?

Getting started guides, examples, API references, and other useful information can be found in our docs.

We have end-to-end tutorials for training a model on:

We also have starter code for the following popular datasets, which can be found in the streaming directory:

Dataset      Task                   Read   Write
LAION-400M   Text and image         Read   Write
WebVid       Text and video         Read   Write
C4           Text                   Read   Write
EnWiki       Text                   Read   Write
Pile         Text                   Read   Write
ADE20K       Image segmentation     Read   Write
CIFAR10      Image classification   Read   Write
COCO         Image classification   Read   Write
ImageNet     Image classification   Read   Write

To start training on these datasets:

  1. Convert raw data into .mds format using the corresponding script from the convert directory.

For example:

$ python -m streaming.multimodal.convert.webvid --in <CSV file> --out <MDS output directory>

  2. Import the dataset class to start training the model.

from streaming.multimodal import StreamingInsideWebVid
dataset = StreamingInsideWebVid(local=local, remote=remote, shuffle=True)

🔑 Key Features


Seamless data mixing

Easily experiment with dataset mixtures with Stream. Dataset sampling can be controlled in relative (proportion) or absolute (repeat or sample count) terms. During streaming, the different datasets are streamed, shuffled, and mixed seamlessly just-in-time.

# mix C4, github code, and internal datasets
streams = [
  Stream(remote='s3://datasets/c4', proportion=0.4),
  Stream(remote='s3://datasets/github', proportion=0.1),
  Stream(remote='gcs://datasets/my_internal', proportion=0.5),
]

dataset = StreamingDataset(
  streams=streams,
  samples_per_epoch=1e8,
)

True Determinism

A unique feature of our solution: samples are in the same order regardless of the number of GPUs, nodes, or CPU workers. This makes it easier to:

  • Reproduce and debug training runs and loss spikes
  • Load a checkpoint trained on 64 GPUs and debug on 8 GPUs with reproducibility

See the figure below — training a model on 1, 8, 16, 32, or 64 GPUs yields the exact same loss curve (up to the limitations of floating point math!)

Plot of elastic determinism

Instant mid-epoch resumption

It can be expensive — and annoying — to wait for your job to resume while your dataloader spins after a hardware failure or loss spike. Thanks to our deterministic sample ordering, StreamingDataset lets you resume training in seconds, not hours, in the middle of a long training run.

Minimizing resumption latency can save thousands of dollars in egress fees and idle GPU compute time compared to existing solutions.
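
Below is a minimal sketch of what resumption can look like, assuming your installed version exposes the StreamingDataLoader wrapper (which tracks how many samples have been seen); the checkpointing mechanism around it is up to you, and the variable names are illustrative.

from streaming import StreamingDataset, StreamingDataLoader

dataset = StreamingDataset(local='/tmp/path-to-dataset', remote='s3://my-bucket/path-to-dataset')
dataloader = StreamingDataLoader(dataset, batch_size=32)

# Train for a while, then capture the loader state alongside your model checkpoint.
loader_state = dataloader.state_dict()

# After a restart, restore the state before iterating again; iteration picks up
# from the next not-yet-seen sample of the interrupted epoch instead of sample 0.
dataloader.load_state_dict(loader_state)
for batch in dataloader:
    ...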

High throughput

Our MDS format cuts extraneous work to the bone, resulting in ultra-low sample latency and higher throughput compared to alternatives for workloads bottlenecked by the dataloader.

Tool               Throughput
StreamingDataset   ~19000 img/sec
ImageFolder        ~18000 img/sec
WebDataset         ~16000 img/sec

Results shown are from ImageNet + ResNet-50 training, collected over 5 repetitions after the data had been cached during the first epoch.

Equal convergence

Model convergence from using StreamingDataset is just as good as using local disk, thanks to our shuffling algorithm.

Plot of equal convergence

Below are results from ImageNet + ResNet-50 training, collected over 5 repetitions.

Tool               Top-1 Accuracy
StreamingDataset   76.51% +/- 0.09
ImageFolder        76.57% +/- 0.10
WebDataset         76.23% +/- 0.17

StreamingDataset shuffles across all samples assigned to a node, whereas alternative solutions only shuffle samples in a smaller pool (within a single process). Shuffling across a wider pool spreads out adjacent samples more. In addition, our shuffling algorithm minimizes dropped samples. We have found both of these shuffling features advantageous for model convergence.

Random access

Access the data you need when you need it.

Even if a sample isn’t downloaded yet, you can access dataset[i] to get sample i. The download will kick off immediately and the result will be returned when it’s done - similar to a map-style PyTorch dataset with samples numbered sequentially and accessible in any order.

dataset = StreamingDataset(...)
sample = dataset[19543]

No divisibility requirements

StreamingDataset will happily iterate over any number of samples. You do not have to forever delete samples so that the dataset is divisible over a baked-in number of devices. Instead, each epoch a different selection of samples is repeated (none dropped) so that each device processes the same count.

dataset = StreamingDataset(...)
dl = DataLoader(dataset, num_workers=...)

Disk usage limits

Dynamically delete least recently used shards in order to keep disk usage under a specified limit. This is enabled by setting the StreamingDataset argument cache_limit. See the shuffling guide for more details.

dataset = StreamingDataset(
    cache_limit='100gb',
    ...
)

🏆 Project Showcase

Here are some projects and experiments that used StreamingDataset. Got something to add? Email [email protected] or join our Community Slack.

  • BioMedLM: a Domain Specific Large Language Model for BioMedicine by MosaicML and Stanford CRFM
  • Mosaic Diffusion Models: Training Stable Diffusion from Scratch Costs <$160k
  • Mosaic LLMs: GPT-3 quality for <$500k
  • Mosaic ResNet: Blazingly Fast Computer Vision Training with the Mosaic ResNet and Composer
  • Mosaic DeepLabv3: 5x Faster Image Segmentation Training with MosaicML Recipes
  • …more to come! Stay tuned!

💫 Contributors

We welcome any contributions, pull requests, or issues.

To start contributing, see our Contributing page.

P.S.: We're hiring!

If you like this project, give us a star and check out our other projects:

  • Composer - a modern PyTorch library that makes scalable, efficient neural network training easy
  • MosaicML Examples - reference examples for training ML models quickly and to high accuracy - featuring starter code for GPT / Large Language Models, Stable Diffusion, BERT, ResNet-50, and DeepLabV3
  • MosaicML Cloud - our training platform built to minimize training costs for LLMs, Diffusion Models, and other large models - featuring multi-cloud orchestration, effortless multi-node scaling, and under-the-hood optimizations for speeding up training time

✍️ Citation

@misc{mosaicml2022streaming,
    author = {The Mosaic ML Team},
    title = {streaming},
    year = {2022},
    howpublished = {\url{https://github.com/mosaicml/streaming/}},
}

streaming's People

Contributors

abhi-mosaic, b-chu, bandish-shah, bigning, dakinggg, dblalock, dependabot[bot], ejyuen, hanlint, hlky, huxuan, jaearly, karan6181, knighton, maddiedawson, milocress, mvpatel2000, nharada1, nqn, orenleung, orionw, peterding, philipnrmn, samhavens, skylion007, snarayan21, sophiawisdom, srowen, vanshcsingh, xiaohanzhangcmu


streaming's Issues

There appear to be 2 leaked shared_memory objects to clean up at shutdown

OS: [Ubuntu 20.04]

Hi, I'm running a GPT-125M model on some GPUs with some changes in llm-foundry and composer, with FSDP enabled, with loss_fn: torch_crossentropy and attn_impl: torch and also gpu_flops_available: true. The command line looks like:

composer -v --stdout log{rank}.out --stderr log{rank}.err train/train.py   train/yamls/pretrain/mpt-125m.yaml   data_local=my-copy-c4   train_loader.dataset.split=train_small   eval_loader.dataset.split=val_small   max_duration=15ba   eval_interval=0   save_folder=mpt-125m

at the end of the running, I saw the below message:

...
[batch=15/15]:
         Train time/batch: 14
         Train time/sample: 3584
         Train time/batch_in_epoch: 14
         Train time/sample_in_epoch: 3584
         Train time/token: 7340032
         Train time/token_in_epoch: 7340032
         Train trainer/device_train_microbatch_size: 16
         ...
         Train throughput/device/flops_per_sec: x
         Train throughput/device/mfu: x
         ...
2023-07-17 22:43:50,619 - composer.utils.checkpoint - DEBUG - Saving checkpoint to mpt-125m/ep{epoch}-ba{batch}-rank{rank}.pt
Abort was called at 345 line in file:
./shared/source/memory_manager/memory_manager.cpp
2023-07-17 22:43:51,154 - composer.cli.launcher - ERROR - Rank 4 crashed with exit code -6.
2023-07-17 22:43:51,155 - composer.cli.launcher - INFO - Killing global rank 0 (PID 1668044) with SIGTERM
2023-07-17 22:43:51,155 - composer.cli.launcher - INFO - Killing global rank 1 (PID 1668045) with SIGTERM
2023-07-17 22:43:51,155 - composer.cli.launcher - INFO - Killing global rank 2 (PID 1668046) with SIGTERM
2023-07-17 22:43:51,155 - composer.cli.launcher - INFO - Killing global rank 3 (PID 1668047) with SIGTERM
2023-07-17 22:43:51,155 - composer.cli.launcher - INFO - Killing global rank 6 (PID 1668050) with SIGTERM
Waiting up to 30 seconds for all training processes to terminate. Press Ctrl-C to exit immediately.
/home/yjguo/miniconda3/envs/yjguo_mpt/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
2023-07-17 22:44:21,198 - composer.cli.launcher - WARNING - Failed to kill global rank 1 (PID 1668045) with SIGTERM; terminating with SIGKILL instead
2023-07-17 22:44:21,221 - composer.cli.launcher - WARNING - Failed to kill global rank 2 (PID 1668046) with SIGTERM; terminating with SIGKILL instead
Global rank 0 (PID 1668044) exited with code -6
Global rank 3 (PID 1668047) exited with code -6
----------Begin global rank 3 STDOUT----------
...

It contains There appear to be 2 leaked shared_memory objects to clean up at shutdown, and as far as I know, shm is used by streaming according to https://github.com/mosaicml/llm-foundry/issues/436#issuecomment-1627712085, so I'm wondering if this issue is also related to streaming. Could you share something to try/debug? Thanks.

btw, no such error message if FSDP is disabled in the yaml file with all others same.
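
For anyone hitting this warning: it usually means a crashed or killed run left streaming's coordination segments behind in POSIX shared memory. The snippet below is an illustrative cleanup sketch only, not an official API; the /dev/shm name pattern is an assumption based on the segment names streaming typically creates (e.g. 000000_locals, 000000_barrier), so inspect what actually exists on your machine first. Recent versions of streaming also ship a helper for this (streaming.base.util.clean_stale_shared_memory), if your installed version provides it.

import glob
import os
from multiprocessing import shared_memory

# Unlink stale shared-memory segments left behind by a crashed run (Linux).
# The glob pattern is an assumption; check /dev/shm before deleting anything.
for path in glob.glob('/dev/shm/0000*'):
    name = os.path.basename(path)
    try:
        shm = shared_memory.SharedMemory(name=name)
        shm.close()
        shm.unlink()  # removes the segment so the resource_tracker warning goes away
    except FileNotFoundError:
        pass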

MDSWriter throws an error when writing to a local directory using Python 3.8

Environment

Python 3.8 (Kubeflow notebook)

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): N/A

To reproduce

Steps to reproduce the behavior:

Script (copied from README):
$ cat write_data.py

import numpy as np
from PIL import Image
from streaming import MDSWriter

# Local or remote directory in which to store the compressed output files
data_dir = '/home/jovyan/tmp'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'jpeg',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(10000):
        sample = {
            'image': Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)),
            'class': np.random.randint(10),
        }
        out.write(sample)
  1. python3 write_data.py

Expected behavior

I expect the data to be written successfully to the local directory ./tmp without any issues. I suspect this is an issue since the cancel_futures feature is only available in Python 3.9+. See the comment on the PR.

Additional context

Error trace:

Traceback (most recent call last):
  File "write_data.py", line 24, in <module>
    out.write(sample)
  File "/home/jovyan/.local/lib/python3.8/site-packages/streaming/base/format/base/writer.py", line 265, in __exit__
    self.finish()
  File "/home/jovyan/.local/lib/python3.8/site-packages/streaming/base/format/base/writer.py", line 244, in finish
    self.executor.shutdown(wait=True, cancel_futures=False)
TypeError: shutdown() got an unexpected keyword argument 'cancel_futures'
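
For context, cancel_futures was only added to concurrent.futures.Executor.shutdown() in Python 3.9, which is why the call above fails on 3.8. A hedged sketch of the kind of version guard that sidesteps this (illustrative only, not the library's actual patch):

import sys
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
# ... submit shard-writing / upload work to the executor ...

if sys.version_info >= (3, 9):
    # cancel_futures exists only on Python 3.9+
    executor.shutdown(wait=True, cancel_futures=False)
else:
    # On 3.8, a plain shutdown already waits for queued work and cancels nothing,
    # which matches cancel_futures=False.
    executor.shutdown(wait=True)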

Questions about `StreamingDataset` in the case of limited (fast) local disk storage

I have started to use the library and have enjoyed it so far! This is not a feature request but more a few questions to understand whether I am using the library correctly for what i need. Dumping these questions here as I believe some others would ask themselves the same question :)

I need to stream data from s3. My understanding is that the StreamingDataset class will download the shards to the local folder and read from there.

  • I have limited fast disk storage (like nvme disk). What happens if I iterate over all the shard samples? Is the shard nuked automatically? I am not sure I understand the argument keep_raw and more particularly what "epoch" means in this context.
  • In the case shuffle=False, is there a way to pipe the data into the data preprocessing/training without writing that raw data on disk? I believe that's what WebDataset is doing. If the answer is yes, does that preserve the deterministic resuming?

Thanks in advance!

Cannot load a dataset with only compressed shards

Environment

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): N/A

To reproduce

Steps to reproduce the behavior:

  1. Write an MDS dataset with zstd compression.
  2. Delete the *.mds files.
  3. Attempt to load the dataset via StreamingDataset.

Expected behavior

The dataset is loaded correctly.

Additional context

Instead of loading, the dataset hangs, suggesting that both the uncompressed and compressed files are required. This would mean I need to upload the entire dataset, both compressed and uncompressed, to S3.

Using Streaming Dataset for Multi Node DDP

Hi,

I am running a multi-node job with the complete Mosaic stack. I have the entire dataset stored locally (in MDS format). The datasets need to be mixed, so I have a separate Stream for each sub-dataset (the path is passed as the 'local' parameter with 'remote' set to None) with a proportion value set. When I start the job on one node, the entire training loop works perfectly. When I move to a multi-node training job, I get the following error:

ValueError: Reused local directory: [All the paths to the dataset].

I faced this issue even when restarting a job on one node; when I just changed the folder name it worked (I got this from another issue, #306). But in a multi-node setup this fails. If you can let me know what I am doing wrong, or if there is a fix for this, I would appreciate it.

Thanks,

Srinivasan Nandakumar

cache_limit bottlenecks the performance

Environment

  • OS: [RHEL]
  • Hardware (GPU, or instance type): 8x[A100 40gb]
  • ENV: SLURM single compute node

To reproduce

Steps to reproduce the behavior:

  1. Choose a reasonably large dataset (i.e., OSCAR)
  2. Create a dataloader using StreamingDataset
  3. set appropriate cache_limit (<< the size of the dataset)

Expected behavior

StreamingDataset local cache should be managed in a way that it is always available (for new data) and doesn't affect the data loading performance when downloading from the remote. Currently, once the cache is full, the data loading becomes extremely slow and becomes a bottleneck for the training.

Additional context


Does MDS support GeoTiffs?

🚀 Feature Request

If I were to encode a geotiff image with the mds format, when decoded would the spatial info of the image be retained?

Motivation

It can be problematic if the geospatial information is dropped and difficult to reassign this information to each image patch.

Additional context

Perhaps a useful option would be to read/download image data from a large satellite image. Similar to the functionality provided by torchgeo. However, I realise this may not be your target audience. Nevertheless, this functionality would be really beneficial to the geospatial community and the environmental challenges we work on :)
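
A hedged sketch of one way to do this today: MDS does not ship a GeoTIFF codec, but you can store the raw file bytes untouched with the generic 'bytes' column type and carry whatever geospatial metadata you need alongside them as 'json'. File names and metadata fields below are made-up examples; extracting the CRS/transform is left to your GIS library of choice.

from streaming import MDSWriter

columns = {
    'tiff': 'bytes',  # raw GeoTIFF file contents, byte-for-byte, so nothing is lost
    'meta': 'json',   # whatever geospatial metadata you need to reattach on read
}

with MDSWriter(out='path-to-dataset', columns=columns, compression='zstd') as out:
    with open('patch_0001.tif', 'rb') as f:  # hypothetical input patch
        tiff_bytes = f.read()
    out.write({
        'tiff': tiff_bytes,
        'meta': {'crs': 'EPSG:4326', 'bounds': [-122.5, 37.7, -122.4, 37.8]},
    })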

Could you explain the principle behind streaming?

It looks like streaming supports a global index in __getitem__(), so how can it accelerate data loading for large-scale sharded data? I mean, if the data index is global, the training loader must access each shard randomly. What's the implementation difference compared with webdataset?

Of course, it's an awesome project and I am learning to use it.

Timeout Error when local=None in StreamingDataset and training in distributed mode

While trying to train with the llm-foundry repo on 1 node with 8 gpus, I kept getting a timeout error in all the non-leader processes when using a StreamingDataset I uploaded to s3.

File "/usr/lib/python3/dist-packages/streaming/base/dataset.py", line 350, in __init__
    stream_shards = stream.get_shards(world)
  File "/usr/lib/python3/dist-packages/streaming/base/stream.py", line 387, in get_shards
    wait_for_file_to_exist(filename, TICK, 60,
  File "/usr/lib/python3/dist-packages/streaming/base/util.py", line 52, in wait_for_file_to_exist
    raise RuntimeError(f'{err_msg}, bailing out: ' + f'{timeout:.3f} < {dt:.3f} sec.')
RuntimeError: Index file /tmp/tmpqc4652dc/train/index.json took too long to download, bailing out: 60.000 < 60.003 sec.

In the training config, I don't set the data_local field, so when the StreamingDataset is initialized, local=None. I found the tmp directory where the index file is supposed to be downloaded; it exists and the file downloads in a second, so it shouldn't be timing out. I went into the code and commented out the RuntimeError above so it would continue running, and all the processes would try loading the index file that definitely existed. This resulted in a new error:

File "/usr/lib/python3/dist-packages/streaming/base/dataset.py", line 350, in __init__
    stream_shards = stream.get_shards(world)
  File "/usr/lib/python3/dist-packages/streaming/base/stream.py", line 392, in get_shards
    obj = json.load(open(filename))
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp6uouofjr/train/index.json'

So it seems that each process sets the local attribute to a different tmp directory and then waits for the index file to be created in that directory, but it never will be.

In streaming/base/stream.py:100

self.local = local or mkdtemp()

Support for azure Data Lake Gen2 type storage

🚀 Feature Request

The current support for Azure only covers blob storage and not Data Lake Gen2 storage, which is more generally used for AI workloads. Data Lake Gen2 supports hierarchical namespaces, in contrast to the flat namespaces of blob storage. This type of storage is also optimized for big data analytics and training on large sets of data.

Motivation

With PR #256, we can use Streaming with Azure blob storage. I tried it out and it worked seamlessly. But when I tried it with Data Lake storage, it crashed, mainly due to differences in the way these two are handled by the SDK.

Implementation

The code additions to allow their usage are minimal:

import os

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.filedatalake import DataLakeServiceClient

# Create a new session per thread
service = DataLakeServiceClient(
    account_url=f"https://{os.environ['AZURE_ACCOUNT_NAME']}.dfs.core.windows.net",
    credential=os.environ['AZURE_ACCOUNT_ACCESS_KEY'])
try:
    # obj is the parsed remote URL and local is the destination path, as in the
    # existing download functions
    file_client = service.get_file_client(file_system=obj.netloc,
                                          file_path=obj.path.lstrip('/'))
    with open(local, 'wb') as my_blob:
        file_data = file_client.download_file()
        file_data.readinto(my_blob)
except ResourceNotFoundError:
    raise FileNotFoundError(f'Object {remote} not found.')
except Exception:
    raise

We need only add a new service client for Data Lakes and change the corresponding get_file_client method. Also a corresponding uploader needs to be created.

The only reason I haven't created a PR yet is that it is not yet clear to me what the best way is to differentiate between calling either of the clients. Can we have a flag for the storage type (storage_type = blob or storage_type = datalake) that can be set by the user? Which other files may need changing because of this? Please suggest and I can contribute.

Request for more ways to divide shards besides size_limit

🚀 Feature Request

Currently, size_limit is the only way to divide MDSWriter's shards.
It would be nice to add a way to set the total number of shards or set the number of items per shard.

Motivation

I want to set the number of shards to be divisible by the number of nodes.
If possible, it would be even better if I could set the number of items per shard to be roughly the same (see img2dataset).
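
As a stopgap while size_limit is byte-based, a hedged sketch: approximate a fixed item count per shard by measuring (or estimating) the encoded size of a typical sample and deriving size_limit from it. The numbers below are purely illustrative assumptions.

from streaming import MDSWriter

items_per_shard = 4096                 # desired samples per shard (assumption)
approx_sample_bytes = 24 * 1024        # measured/estimated encoded size of one sample
size_limit = items_per_shard * approx_sample_bytes

columns = {'image': 'jpeg', 'class': 'int'}
with MDSWriter(out='path-to-dataset', columns=columns, size_limit=size_limit) as out:
    ...                                # write samples as usual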

Shared dir selection method prone to collisions in concurrent scenarios

Hi and thanks for your work on the library!

As you point out yourself in dataset.py:315, the shared directory should be unique per StreamingDataset to avoid shared dir collisions. However, the selection mechanism of try-the-same-one-first is prone to collisions under concurrent scenarios. Consider this:

import multiprocessing

import streaming

def run(i):
    bucket = "blah"
    path = "foo/bar/00000/"
    ds = streaming.StreamingDataset(
        remote=f"s3://{bucket}/{path}",
        local=f"/tmp/streaming/{bucket}/{path}",
        # shuffle_seed=i
    )
    return ds._shared_dir

with multiprocessing.Pool(4) as pool:
    results = pool.map(run, list(range(4)))

It might work out fine, but more often than not, you get cryptic errors as below, presumably because of the time window between checking whether the dir exists and file-locking it.

RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/venv/lib/python3.9/site-packages/streaming/base/shared.py", line 49, in __init__
    self._shm = SharedMemory(shm_path, True, size)
  File "/usr/lib/python3.9/multiprocessing/shared_memory.py", line 103, in __init__
    self._fd = _posixshmem.shm_open(
FileExistsError: [Errno 17] File exists: '/07196c_barrier'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/tmp/ipykernel_47575/4213878970.py", line 6, in run
    ds = streaming.StreamingDataset(
  File "/opt/venv/lib/python3.9/site-packages/streaming/base/dataset.py", line 252, in __init__
    self._worker_barrier = SharedBarrier(worker_barrier_filelock_path, worker_barrier_shm_path,
  File "/opt/venv/lib/python3.9/site-packages/streaming/base/shared.py", line 53, in __init__
    self._shm = SharedMemory(shm_path, False, size)
  File "/usr/lib/python3.9/multiprocessing/shared_memory.py", line 103, in __init__
    self._fd = _posixshmem.shm_open(
FileNotFoundError: [Errno 2] No such file or directory: '/07196c_barrier'

If the # shuffle_seed=i bit is uncommented, it works fine even with 64 parallel executions. While this workaround works fine for now, it took a while to trace down this behavior - wouldn't a better default be to seed deterministically based on the provided data path? If there's strong reasons not to change this, perhaps users should need to set shuffle_seed explicitly by default?

MDS: standard ?

Hi,
Thanks for the lib!

I think the 2 main alternatives in the pytorch world are webdataset and torch data. They both support tar files as the shard format.
The benefit of tar is that it's standard and supported by hundreds of tools.

You decided to go instead with your own custom format for sharding.
Do you have some pointers as to

  1. why is that format better?
  2. What is that format exactly? Is there a spec?
  3. What are some ways to work with MDS files outside of this streamer library?

I noticed that you used img2dataset for downloading laion400m, great!
Maybe we could add support to WDS file type directly in img2dataset to avoid having to do a (costly) conversion after the fact.

Allow for accessing slices of dataset

🚀 Feature Request

Right now, we can use the StreamingDataset as a standard iterator or for random access. I want to do sliced random accesses.
Essentially, in the README example, I'd like to do a sample = dataset[list(range(1337, 1450))].

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Remote path where full dataset is persistently stored
remote = 's3://my-bucket/path-to-dataset'

# Local working dir where dataset is cached during operation
local = '/tmp/path-to-dataset'

# Create streaming dataset
dataset = StreamingDataset(local=local, remote=remote, shuffle=True)

# Let's see what is in sample #1337...
sample = dataset[[1337, 1338]] # instead of dataset[1337] and then dataset[1338]
img = sample['image']
cls = sample['class']

# Create PyTorch DataLoader
dataloader = DataLoader(dataset)
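
Until native slicing exists, a minimal workaround sketch is to index each position yourself, since random access already works; this continues the example above:

# Workaround: gather the requested range one sample at a time.
indices = range(1337, 1450)
samples = [dataset[i] for i in indices]

imgs = [s['image'] for s in samples]
clss = [s['class'] for s in samples]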

Resume support for MDSWriter?

Large datasets might require many hours or days to convert into the MDS format. If the job fails for whatever reason, it would be very nice to be able to restart where the prior job left off. I don't think this is currently supported, but I would love to be wrong about that!

Composable datasets

🚀 Feature Request

As a user, I'd like to easily compose and stream an (MDS, jsonl, etc) dataset from shard and index matched existing (MDS, jsonl, etc) datasets that reside in various buckets/folders

Motivation

We want to train various models (generative, multi-modal) on large datasets (> 10T). One specific scenario is generating various embeddings with different models based on an original dataset.

e.g. dataset_A is an (image, caption) dataset. From dataset_A we generate embeddings from 10 different models and produce embedding_1, embedding_2, ..., embedding_10 that are shard/index matched datasets to dataset_A.

We would like to be able to compose a streaming dataset from any combination of dataset_A and one or more of the embedding datasets. The composed dataset should respect the following,

  • a batch from the composed dataset is shard/index matched between the composition (assuming they match)
  • shuffle and streaming across multi-node/GPU maintains correspondence between composed datasets

We want to avoid creating a single dataset that includes all corresponding embeddings, which depending on the training scenario might cost large egress fees and disk space, while only a subset are used.

[Optional] Implementation

Ideally some form of streams implementation assuming datasets have been created and stored in appropriate format(s),

e.g.

# compose datasets from various existing MDS datasets that are shard/index matching
composable_streams = [
  Stream(remote='s3://datasets/dataset_a'),
  Stream(remote='s3://datasets/embeddings_0'),
  Stream(remote='gcs://datasets/embeddings_3'),
]

dataset = StreamingDataset(
  composable_streams=composable_streams,
)

Additional context

LocalDataset bug

LocalDataset Bug

streaming.base.LocalDataset needs to override the size property

class LocalDataset(Array, Dataset):
   """A streaming dataset whose shards reside locally as a pytorch Dataset.

   Args:
       local (str): Local dataset directory where shards are cached by split.
       split (str, optional): Which dataset split to use, if any. Defaults to ``None``.
   """

   def __init__(self, local: str, split: Optional[str] = None):
       split = split or ''

       self.local = local
       self.split = split

       filename = os.path.join(local, split, get_index_basename())  # pyright: ignore
       obj = json.load(open(filename))
       if obj['version'] != 2:
           raise ValueError('Unsupported version')

       self.shards = []
       for info in obj['shards']:
           shard = reader_from_json(local, split, info)
           self.shards.append(shard)
       self.num_samples = sum([shard.samples for shard in self.shards])

       shard_sizes = np.array([x.samples for x in self.shards])
       self.spanner = Spanner(shard_sizes)

   def __len__(self) -> int:
       """Get the length as an IterableDataset.

       Returns:
           int: Dataset length.
       """
       return self.num_samples
   
   @property
   def size(self) -> int:
       """Get the size of the dataset in samples.

       Returns:
           int: Number of samples.
       """
       return self.num_samples

   def get_item(self, sample_id: int) -> Dict[str, Any]:
       """Get sample by global sample ID.

       Args:
           sample_id (int): Sample ID.

       Returns:
           Dict[str, Any]: Column name with sample data.
       """
       shard_id, index_in_shard = self.spanner[sample_id]
       shard = self.shards[shard_id]
       return shard[index_in_shard]

Integration with Jax?

Hey guys,

Huge fan of the work you guys did with Composer, and was very excited to see this -- IMO it fills a much-needed hole on the data infra side.

I'm curious if it's recommended / if anyone's played around using this with Flax/JAX? I think the MDS is a nice solution to still having random access to sharded, serialized data (and sharded tfrecords + tf.Data is a pretty common stack). I know it's early days, but would love to know if anyone can warn me of any sharp edges / pitfalls ahead.

On a similar note, I'm not an expert in the PyTorch ecosystem, but as I understood the torch dataloaders have had issues keeping the GPUs fed. It looks like the recommended setup right now is to pass a StreamingDataset to a torch dataloader -- did you guys notice any issues related to the GPU usage?

Thanks!

CUDA initialization error at raw_delete when crossing epoch boundary

Environment

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): [A100] >= 2 GPUs

To reproduce

Steps to reproduce the behavior:

When trying to run examples/bert mlm training (using train_small)
composer main.py yamls/main/hf-bert-base-uncased.yaml train_loader.dataset.split=train_small
the config uses group_method: truncate (see here)
It looks like when training hits an epoch boundary, I get a RuntimeError: DataLoader worker (pid(s) 95472, 95478) exited unexpectedly

Traceback

terminate called after throwing an instance of 'c10::Error'
  what():  CUDA error: initialization error
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Exception raised from c10_cuda_check_implementation at ../c10/cuda/CUDAException.cpp:31 (most recent call first):
frame #0: c10::Error::Error(c10::SourceLocation, std::string) + 0x57 (0x7f5a4a115457 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10.so)
frame #1: c10::detail::torchCheckFail(char const*, char const*, unsigned int, std::string const&) + 0x64 (0x7f5a4a0df3ec in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10.so)
frame #2: c10::cuda::c10_cuda_check_implementation(std::string const&, std::string const&, int, bool) + 0xb4 (0x7f5a76685c64 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10_cuda.so)
frame #3: <unknown function> + 0x1e1af (0x7f5a7665d1af in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10_cuda.so)
frame #4: c10::cuda::CUDACachingAllocator::raw_delete(void*) + 0x244 (0x7f5a76660054 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10_cuda.so)
frame #5: <unknown function> + 0x4d7d63 (0x7f5aa20e3d63 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libtorch_python.so)
frame #6: c10::TensorImpl::~TensorImpl() + 0x1a0 (0x7f5a4a0f59e0 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10.so)
frame #7: c10::TensorImpl::~TensorImpl() + 0x9 (0x7f5a4a0f5af9 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libc10.so)
frame #8: <unknown function> + 0x735788 (0x7f5aa2341788 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libtorch_python.so)
frame #9: THPVariable_subclass_dealloc(_object*) + 0x2a5 (0x7f5aa2341a75 in /workdisk/karan/venv_streaming/lib/python3.10/site-packages/torch/lib/libtorch_python.so)

json.decoder.JSONDecodeError: Index file is empty or corrupted

The following change (as per PR #330) causes the following error on a single-node, multi-GPU setup (4 GPUs):

filename = self._download_file(basename)

    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Index file at /tmp/local_path_1/train/index.json is empty or corrupted. Expecting value: line 1 column 1 (char 0)

On re-running the script, the issue is fixed. But every time I change the local path, it always gives this error on the first run and fixes itself from the second run. I tried running with the earlier code and the code runs fine every time. What could be the reason? Why was this here in the first place? I mean the downloading and renaming part. @karan6181 @knighton

Originally posted by @shivshandilya in #330 (comment)

StreamingDataset with DDP hangs and then crashes with NCCL timeout error

Environment

AWS Deep Learning Machine with 8xA100 and CUDA 11.8

To reproduce

Steps to reproduce the behavior:

  1. Use StreamingDataset to load ImageNet from a local SSD using DDP.

Expected behavior

The data loads as expected when running on a single GPU. I expect the data to load in the same way on multiple GPUs.

Additional context

I'm using accelerate launch / torchrun to launch 8 processes. I'm loading from a local disk, not a remote file. I do this by passing the same (local) directory to both the local and remote arguments of StreamingDataset. Specifically, I have a dataset that looks like:

class CustomStreamingDataset(StreamingDataset):
    def __init__(
        self, 
        local: str, 
        remote: Optional[str] = None, 
        shuffle: bool = False, 
        batch_size: int = 1, 
        transform: Optional[Callable] = None,
    ):
        remote = local if remote is None else remote
        super().__init__(remote=remote, local=local, shuffle=shuffle, batch_size=batch_size)
        self.transform = transform

    def __getitem__(self, idx):
        item = super().__getitem__(idx)
        feats = item['features'].squeeze(0)
        label = item['class']
        if self.transform is not None:
            feats = self.transform(feats)
        return feats, label

And then I load it as follows:

if args.use_streaming_dataset:
    data_dir = f"{args.feature_path}/imagenet256_streaming"
    dataset = CustomStreamingDataset(data_dir, shuffle=True, batch_size=batch_size)
    load_kwargs = dict()
else:
    features_dir = f"{args.feature_path}/imagenet256_features"
    labels_dir = f"{args.feature_path}/imagenet256_labels"
    dataset = CustomDataset(features_dir, labels_dir)
    load_kwargs = dict(shuffle=True, pin_memory=True, drop_last=True)
loader = DataLoader(
    dataset, batch_size=batch_size, num_workers=args.num_workers, **load_kwargs
)

The code does (not) work under the following settings:

  • 1 GPU, without streaming dataset: works correctly
  • 8 GPUs, without streaming dataset: works correctly
  • 1 GPU, with streaming dataset: works correctly
  • 8 GPUs, with streaming dataset: hangs forever

Eventually the program crashes with the following error:

RuntimeError: NCCL communicator was aborted on rank {RANK}.  Original reason for failure was: [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=15, OpType=BROADCAST, Timeout(ms)=1800000) ran for 1804394 milliseconds before timing out.

where {RANK} is replaced by 0, 1, ... 7 on each process.

Perhaps this is related to #293. However, since it's not exactly the same, I thought I should leave a separate issue.

Parallel writing of MDS files

🚀 Feature Request

Parallel writing of MDS files (or define standard such that anybody can write a tool to write datasets)

Motivation

I'm producing large datasets and currently I use:

import io
import json

import zstandard
from streaming import MDSWriter

# data_dir, columns, compression, args, and pbar come from the rest of the script
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    with open(args.input, 'rb') as inf:
        dctx = zstandard.ZstdDecompressor()
        stream_reader = dctx.stream_reader(inf)
        text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
        for line in text_stream:
            triple_object = json.loads(line)
            out.write(triple_object)
            pbar.update(1)

but this single object writing is very slow. Is it possible to parallelize this somehow?

[Optional] Implementation
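
A hedged sketch (not an official API): split the input yourself, let each worker process write its own MDS sub-directory, and treat the sub-directories as separate Streams at training time. Directory names, the column layout, and the chunking logic are all illustrative assumptions.

import os
from multiprocessing import Pool

from streaming import MDSWriter

columns = {'text': 'str'}
out_root = 'mds-out'

def write_part(args):
    part_id, lines = args
    part_dir = os.path.join(out_root, f'part_{part_id:05}')
    with MDSWriter(out=part_dir, columns=columns, compression='zstd') as out:
        for line in lines:
            out.write({'text': line})
    return part_dir

if __name__ == '__main__':
    # Split your decompressed input into chunks however you like.
    chunks = [(i, [f'sample {i}-{j}' for j in range(1000)]) for i in range(8)]
    with Pool(8) as pool:
        part_dirs = pool.map(write_part, chunks)

At training time, each part directory can be passed as its own Stream (local or remote) to StreamingDataset(streams=...), so no post-hoc merge of the per-part index files is strictly required.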

Additional context

Question: what shuffle_block_size and cache_limit should I set for StreamingDataset? what size_limit should I set for MDSWriter?

Thanks for this nice library!👏

This is not a feature request but more a few questions to understand: I believe some others would ask themselves the same question :)

  1. what shuffle_block_size and cache_limit should I set for StreamingDataset?
  2. what size_limit should I set for MDSWriter?

more specific

  1. If my dataset size is 10TB and each file size is 10MB, how should I set these parameters: cache_limit, size_limit, shuffle_block_size? If the size of my data set is 10TB, and the size of each file is 10KB, how should I set these parameters: cache_limit, size_limit, shuffle_block_size?
  2. Is there any guidance or best practices available to help me determine the appropriate values for these parameters?
  3. Can I set the cache_limit here to be as small as 1GB, or must it be a large capacity such as 100GB?

Thanks in advance!😀

Training hang at start when running on multi-gpu

Environment

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): [A100, H100]

To reproduce

When trying to run model training on multiple GPUs, I observed a hang. Running any multi-GPU model training produces a hang.

Snapshot (curtailed output)

eval_dataset.resize_size: 232
model.loss_name: binary_cross_entropy
train_dataset.crop_size: 176

Run evaluation
******************************
Config:
blurpool/num_blurconv_layers: 6
blurpool/num_blurpool_layers: 1
num_gpus_per_node: 8
num_nodes: 1
rank_zero_seed: 17

******************************
<HANGED>

Supporting tracking JIRA issue: CO-1688

DDP with streaming got duplicate data

Writer

import numpy as np
from streaming import MDSWriter

# Directory in which to store the compressed output files
data_dir = 'dirname'

# A dictionary mapping input fields to their data types
columns = {
    'image': 'ndarray',
    'class': 'int'
}

# Shard compression, if any
compression = 'zstd'

# Save the samples as shards using MDSWriter
with MDSWriter(out=data_dir, columns=columns, compression=compression) as out:
    for i in range(8):
        sample = {
            'image': np.random.randint(i+1, i+2, (1, 1)),
            'class': np.random.randint(i+1, i+2),
        }
        print(sample)
        out.write(sample)

reader.py

from torch.utils.data import DataLoader
from streaming import StreamingDataset
import torch.distributed as dist

dist.init_process_group(backend = 'nccl')

# Remote path where full dataset is persistently stored
# remote = 's3://path-to-dataset'
remote = 'dirname'

# Local working dir where dataset is cached during operation
local = 'cache'

# Create streaming dataset
dataset = StreamingDataset(local=local, remote=remote, shuffle=False)

# Let's see what is in sample #1337...
sample = dataset[2]
img = sample['image']
cls = sample['class']

# Create PyTorch DataLoader
dataloader = DataLoader(dataset, batch_size=1)

for idx, i in enumerate(dataloader):
    if dist.get_rank() == 3:
        # import pdb;pdb.set_trace()
        print(f"Rank: {dist.get_rank()}, data: {i}, idx: {idx}")

Launch reader:

export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m torch.distributed.launch --nproc_per_node=4 reader.py

Then, I will get output:

Rank: 3, data: {'class': tensor([1]), 'image': tensor([[[1]]])}, idx: 0
Rank: 3, data: {'class': tensor([1]), 'image': tensor([[[1]]])}, idx: 1
Rank: 3, data: {'class': tensor([2]), 'image': tensor([[[2]]])}, idx: 2
Rank: 3, data: {'class': tensor([2]), 'image': tensor([[[2]]])}, idx: 3
Rank: 3, data: {'class': tensor([3]), 'image': tensor([[[3]]])}, idx: 4
Rank: 3, data: {'class': tensor([3]), 'image': tensor([[[3]]])}, idx: 5
Rank: 3, data: {'class': tensor([4]), 'image': tensor([[[4]]])}, idx: 6
Rank: 3, data: {'class': tensor([4]), 'image': tensor([[[4]]])}, idx: 7
Rank: 3, data: {'class': tensor([5]), 'image': tensor([[[5]]])}, idx: 8
Rank: 3, data: {'class': tensor([5]), 'image': tensor([[[5]]])}, idx: 9
Rank: 3, data: {'class': tensor([6]), 'image': tensor([[[6]]])}, idx: 10
Rank: 3, data: {'class': tensor([6]), 'image': tensor([[[6]]])}, idx: 11
Rank: 3, data: {'class': tensor([7]), 'image': tensor([[[7]]])}, idx: 12
Rank: 3, data: {'class': tensor([7]), 'image': tensor([[[7]]])}, idx: 13
Rank: 3, data: {'class': tensor([8]), 'image': tensor([[[8]]])}, idx: 14

Add support for any S3 compatible object storage

I'm not using AWS for my s3 object storage so it isn't supported by StreamingDataset. However, the only change I need to make is hard coding the endpoint_url to my custom one in streaming/base/storage/download.py:47

# original
s3 = session.client('s3', config=config)
# hard coded endpoint_url
s3 = session.client('s3', config=config, endpoint_url="<custom_endpoint>")

It seems like #255 and #68 address similar issues for GCS and R2, both of which are S3-compatible object stores using boto3. To me it feels like it would make sense to just have a single download function for any S3-compatible object store that uses boto3, and give the user the ability to specify client parameters like the endpoint_url?
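
As a hedged alternative to patching the source: recent versions of streaming read the S3_ENDPOINT_URL environment variable when building the boto3 client (the variable also appears in other issues in this thread), which covers most S3-compatible stores; verify against your installed streaming/base/storage/download.py. The endpoint and credentials below are placeholders.

import os

os.environ['S3_ENDPOINT_URL'] = 'https://<custom_endpoint>'  # placeholder endpoint
os.environ['AWS_ACCESS_KEY_ID'] = '...'
os.environ['AWS_SECRET_ACCESS_KEY'] = '...'

from streaming import StreamingDataset

dataset = StreamingDataset(local='/tmp/cache', remote='s3://my-bucket/path-to-dataset')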

Add 'concat' option in C4 dataset

🚀 Feature Request

Benchmark repo has 2 options for C4 dataset samples: concat and truncate.

For LM training, 'concat' seems to be the only correct way forward. Can the 'concat' option present in the StreamingC4 from the benchmark repo be added to this repo as well?

Motivation

For LM training, using truncate instead of concat makes the training process redundant and the LM fails to learn anything.


Implementation

This works with the benchmark repo, not sure if this can directly be copied over here: https://github.com/mosaicml/benchmarks/blob/main/llm/src/data_c4.py

Why can't I run two experiments in parallel which will load from the same dataset location?

I am trying to run two experiments in parallel (completely independent of each other) on the same server. Both are supposed to load data from the same location, but StreamingTextDataset is crashing with this error message:

Traceback (most recent call last):
  File "llm-foundry/train.py", line 442, in <module>
    main(cfg)
  File "llm-foundry/train.py", line 313, in main
    train_loader = build_dataloader(
  File "llm-foundry/train.py", line 72, in build_dataloader
    return build_text_dataloader(
  File "llm-foundry/llmfoundry/data/text_data.py", line 255, in build_text_dataloader
    dataset = StreamingTextDataset(
  File "llmfoundry/data/text_data.py", line 112, in __init__
    super().__init__(
  File "miniconda3/envs/sml/lib/python3.10/site-packages/streaming/base/dataset.py", line 325, in __init__
    self._shm_prefix, self._locals_shm = get_shm_prefix(my_locals, world)
  File "miniconda3/envs/sml/lib/python3.10/site-packages/streaming/base/shared.py", line 340, in get_shm_prefix
    raise ValueError(f'Reused local directory: {sorted(my_locals_set)} vs ' +
ValueError: Reused local directory: ['/nvmedisk/Datasets/llmfoundry/c4/train_small'] vs ['/nvmedisk/Datasets/llmfoundry/c4/train_small']. Provide a different one.

Both experiments are using this yaml config:

train_loader:
  name: text  
  dataset:
    local: /nvmedisk/Datasets/llmfoundry/c4
    split: train_small
    shuffle: false
    max_seq_len: 2048
    decoder_only_format: true
  drop_last: true
  num_workers: 8
  pin_memory: false
  prefetch_factor: 2
  persistent_workers: false
  timeout: 0
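
A hedged workaround sketch: two concurrent runs must not share the same local cache directory. One option, assuming your streaming version accepts a plain filesystem path as remote (so it copies shards into a per-run cache), with EXPERIMENT_NAME as an illustrative variable:

import os

from streaming import StreamingDataset

experiment = os.environ.get('EXPERIMENT_NAME', 'exp_a')  # any unique string per run

dataset = StreamingDataset(
    remote='/nvmedisk/Datasets/llmfoundry/c4',      # shared, read-only source copy
    local=f'/tmp/streaming_cache/{experiment}/c4',  # unique cache per experiment
    split='train_small',
    shuffle=False,
)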

Use the training process to inform the data loading process

Hi team, thanks for your fantastic work on creating all these open-sourced repos! I have been using them for my projects and it has been an amazing experience. I am implementing an idea that tries to dynamically load data from different sources, where I maintain shard information for each dataset and a sampling ratio to decide which dataset to get sample_ids from. The following is the part of the __iter__ function on how to get the next sample:

while True:
  sampling_ratio = self.sampling_ratio
  domain = np.random.choice(range(self.n_domains), 1, p=sampling_ratio)[0].item()
  domain_sample_id = sample_ids[domain][self.domain_id[domain] % domain_lens[domain]]
  self.domain_id[domain] += 1
  yield self[(domain, domain_sample_id)]

I've rewritten __getitem__, load_state_dict(), state_dict() etc. functions to make it compatible. One issue is that I need to change the sampling_ratio periodically based on the evaluation loss, and I have been doing it in a callback function each time after the evaluation is done by directly changing the sampling_ratio through state.train_dataloader.dataset. However, this procedure does not work because each worker of __iter__ does not share memory. I am thinking about using SharedMemory to hold this information and allow the main process to write it and the workers to read it, using a combination of event and lock. Do you think this would work? Is it possible to write the shared memory using a nonworker process and use the worker processes to access it?

For simplicity, I can also only use the main process to load data and it seems to be working now.

Thanks for your help!
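
A minimal sketch of the shared-memory approach described above (illustrative, not a streaming API): publish the sampling ratio in a named shared-memory block backed by a numpy array; the main process overwrites it after each evaluation, and dataloader workers re-read it inside __iter__. The block name and domain count are assumptions, and a lock is needed if updates must be atomic.

import numpy as np
from multiprocessing import shared_memory

N_DOMAINS = 3
SHM_NAME = 'sampling_ratio'  # assumed unique name

# Main process: create the block and initialize a uniform ratio.
shm = shared_memory.SharedMemory(name=SHM_NAME, create=True, size=N_DOMAINS * 8)
ratio = np.ndarray((N_DOMAINS,), dtype=np.float64, buffer=shm.buf)
ratio[:] = 1.0 / N_DOMAINS

# Callback after evaluation: overwrite in place; workers see the new values.
ratio[:] = np.array([0.5, 0.3, 0.2])

# Worker process (e.g. at the top of each sampling step in __iter__): attach and read.
worker_shm = shared_memory.SharedMemory(name=SHM_NAME)
worker_ratio = np.ndarray((N_DOMAINS,), dtype=np.float64, buffer=worker_shm.buf)
sampling_ratio = worker_ratio.copy()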

Transfer json folder to Streaming

Hi, your scripts from data_prep can convert data from HF to streaming.

I want to use my own data from a folder which contains many json.zst files.

Do we have some script which can convert such a folder, e.g. with --out_root my_out_root?
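
A hedged sketch of such a conversion, with the column name 'text' and the record layout assumed from typical json.zst corpora; adjust to your schema:

import glob
import io
import json

import zstandard
from streaming import MDSWriter

in_root = 'my_json_folder'   # folder full of *.json.zst files (assumption)
out_root = 'my_out_root'
columns = {'text': 'str'}

with MDSWriter(out=out_root, columns=columns, compression='zstd') as out:
    for path in sorted(glob.glob(f'{in_root}/*.json.zst')):
        with open(path, 'rb') as f:
            reader = zstandard.ZstdDecompressor().stream_reader(f)
            for line in io.TextIOWrapper(reader, encoding='utf-8'):
                record = json.loads(line)
                out.write({'text': record['text']})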

Streaming via ssh across clusters

🚀 Feature Request

I want to use streaming to access a remote datacenter via ssh with certain privacy-related permissions.

Motivation

I want to use a separate cluster to store the dataset, and want to set the permissions for different images in the datasets for different users.

[Optional] Implementation

Could you provide some potential APIs or examples that might be helpful for this feature?

Additional context

StreamingDataset with torch.nn.parallel.DistributedDataParallel

I am trying to use StreamingDataset with torch.nn.parallel.DistributedDataParallel by:

nranks = 'WORLD_SIZE' in os.environ and int(os.environ['WORLD_SIZE'])
nranks = max(1, nranks)
args.batch_size = args.batch_size // nranks


local = "/tmp/mds_triples_v1/"
dataset = StreamingDataset(local=local, remote=args.s3_train_path, shuffle=True)

    
train_dataloader = DataLoader(
             dataset,
             batch_size=args.batch_size,
             collate_fn=collate_fn,
             num_workers=10,
 )

.... 

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.rank],
                                                      output_device=args.rank,
                                                      find_unused_parameters=True)

I am wondering if in this way my GPUs will all see the same data. I used to use torch.utils.data.distributed.DistributedSampler to avoid that in the past, but I am not sure if I can integrate it with StreamingDataset

Shared Memory issue with multiple instances of Streaming Dataset in a multi-gpu setup

Environment

  • OS: [Ubuntu 23.06.30]
  • Hardware (GPU, or instance type): [8xV100]

The issue

I am trying Streaming Dataset with Pytorch Lightning. In the setup section of the code I try to initialize multiple instances of Streaming Dataset for test, train and validation. These datasets then are passed the dataloaders. The problem arises with multiprocessing, when these three datasets are initialized concurrently.

  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/dataset.py", line 373, in __init__
    self._shm_prefix_int, self._locals_shm = get_shm_prefix(my_locals, world)
  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/shared/prefix.py", line 173, in get_shm_prefix
    shm = SharedMemory(name, True, len(data))
  File "/anaconda/envs/testing/lib/python3.9/site-packages/streaming/base/shared/memory.py", line 41, in __init__
    shm = BuiltinSharedMemory(name, create, size)
  File "/anaconda/envs/testing/lib/python3.9/multiprocessing/shared_memory.py", line 103, in __init__
    self._fd = _posixshmem.shm_open(
FileExistsError: [Errno 17] File exists: '/000009_locals'

This error seems to arise when workers try to create a SharedMemory block that has already been created for a previous dataset. The same prefix_int is assigned again during shm creation. When only one dataset is initialized, the code works as expected on multiple GPUs.
Am I missing something here? If not, then how can I go about creating multiple streaming datasets?

Memory leak when using `StreamingDataset`'s `__iter__` method.

Hello there,
I have a rather large dataset (around 1.5T) in my GCS that I'm trying to load with StreamingDataset, and when I try to access its __iter__ method, the Python session gets killed with what appears to be a memory leak. I note that random access to any sample works - it's only __iter__ that causes a problem.

For me, the following code works:

from streaming import StreamingDataset

os.environ["GCS_KEY"] = # value
os.environ["GCS_SECRET"] = # value

dataset = StreamingDataset(remote="gs://some/dataset/path/", shuffle=True)
obj = dataset[2500000]  # random access works fine!

However, any of the following breaks:

import torch.utils.data
dl = torch.utils.data.DataLoader(dataset)
for batch in dl:
   break

# trying to iterate over the dataloader breaks (the dataloader calls `__iter__`)

Or:

import torch.utils.data
dl = torch.utils.data.DataLoader(dataset)
obj = next(dl.__iter__())
# Calling __iter__ directly also breaks

When the Python process gets killed, I see the following message in my console:

/usr/local/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 4 leaked shared_memory objects to clean up at shutdown

I'm not sure how to make this example reproducible - it probably works for your datasets, and it's only my system that's weirdly messed up, but I don't know what additional information to include.
Some data points, though:

  1. the bug persists whether or not I use shuffle, or what shuffling algo I use,
  2. the bug persists regardless of whether or not I specify a local caching directory,
  3. the bug disappears when I try to reproduce it with some tiny dataset.

I wrote the dataset with an MDSWriter.

No Python 3.11 support

Environment

  • OS: [Ubuntu 20.04]
  • Hardware (GPU, or instance type): N/A

To reproduce

Steps to reproduce the behavior:

  1. Create a conda environment with Python 3.11
  2. Attempt to install either from pypi or HEAD (pip install -e .)

Result:

ERROR: Could not find a version that satisfies the requirement torchtext>=0.10 (from mosaicml-streaming) (from versions: 0.1.1, 0.2.0, 0.2.1, 0.2.3, 0.3.1, 0.4.0, 0.5.0, 0.6.0)
ERROR: No matching distribution found for torchtext>=0.10

Expected behavior

Currently the latest version of Python is 3.11, but trying to install streaming with 3.11 causes dependency conflicts and will not install.

Additional context

Unsure if this is a bug or feature request.

Can't load dataset from S3

Hi! @karan6181 and others :)

I have some trouble with loading a dataset from S3. Interestingly, I can save my checkpoints to the same S3.

I'm launching llm-foundry pretrain configs, it looks like this:

data_local: #
data_remote: s3://my_bucket/path/to/data/my_dataset

I'm getting error

RuntimeError: Failed to download s3://my_bucket/path/to/data/my_dataset/train/index.json -> /tmp/tmp95xvpyl_/train/index.json. Got errors:
[ClientError('Object s3://my_bucket/path/to/data/my_dataset/train/index.json not found! Either check the bucket path or the bucket permission. If the bucket is a requester pays bucket, then provide the bucket name to the environment variable `MOSAICML_STREAMING_AWS_REQUESTER_PAYS`.'), ClientError('Object s3://my_bucket/path/to/data/my_dataset/train/index.json not found! Either check the bucket path or the bucket permission. If the bucket is a requester pays bucket, then provide the bucket name to the environment variable `MOSAICML_STREAMING_AWS_REQUESTER_PAYS`.'), ClientError('Object s3://my_bucket/path/to/data/my_dataset/train/index.json not found! Either check the bucket path or the bucket permission. If the bucket is a requester pays bucket, then provide the bucket name to the environment variable `MOSAICML_STREAMING_AWS_REQUESTER_PAYS`.')]
ERROR:composer.cli.launcher:Rank 0 crashed with exit code 1.

If I type aws s3 ls --endpoint-url MY_ENDPOINT_URL.com s3://my_bucket/path/to/data/my_dataset/train/index.json, I get information about the file; everything is OK.

my exports before composer train.py yaml.yaml:

export AWS_ACCESS_KEY_ID="xxx"
export AWS_SECRET_ACCESS_KEY="xxx"
export S3_ENDPOINT_URL="MY_ENDPOINT_URL"
export AWS_S3_ENDPOINT="MY_ENDPOINT_URL"
export AWS_PROFILE="MY_PROFILE"
export AWS_DEFAULT_PROFILE="MY_PROFILE"

Do you have any ideas what's going wrong?
