xarray-beam's Introduction

Xarray-Beam

Xarray-Beam is a Python library for building Apache Beam pipelines with Xarray datasets.

The project aims to facilitate data transformations and analysis on large-scale multi-dimensional labeled arrays, such as:

  • Ad-hoc computation on Xarray data, by dividing an xarray.Dataset into many smaller pieces ("chunks").
  • Adjusting array chunks, using the Rechunker algorithm.
  • Ingesting large, multi-dimensional array datasets into an analysis-ready, cloud-optimized format, namely Zarr (see also Pangeo Forge).
  • Calculating statistics (e.g., "climatology") across distributed datasets with arbitrary groups.

For more about our approach and how to get started, read the documentation!

Warning: Xarray-Beam is a sharp tool 🔪

Xarray-Beam is relatively new, and focused on expert users:

  • We use it extensively at Google for processing large-scale weather datasets, but there is not yet a vibrant external community.
  • It provides low-level abstractions that facilitate writing very large scale data pipelines (e.g., 100+ TB), but by design it requires explicitly thinking about how every operation is parallelized.

Installation

Xarray-Beam requires recent versions of immutabledict, Xarray, Dask, Rechunker, Zarr, and Apache Beam. For best performance when writing Zarr files, use Xarray 0.19.0 or later.

Disclaimer

Xarray-Beam is an experiment that we are sharing with the outside world in the hope that it will be useful. It is not a supported Google product. We welcome feedback, bug reports and code contributions, but cannot guarantee they will be addressed.

See the "Contribution guidelines" for more.

Credits

Contributors:

  • Stephan Hoyer
  • Jason Hickey
  • Cenk Gazen
  • Alex Merose

xarray-beam's People

Contributors

alxmrs, arunsathiya, bastianzim, dependabot[bot], mjwillson, rchen152, shoyer

xarray-beam's Issues

Support missing chunks in ConsolidateChunks

This currently raises an error, but isn't always a programming error:

import xarray
import xarray_beam as xbeam
import numpy as np

ds = xarray.Dataset({"foo": (('x', 'y'), np.zeros((4, 4)))})
key = xbeam.Key({'x': 0, 'y': 0})

([(key, ds)] | xbeam.SplitChunks({'x': 2, 'y': 2}))[:-1] | xbeam.ConsolidateChunks({'x': -1, 'y': -1})
# ValueError: some expected chunks are missing for vars=None shape: [2, 2]
# len(inputs): 3 [while running 'ConsolidateChunks/Consolidate']

It would be nice if we supported missing chunks by filling in values with NaN, like xarray.merge.
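
To make "missing chunks" concrete, the sketch below enumerates the chunk origins a consolidation step would expect for the 4x4 example above and reports which ones are absent. It is plain Python rather than xarray-beam code, and `expected_offsets` and `missing_offsets` are hypothetical helper names introduced only for this illustration.

```python
import itertools


def expected_offsets(sizes, chunks):
    """Return every chunk origin for the given dimension sizes and chunk sizes."""
    dims = list(sizes)
    ranges = [range(0, sizes[d], chunks[d]) for d in dims]
    return [dict(zip(dims, combo)) for combo in itertools.product(*ranges)]


def missing_offsets(sizes, chunks, present):
    """Return the expected chunk origins that do not appear in `present`."""
    seen = {tuple(sorted(p.items())) for p in present}
    return [
        offsets
        for offsets in expected_offsets(sizes, chunks)
        if tuple(sorted(offsets.items())) not in seen
    ]


sizes = {'x': 4, 'y': 4}
chunks = {'x': 2, 'y': 2}
all_offsets = expected_offsets(sizes, chunks)
present = all_offsets[:-1]  # pretend one chunk never arrived
print(missing_offsets(sizes, chunks, present))
```

A NaN-filling implementation would presumably need this kind of bookkeeping to know which regions to pad before combining the chunks that did arrive.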

Dimension order produced by Rechunk is opaque and not controllable; mismatch can cause errors from subsequent ChunksToZarr.

In a pipeline in which Rechunk is followed by ChunksToZarr, one can run into errors when the dimension order of variables output by Rechunk doesn't match that of the template you pass to ChunksToZarr, resulting in errors like:

ValueError: variable 'geopotential_quantiles' already exists with different dimension names ('hour', 'dayofyear', 'level', 'latitude', 'longitude', 'quantile') != ('level', 'hour', 'dayofyear', 'latitude', 'longitude', 'quantile'), but changing variable dimensions is not supported by to_zarr().

As far as I can tell Rechunk doesn't allow you to control the output dimension order (at least, not on a per-variable basis, which may be necessary to match a given template). An alternative could be to transpose the output template instead to match whatever Rechunk is going to produce, but it's hard to know what that's going to be as well.

As another way around this, it'd be nice if ChunksToZarr could just do the transpose rather than complain if it finds this kind of dimension mismatch (same dimensions in a different order).
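
The "same dimensions in a different order" case can be detected from dimension names alone. The following is a minimal sketch, not part of xarray-beam: `transpose_axes` is a hypothetical helper, and which of the two orders in the error message belongs to the template is an assumption.

```python
def transpose_axes(actual_dims, template_dims):
    """Return the axis order that rearranges actual_dims into template_dims.

    Raises ValueError if the two are not the same set of dimensions.
    """
    if sorted(actual_dims) != sorted(template_dims):
        raise ValueError(f'dimensions differ: {actual_dims} vs {template_dims}')
    return tuple(actual_dims.index(d) for d in template_dims)


# Dimension orders copied from the error message above. Treating the first
# as the template and the second as the Rechunk output is an assumption.
template_dims = ('hour', 'dayofyear', 'level', 'latitude', 'longitude', 'quantile')
chunk_dims = ('level', 'hour', 'dayofyear', 'latitude', 'longitude', 'quantile')

axes = transpose_axes(chunk_dims, template_dims)
print(axes)
print(tuple(chunk_dims[i] for i in axes) == template_dims)
```

With xarray objects the reordering itself can be done by name, for example with `DataArray.transpose(*template_dims)`, so a check like this would only be needed to decide whether a transpose is safe.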

Better documentation of behaviour with irregular source chunks

Hiya!

In some source datasets the original chunks have irregular sizes. For example, one netCDF file per year, where leap years make some files longer than others.

Although the data model would seem to support this in principle, I couldn't get Rechunk or ConsolidateChunks to work without regular source chunk sizes.

Is this a fundamental limitation or something that could be in scope to address? And do you have any recommendations for getting data like this into regular chunks? In the leap year case, first splitting the source chunks into one chunk per day using SplitChunks could be an option, although in general this would require splitting into chunks of gcd(*all_possible_source_chunk_sizes), which could be too small.
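
A quick calculation shows why the gcd approach can produce very small chunks. This is a plain-Python sketch; `common_chunk_size` is a hypothetical name, and the yearly sizes are assumptions about daily and hourly data.

```python
import math
from functools import reduce


def common_chunk_size(sizes):
    """Largest chunk size that evenly divides every source chunk size."""
    return reduce(math.gcd, sizes)


# Daily data: 365 days in a common year, 366 in a leap year.
print(common_chunk_size([365, 366]))

# Hourly data: 8760 hours in a common year, 8784 in a leap year.
print(common_chunk_size([8760, 8784]))
```

For daily data the only regular chunking is one time step per chunk, while hourly data can at best be split into one-day chunks of 24 steps.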

Simultaneously read multiple Datasets into an Xarray-Beam pipeline

It is relatively common to need to load multiple xarray.Dataset objects, e.g., to compare two different models.

This can currently be done by loading data with separate calls to xbeam.DatasetToChunks and joining the results with beam.CoGroupByKey. This works but is rather inefficient, involving an extra write of the data to disk. Ideally we could load the data in a single Beam transform instead, e.g., xbeam.DatasetToChunks([ds1, ds2], chunks) would return a PCollection with elements of type tuple[xbeam.Key, tuple[xarray.Dataset, xarray.Dataset]].

CC @alxmrs
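
To illustrate the element shape proposed above, here is a plain-Python sketch of joining two keyed collections. It does not use Beam or xarray-beam: `join_by_key` is a hypothetical helper, the keys are tuples standing in for xbeam.Key, and the strings stand in for xarray.Dataset chunks.

```python
from collections import defaultdict


def join_by_key(*keyed_collections):
    """Group values from several (key, value) lists by key, in input order."""
    grouped = defaultdict(lambda: [None] * len(keyed_collections))
    for position, collection in enumerate(keyed_collections):
        for key, value in collection:
            grouped[key][position] = value
    return [(key, tuple(values)) for key, values in grouped.items()]


# Placeholder data: keys are tuples of (dimension, offset) pairs and the
# "chunks" are strings. Both are stand-ins, not real xarray-beam objects.
model_a = [((('time', 0),), 'a[0:10]'), ((('time', 10),), 'a[10:20]')]
model_b = [((('time', 0),), 'b[0:10]'), ((('time', 10),), 'b[10:20]')]

for key, pair in join_by_key(model_a, model_b):
    print(key, pair)
```

Each output element pairs one key with a tuple holding the matching chunk from every input, which is the shape a combined read transform would emit directly without the intermediate shuffle.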

Consider omitting unchunked dimensions from Key objects created with DatasetToChunks

Currently we have (from https://xarray-beam.readthedocs.io/en/latest/read-write.html):

with beam.Pipeline() as p:
    p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(print_summary)
Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None)
  with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 1000, 'lon': 53}>
Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None)
  with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 1000, 'lon': 53}>
Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None)
  with <xarray.Dataset data_vars=['air'] dims={'lat': 25, 'time': 920, 'lon': 53}>

Should we instead omit lat and lon from these keys? This is less explicit but also more flexible, e.g., if you replace these dimensions entirely with different dimensions, you don't need to update the keys.
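
The proposed behaviour amounts to filtering each key's offsets down to the chunked dimensions. The sketch below uses plain dicts in place of xbeam.Key, and `chunked_offsets` is a hypothetical helper name.

```python
def chunked_offsets(offsets, chunks):
    """Keep only the offsets for dimensions that appear in `chunks`."""
    return {dim: offset for dim, offset in offsets.items() if dim in chunks}


chunks = {'time': 1000}
# Offsets copied from the keys printed above.
full_keys = [
    {'lat': 0, 'lon': 0, 'time': 0},
    {'lat': 0, 'lon': 0, 'time': 1000},
    {'lat': 0, 'lon': 0, 'time': 2000},
]
for offsets in full_keys:
    print(chunked_offsets(offsets, chunks))
```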

Adding source distribution for xarray-beam on PyPI

Thanks for developing this library. I am interested in submitting a conda-forge recipe for xarray-beam. The most common source url for conda-forge recipes is the PyPI source distribution, followed by the GitHub release source distribution, followed by the GitHub archive (current xarray-beam option). Would you consider adding an sdist to the PyPI xarray-beam release?

Require using make_template() if providing a template to ChunksToZarr?

Currently we support passing an xarray.Dataset full of chunked dask.array objects as template into ChunksToZarr.

This is convenient in simple cases, but makes it easy to write pipelines that are very slow to set up if you pass in a chunked Dataset with many small chunks (e.g., the default output of xarray.open_zarr()).

The breaking change here would be to require that the template argument was created via make_template(), by checking that each dask.array argument in the supplied Dataset only consists of a single chunk. We would also make zarr_chunks required when supplying a template, because it makes no sense to copy chunks from a template if using make_template.
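
The single-chunk check described above could look roughly like this. It is a sketch on a plain mapping of dimension names to chunk-size tuples (the kind of mapping xarray exposes as `Dataset.chunks` for dask-backed data); `is_single_chunk_template` is a hypothetical name, not an existing xarray-beam function.

```python
def is_single_chunk_template(chunks):
    """True if every dimension is made up of exactly one chunk.

    `chunks` maps dimension names to tuples of chunk sizes.
    """
    return all(len(sizes) == 1 for sizes in chunks.values())


# One chunk per dimension: acceptable under the proposed rule.
print(is_single_chunk_template({'time': (2920,), 'lat': (25,), 'lon': (53,)}))

# Several chunks along time: would be rejected under the proposed rule.
print(is_single_chunk_template({'time': (1000, 1000, 920), 'lat': (25,), 'lon': (53,)}))
```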

Rechunk memory usage seems over `max_mem` limit

Hi there 👋

I'm trying out xarray-beam to do some Zarr rechunking. I'm wondering if anyone has any tips for controlling the max_mem used in the rechunk operation.

I first tried rechunking a 128GB CMIP6 dataset and ran into an OOM error at around 120GB of RAM.

[Image: Failing 128GB Zarr store]

Next, I tried rechunking a much smaller dataset (~10GB) and the memory consumption climbed to about 34GB before finishing.

[Image: Completing 10GB Zarr store (with high memory usage)]

I'm basing my rechunking pipeline off of the ERA5_rechunking.py example in the docs, using the local runner and using the max_mem default of 1 GB.

Small dataset example:

import apache_beam as beam
import xarray_beam as xbeam

store = 'https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/EOBS-feedstock/eobs-wind-speed.zarr'
# store = 'https://cpdataeuwest.blob.core.windows.net/cp-cmip/version1/data/DeepSD/ScenarioMIP.CCCma.CanESM5.ssp245.r1i1p1f1.day.DeepSD.pr.zarr'
output_store = 'eobs_test.zarr'

source_dataset, source_chunks = xbeam.open_zarr(store)
template = xbeam.make_template(source_dataset)

target_chunks = {'time':80, 'latitude':350, 'longitude':511}
itemsize = max(variable.dtype.itemsize for variable in template.values())

with beam.Pipeline(runner="DirectRunner", options=beam.pipeline.PipelineOptions(["--num_workers", '2'])) as root:
    (
        root
        | xbeam.DatasetToChunks(source_dataset, source_chunks, split_vars=True)
        | xbeam.Rechunk(
            source_dataset.sizes,
            source_chunks,
            target_chunks,
            itemsize=itemsize,
        )
        | xbeam.ChunksToZarr(output_store, template, zarr_chunks=target_chunks)
    )

Thanks!
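
One sanity check for a report like this is to compute how large a single target chunk is relative to max_mem. The sketch below uses the target_chunks from the example above. The item sizes are assumptions, since the post does not state the dtype, and `chunk_nbytes` is a hypothetical helper.

```python
import math


def chunk_nbytes(chunks, itemsize):
    """Size in bytes of one chunk with the given shape and element size."""
    return math.prod(chunks.values()) * itemsize


target_chunks = {'time': 80, 'latitude': 350, 'longitude': 511}
# Assumption: "1 GB" is taken to mean 2**30 bytes here.
max_mem = 2**30

# Assumption: 4 and 8 bytes correspond to float32 and float64 data.
for itemsize in (4, 8):
    nbytes = chunk_nbytes(target_chunks, itemsize)
    print(itemsize, nbytes, nbytes <= max_mem)
```

Under these assumptions a single target chunk is far below the limit, so the chunk size itself would not account for the memory growth described in the post.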

Help with opening netcdf4 files via HTTP

This is not necessarily an xarray-beam specific question; however, it relates to issues here (e.g. #37, #32) as well as in Pangeo Forge, so I am asking it here. I hope people here will be able to help me. Ultimately I hope this will help us resolve pangeo-forge/pangeo-forge-recipes#373 and move forward with merging Pangeo Forge and xarray-beam.

Goal: open xarray datasets from HTTP endpoints lazily and pass them around a beam pipeline, delaying the loading of data variables until later in the pipeline.

What I have tried

Here is the basic pipeline I am working with. It is a simplified, self-contained version of what we will want to do in Pangeo Forge. (Note: this probably requires installing the latest version of fsspec from master, in order to get fsspec/filesystem_spec#973.)

import xarray as xr
import fsspec

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException, assert_that


def open_http_url_with_fsspec(url):
    return fsspec.open(url, mode='rb')


def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            return ds


def load_xarray_ds(ds):
    return ds.load()


def is_xr_dataset():
    def _is_xr_dataset(actual):
        for ds in actual:
            if not isinstance(ds, xr.Dataset):
                raise BeamAssertException(f"Object {ds} has type {type(ds)}, expected xr.Dataset.")
            if not ds.AerosolOpticalThickness.variable._in_memory:
                raise BeamAssertException("Variable not in memory")

    return _is_xr_dataset


URL = 'https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc'


with TestPipeline() as p:
    inputs = p | beam.Create([URL])
    open_files = inputs | beam.Map(open_http_url_with_fsspec)
    dsets = open_files | beam.Map(open_fsspec_openfile_with_xarray)
    loaded_dsets = dsets | beam.Map(load_xarray_ds)
    
    assert_that(loaded_dsets, is_xr_dataset())

When I run this I get ValueError: I/O operation on closed file. [while running '[1]: Map(load_xarray_ds)']
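
A likely mechanism, offered as a hypothesis rather than a confirmed diagnosis: open_fsspec_openfile_with_xarray returns the lazily loaded dataset from inside its `with` blocks, so the underlying file is closed before a later stage calls ds.load(). The standard-library sketch below reproduces the same pattern with io.BytesIO standing in for the fsspec file; `open_lazily` is a hypothetical name.

```python
import io


def open_lazily(raw_bytes):
    """Return a reader that touches the file only when it is called."""
    with io.BytesIO(raw_bytes) as fp:
        # The lambda keeps a reference to fp but does not read yet.
        # Leaving the with block on return closes fp immediately.
        return lambda: fp.read()


reader = open_lazily(b'payload')
try:
    reader()
except ValueError as error:
    print(error)
```

If this is indeed the cause, the data would need to be loaded while the file is still open, or the file would need to be reopened in the stage that loads it.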

Full Traceback
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    198             try:
--> 199                 file = self._cache[self._key]
    200             except KeyError:

~/Code/xarray/xarray/backends/lru_cache.py in __getitem__(self, key)
     52         with self._lock:
---> 53             value = self._cache[key]
     54             self._cache.move_to_end(key)

KeyError: [<class 'h5netcdf.core.File'>, (<File-like object HTTPFileSystem, https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc>,), 'r', (('decode_vlen_strings', True), ('invalid_netcdf', None))]

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/transforms/core.py in <lambda>(x)
   1638   else:
-> 1639     wrapper = lambda x: [fn(x)]
   1640 

<ipython-input-1-53601ea18ea3> in load_xarray_ds(ds)
     19 def load_xarray_ds(ds):
---> 20     return ds.load()
     21 

~/Code/xarray/xarray/core/dataset.py in load(self, **kwargs)
    688             if k not in lazy_data:
--> 689                 v.load()
    690 

~/Code/xarray/xarray/core/variable.py in load(self, **kwargs)
    443         elif not is_duck_array(self._data):
--> 444             self._data = np.asarray(self._data)
    445         return self

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    655     def __array__(self, dtype=None):
--> 656         self._ensure_cached()
    657         return np.asarray(self.array, dtype=dtype)

~/Code/xarray/xarray/core/indexing.py in _ensure_cached(self)
    652         if not isinstance(self.array, NumpyIndexingAdapter):
--> 653             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    654 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    625     def __array__(self, dtype=None):
--> 626         return np.asarray(self.array, dtype=dtype)
    627 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    526         array = as_indexable(self.array)
--> 527         return np.asarray(array[self.key], dtype=None)
    528 

~/Code/xarray/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     50     def __getitem__(self, key):
---> 51         return indexing.explicit_indexing_adapter(
     52             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem

~/Code/xarray/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    815     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 816     result = raw_indexing_method(raw_key.tuple)
    817     if numpy_indices.tuple:

~/Code/xarray/xarray/backends/h5netcdf_.py in _getitem(self, key)
     59         with self.datastore.lock:
---> 60             array = self.get_array(needs_lock=False)
     61             return array[key]

~/Code/xarray/xarray/backends/h5netcdf_.py in get_array(self, needs_lock)
     46     def get_array(self, needs_lock=True):
---> 47         ds = self.datastore._acquire(needs_lock)
     48         return ds.variables[self.variable_name]

~/Code/xarray/xarray/backends/h5netcdf_.py in _acquire(self, needs_lock)
    181     def _acquire(self, needs_lock=True):
--> 182         with self._manager.acquire_context(needs_lock) as root:
    183             ds = _nc4_require_group(

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/contextlib.py in __enter__(self)
    118         try:
--> 119             return next(self.gen)
    120         except StopIteration:

~/Code/xarray/xarray/backends/file_manager.py in acquire_context(self, needs_lock)
    186         """Context manager for acquiring a file."""
--> 187         file, cached = self._acquire_with_cache_info(needs_lock)
    188         try:

~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    204                     kwargs["mode"] = self._mode
--> 205                 file = self._opener(*self._args, **kwargs)
    206                 if self._mode == "w":

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5netcdf/core.py in __init__(self, path, mode, invalid_netcdf, phony_dims, **kwargs)
    720                     self._preexisting_file = mode in {"r", "r+", "a"}
--> 721                     self._h5file = h5py.File(path, mode, **kwargs)
    722         except Exception:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in __init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, fs_strategy, fs_persist, fs_threshold, fs_page_size, page_buf_size, min_meta_keep, min_raw_keep, locking, **kwds)
    506                                  fs_page_size=fs_page_size)
--> 507                 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    508 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
    219             flags |= h5f.ACC_SWMR_READ
--> 220         fid = h5f.open(name, flags, fapl=fapl)
    221     elif mode == 'r+':

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1572         if self.closed:
-> 1573             raise ValueError("I/O operation on closed file.")
   1574         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))

ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-1-53601ea18ea3> in <module>
     41     loaded_dsets = dsets | beam.Map(load_xarray_ds)
     42 
---> 43     assert_that(loaded_dsets, is_xr_dataset())

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    594     try:
    595       if not exc_type:
--> 596         self.result = self.run()
    597         self.result.wait_until_finish()
    598     finally:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py in run(self, test_runner_api)
    110 
    111   def run(self, test_runner_api=True):
--> 112     result = super().run(
    113         test_runner_api=(
    114             False if self.not_use_test_runner_api else test_runner_api))

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    544       # When possible, invoke a round trip through the runner API.
    545       if test_runner_api and self._verify_runner_api_compatible():
--> 546         return Pipeline.from_runner_api(
    547             self.to_runner_api(use_fake_coders=True),
    548             self.runner,

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    571         finally:
    572           shutil.rmtree(tmpdir)
--> 573       return self.runner.run_pipeline(self, self._options)
    574     finally:
    575       if not is_in_ipython():

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    197         options.view_as(pipeline_options.ProfilingOptions))
    198 
--> 199     self._latest_run_result = self.run_via_runner_api(
    200         pipeline.to_runner_api(default_environment=self._default_environment))
    201     return self._latest_run_result

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    208     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    209     #   the teststream (if any), and all the stages).
--> 210     return self.run_stages(stage_context, stages)
    211 
    212   @contextlib.contextmanager

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    393           )
    394 
--> 395           stage_results = self._run_stage(
    396               runner_execution_context, bundle_context_manager)
    397 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    658     while True:
    659       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660           self._run_bundle(
    661               runner_execution_context,
    662               bundle_context_manager,

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    781         expected_timer_output)
    782 
--> 783     result, splits = bundle_manager.process_bundle(
    784         data_input, data_output, input_timers, expected_timer_output)
    785     # Now we collect all the deferred inputs remaining from bundle execution.

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1092             process_bundle_descriptor.id,
   1093             cache_tokens=[next(self._cache_token_generator)]))
-> 1094     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1095 
   1096     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    376       self._uid_counter += 1
    377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
    379     return ControlFuture(request.instruction_id, response)
    380 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    578     if request_type:
    579       # E.g. if register is set, this will call self.register(request.register))
--> 580       return getattr(self, request_type)(
    581           getattr(request, request_type), request.instruction_id)
    582     else:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    616         with self.maybe_profile(instruction_id):
    617           delayed_applications, requests_finalization = (
--> 618               bundle_processor.process_bundle(instruction_id))
    619           monitoring_infos = bundle_processor.monitoring_infos()
    620           monitoring_infos.extend(self.state_cache_metrics_fn())

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    993                   element.timer_family_id, timer_data)
    994           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995             input_op_by_transform_id[element.transform_id].process_encoded(
    996                 element.data)
    997 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    219       decoded_value = self.windowed_coder_impl.decode_from_stream(
    220           input_stream, True)
--> 221       self.output(decoded_value)
    222 
    223   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.Operation.output()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.Operation.output()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/transforms/core.py in <lambda>(x)
   1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1638   else:
-> 1639     wrapper = lambda x: [fn(x)]
   1640 
   1641   label = 'Map(%s)' % ptransform.label_from_callable(fn)

<ipython-input-1-53601ea18ea3> in load_xarray_ds(ds)
     18 
     19 def load_xarray_ds(ds):
---> 20     return ds.load()
     21 
     22 

~/Code/xarray/xarray/core/dataset.py in load(self, **kwargs)
    687         for k, v in self.variables.items():
    688             if k not in lazy_data:
--> 689                 v.load()
    690 
    691         return self

~/Code/xarray/xarray/core/variable.py in load(self, **kwargs)
    442             self._data = as_compatible_data(self._data.compute(**kwargs))
    443         elif not is_duck_array(self._data):
--> 444             self._data = np.asarray(self._data)
    445         return self
    446 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    654 
    655     def __array__(self, dtype=None):
--> 656         self._ensure_cached()
    657         return np.asarray(self.array, dtype=dtype)
    658 

~/Code/xarray/xarray/core/indexing.py in _ensure_cached(self)
    651     def _ensure_cached(self):
    652         if not isinstance(self.array, NumpyIndexingAdapter):
--> 653             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    654 
    655     def __array__(self, dtype=None):

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    624 
    625     def __array__(self, dtype=None):
--> 626         return np.asarray(self.array, dtype=dtype)
    627 
    628     def __getitem__(self, key):

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    525     def __array__(self, dtype=None):
    526         array = as_indexable(self.array)
--> 527         return np.asarray(array[self.key], dtype=None)
    528 
    529     def transpose(self, order):

~/Code/xarray/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     49 
     50     def __getitem__(self, key):
---> 51         return indexing.explicit_indexing_adapter(
     52             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     53         )

~/Code/xarray/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    814     """
    815     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 816     result = raw_indexing_method(raw_key.tuple)
    817     if numpy_indices.tuple:
    818         # index the loaded np.ndarray

~/Code/xarray/xarray/backends/h5netcdf_.py in _getitem(self, key)
     58         key = tuple(list(k) if isinstance(k, np.ndarray) else k for k in key)
     59         with self.datastore.lock:
---> 60             array = self.get_array(needs_lock=False)
     61             return array[key]
     62 

~/Code/xarray/xarray/backends/h5netcdf_.py in get_array(self, needs_lock)
     45 class H5NetCDFArrayWrapper(BaseNetCDF4Array):
     46     def get_array(self, needs_lock=True):
---> 47         ds = self.datastore._acquire(needs_lock)
     48         return ds.variables[self.variable_name]
     49 

~/Code/xarray/xarray/backends/h5netcdf_.py in _acquire(self, needs_lock)
    180 
    181     def _acquire(self, needs_lock=True):
--> 182         with self._manager.acquire_context(needs_lock) as root:
    183             ds = _nc4_require_group(
    184                 root, self._group, self._mode, create_group=_h5netcdf_create_group

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/contextlib.py in __enter__(self)
    117         del self.args, self.kwds, self.func
    118         try:
--> 119             return next(self.gen)
    120         except StopIteration:
    121             raise RuntimeError("generator didn't yield") from None

~/Code/xarray/xarray/backends/file_manager.py in acquire_context(self, needs_lock)
    185     def acquire_context(self, needs_lock=True):
    186         """Context manager for acquiring a file."""
--> 187         file, cached = self._acquire_with_cache_info(needs_lock)
    188         try:
    189             yield file

~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    203                     kwargs = kwargs.copy()
    204                     kwargs["mode"] = self._mode
--> 205                 file = self._opener(*self._args, **kwargs)
    206                 if self._mode == "w":
    207                     # ensure file doesn't get overridden when opened again

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5netcdf/core.py in __init__(self, path, mode, invalid_netcdf, phony_dims, **kwargs)
    719                 else:
    720                     self._preexisting_file = mode in {"r", "r+", "a"}
--> 721                     self._h5file = h5py.File(path, mode, **kwargs)
    722         except Exception:
    723             self._closed = True

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in __init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, fs_strategy, fs_persist, fs_threshold, fs_page_size, page_buf_size, min_meta_keep, min_raw_keep, locking, **kwds)
    505                                  fs_persist=fs_persist, fs_threshold=fs_threshold,
    506                                  fs_page_size=fs_page_size)
--> 507                 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    508 
    509             if isinstance(libver, tuple):

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
    218         if swmr and swmr_support:
    219             flags |= h5f.ACC_SWMR_READ
--> 220         fid = h5f.open(name, flags, fapl=fapl)
    221     elif mode == 'r+':
    222         fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1586         """
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data
   1590         return len(data)

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    572         else:
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 
    576     async def async_fetch_all(self):

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1571             length = self.size - self.loc
   1572         if self.closed:
-> 1573             raise ValueError("I/O operation on closed file.")
   1574         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
   1575         if length == 0:

ValueError: I/O operation on closed file. [while running '[1]: Map(load_xarray_ds)']

This is not quite the same error as I am getting in pangeo-forge/pangeo-forge-recipes#373; there it is instead OSError: Unable to open file (incorrect metadata checksum after all read attempts). I have not been able to reproduce that error outside of pytest. However, my example here fails at the same point: when calling ds.load() on an h5netcdf-backed xarray dataset pointing at an fsspec HTTPFile object.

What is wrong

Overall my concern is with this pattern:

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            return ds

It feels wrong. I should either be yielding or else not using context managers. The first context manager is necessary. The second may be optional. But overall my understanding is that the outputs of the Map function need to be pickled, in which case the context manager pattern doesn't make sense at all. I have tried various other flavors, like

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            pass
    return ds

or

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        ds = xr.open_dataset(fp, engine='h5netcdf')
    return ds

but nothing seems to work. The fundamental issue seems to be simply this

with fsspec.open(URL) as fp:
    ds = xr.open_dataset(fp, engine='h5netcdf')
ds.load()  # -> ValueError: I/O operation on closed file.

Has anyone here managed to make something like this work? I feel like I'm missing something obvious.

cc @martindurant
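For anyone following along, here is a library-free sketch of the same failure mode. It is an illustration under assumptions, not what xarray or fsspec do internally: LazyReader, open_lazy and open_eager are hypothetical names, and io.BytesIO stands in for the fsspec file object. The point is only that a lazy object holding a file handle outlives the `with` block that closes the handle.

```python
import io


class LazyReader:
    """Hypothetical stand-in for a lazily loaded dataset.

    It keeps a reference to the file handle rather than the bytes.
    """

    def __init__(self, fp):
        self._fp = fp

    def load(self):
        # Data is only read from the underlying file when load() is called.
        self._fp.seek(0)
        return self._fp.read()


def open_lazy(buffer):
    # Mirrors the pattern above: the handle is closed when the `with` exits,
    # so the returned object points at a closed file.
    with buffer as fp:
        return LazyReader(fp)


def open_eager(buffer):
    # Reads everything while the handle is still open, giving up laziness.
    with buffer as fp:
        return LazyReader(fp).load()


if __name__ == "__main__":
    try:
        open_lazy(io.BytesIO(b"abc")).load()
    except ValueError as err:
        print("lazy load failed:", err)
    print("eager load:", open_eager(io.BytesIO(b"abc")))
```

Loading inside the `with` block avoids the error in this sketch, but only by reading all the data up front, which is exactly what a lazy pipeline is trying to avoid.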

Some notes on alignment with pangeo-forge-recipes around "keys"

At today's Pangeo Forge meeting, @alxmrs told us a bit more about Xarray Beam. I'm happy to note that we are aligning a bit around certain abstractions. I'm not proposing to merge or share code or anything at this stage. Just happy to see that we have independently arrived at some similar concepts. Comparing side by side may also suggest improvements.

Xarray Beam

https://xarray-beam.readthedocs.io/en/latest/data-model.html#keys-in-xarray-beam

To keep track of how individual records could be combined into a larger (virtual) dataset, Xarray-Beam defines a Key object. Key objects consist of:

  • offsets: integer offsets for chunks from the origin in an immutabledict
  • vars: The subset of variables included in each chunk, either as a frozenset, or as None to indicate “all variables”.

Pangeo Forge

We currently call this an "index," although I agree that key is a better name. Our use of indexes is less well documented as it is not really public API but more of an internal thing. We mention it here: https://pangeo-forge.readthedocs.io/en/latest/recipe_user_guide/file_patterns.html#inspect-a-filepattern

The index is its own special type of object used internally by recipes, a pangeo_forge_recipes.patterns.Index (which is basically a tuple of one or more pangeo_forge_recipes.patterns.DimIndex objects).

The code is pretty straightforward and the comments explain things better.

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/49997cb52cff466bd394c1348ef23981e782a4d9/pangeo_forge_recipes/patterns.py#L70-L114

These indexes are used both by FilePatterns (see #31) as well as for parallel writing of Zarr datasets.

cc @cisaacstern

Running pipelines on AWS?

Hello! Newbie here. :)
Any tips for someone who would like to try an xarray-beam rechunking pipeline on AWS?
(USGS is wedded to AWS at the moment)

Support for striding / rolling windows

Nice library, thanks! Will it work to pass e.g. dataset.rolling(...) or dataset.rolling(...).construct(...) to xarray_beam.DatasetToChunks? (I'm assuming not, or that it would result in something hugely inefficient?) If not, would it be possible to support striding / sliding windows in DatasetToChunks?

Support opening datasets with file-like objects in a Beam pipeline

I experimented a bit more with this based on @mjwillson's suggestion.

Amazingly, it seems that using file-like objects in Xarray does actually work as used here, though making a local copy might still have better performance.

What doesn't work yet -- but hopefully with small upstream changes to Xarray could work -- is passing xarray datasets opened with these file-like objects into a Beam pipeline. That could let us do the actual data loading from netCDF in separate workers, which could be quite a win!

Originally posted by @shoyer in #31 (comment)

Add CI to check that code has been formatted with pyink formatter.

One other thing I'll note is that I've been formatting code in Xarray-Beam with the (Google internal) version of the "pyink" formatter, using 2 space indentation: https://github.com/google/pyink

It would be nice if you could apply that manually to this change -- ideally we would add some sort of automatic formatter checker.

Originally posted by @shoyer in #69 (comment)

Register Beam coder(s) to avoid "Using fallback deterministic coder" warnings

Currently, running an Xarray-Beam pipeline generates (irrelevant) warnings about using fallback deterministic coders:
WARNING:apache_beam.coders.coder_impl:Using fallback deterministic coder for type '<class 'xarray_beam._src.core.Key'>' in '[6]: Mean/PerKey/CombinePerKey(MeanCombineFn)/GroupByKey'.

Theoretically, we should be able to explicitly register coders for xarray_beam.Key and tuple[xarray_beam.Key, xarray.Dataset] to turn these off.
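As a rough illustration of what "deterministic" means here: equal keys must always encode to identical bytes, regardless of dict or set iteration order. The sketch below uses only the standard library; encode_key and decode_key are hypothetical helpers, not part of Xarray-Beam or Beam. A real fix would presumably wrap logic like this in a Beam Coder subclass (with encode, decode and is_deterministic) and register it for the Key type, which is not shown here.

```python
import json


def encode_key(offsets, vars_):
    """Encode (offsets, vars) to bytes that depend only on their contents."""
    payload = {
        # Sorting removes any dependence on dict/set iteration order.
        "offsets": sorted(offsets.items()),
        "vars": None if vars_ is None else sorted(vars_),
    }
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def decode_key(data):
    """Inverse of encode_key: returns (offsets dict, vars frozenset or None)."""
    payload = json.loads(data.decode("utf-8"))
    offsets = {dim: offset for dim, offset in payload["offsets"]}
    vars_ = None if payload["vars"] is None else frozenset(payload["vars"])
    return offsets, vars_


if __name__ == "__main__":
    a = encode_key({"y": 1, "x": 0}, {"foo", "bar"})
    b = encode_key({"x": 0, "y": 1}, {"bar", "foo"})
    print(a == b)
    print(decode_key(a))
```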

Indicate variables in xarray-beam keys

Currently, we identify chunks only by overall offsets along each dimension. This works OK, but hits scalability limits for some pipelines, such as the ERA5 rechunking example in #8.

It would be nice to be able to have a SplitVariables() transform, that allowed for applying a pipeline in parallel to each data-variable in a Dataset.

To do so, we need some consistent way to identify a limited set of variables, not just chunk offsets. I propose to do so using a new Key class modeled off of the existing ChunkKey:

  • Key(offsets={'x': 0, 'y': 1}, vars={'foo'}) indicates a chunk of a dataset at positional offset x=0, y=1 and with only the variable foo.
  • Key(offsets={'x': 0, 'y': 1}, vars=None) indicates variables are not split.
  • Key(offsets=None, vars={'foo'}) or Key(offsets={}, vars={'foo'}) indicates dimensions are not split.

Key should support most of the user facing API of ChunkKey, e.g., key | {'time': 0} should still work. However:

  • Key now is a frozen dataclass consisting of a frozen dict and a frozen set (rather than a mapping itself), so key[dim] will have to become key.offsets[dim].
  • Key.to_slices doesn't really make sense (it could apply only to some variables).
  • To support modification without mutation, we'll add a new replace() method, e.g., key.replace(vars=None).
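To make the proposal concrete, here is a minimal sketch of the behaviour described in the bullets above. SketchKey is a hypothetical stand-in written with the standard library only, not the Xarray-Beam implementation: types.MappingProxyType substitutes for a frozen dict, and details such as hashing and validation are left out.

```python
import dataclasses
from types import MappingProxyType
from typing import AbstractSet, Mapping, Optional


@dataclasses.dataclass(frozen=True)
class SketchKey:
    """Illustrative frozen key: chunk offsets plus an optional variable subset."""

    offsets: Mapping[str, int] = dataclasses.field(default_factory=dict)
    vars: Optional[AbstractSet[str]] = None

    def __post_init__(self):
        # Freeze the inputs so a key cannot be mutated after construction.
        # object.__setattr__ is needed because the dataclass itself is frozen.
        frozen_offsets = MappingProxyType(dict(self.offsets or {}))
        object.__setattr__(self, "offsets", frozen_offsets)
        if self.vars is not None:
            object.__setattr__(self, "vars", frozenset(self.vars))

    def replace(self, **changes):
        # Modification without mutation: returns a new key.
        return dataclasses.replace(self, **changes)

    def __or__(self, other):
        # key | {'time': 0} returns a new key with the extra offsets merged in.
        return self.replace(offsets={**self.offsets, **other})


if __name__ == "__main__":
    key = SketchKey({"x": 0, "y": 1}, vars={"foo"})
    print(key.offsets["x"])             # key[dim] becomes key.offsets[dim]
    print(dict((key | {"time": 0}).offsets))
    print(key.replace(vars=None).vars)  # variables no longer split
```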

Consider adding ZarrToChunks() and/or an open_zarr() helper function

These could facilitate directly opening data from Zarr using idiomatic patterns in Xarray-Beam (e.g., using Xarray's lazy indexing machinery instead of dask).

I'm imagining open_zarr() returning a tuple of values (transform, template, chunks), providing exactly the information needed to use the dataset in a Zarr-to-Zarr pipeline:

  • transform would be the beam PTransform that could be used in a pipeline (equivalent to the result of xbeam.ZarrToChunks()).
  • template itself would be an efficient lazy xarray.Dataset consisting of a single dask chunk, e.g., equivalent to xarray.zeros_like(xarray.open_zarr(..., chunks=None).chunk()).
  • chunks would be a dict of chunks on the underlying dataset.

Usage examples:

with beam.Pipeline() as p:
  p | xbeam.ZarrToChunks(..., desired_chunks) | ...

with beam.Pipeline() as p:
  load_data, template, original_chunks = xbeam.open_zarr(...)
  p | load_data | beam.MapTuple(...) | xbeam.ChunksToZarr(..., template, original_chunks)
