
Comments (19)

shoyer commented on May 8, 2024

Just a guess, but have you tried asking Beam to use cloudpickle rather than dill? I believe this is a recently added option. Cloudpickle is used for Dask, so this might work a bit more consistently.


rabernat commented on May 8, 2024

Ok I am satisfied with my workaround in pangeo-forge/pangeo-forge-recipes@c20f3fd, which is basically

if hasattr(open_file, "open"):
    open_file = open_file.open()
ds = xr.open_dataset(open_file)

This seems to reliably serialize with whatever fsspec can throw at us.

Sorry for the noise here. I appreciate the help.


rabernat commented on May 8, 2024

One other weird datapoint that I have discovered is that I can pickle the un-loadable dataset, load it from the pickle, and then call .load() and it works!

import pickle

import fsspec
import xarray as xr

url = 'https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc'  # netcdf

open_file = fsspec.open(url, mode='rb')
with open_file as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds:
        pass

ds_pk = pickle.dumps(ds)
ds1 = pickle.loads(ds_pk)
ds1.load()

This suggests that the dataset is serializable in some way. But something is happening in beam that is preventing that path from being taken.


rabernat commented on May 8, 2024

Ok the mystery deepens even further. After running the code just above, you can then call ds.load() on the original dataset, and it works. This suggests that the two datasets are somehow sharing some state, perhaps via Xarray's CachingFileManager. @shoyer does that seem plausible?
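
A minimal sketch of that hypothesis (illustrative only, not from this thread; the exact caching behaviour varies across xarray versions): CachingFileManager keys a global file cache on the opener and its arguments, so two managers built from identical arguments can hand back the same underlying file handle.

import tempfile

from xarray.backends import CachingFileManager

# just get a real path on disk so acquire() can open it
path = tempfile.NamedTemporaryFile(suffix='.bin', delete=False).name

# two managers created with identical (opener, args, mode) may share a cache entry
mgr_a = CachingFileManager(open, path, mode='rb')
mgr_b = CachingFileManager(open, path, mode='rb')
print(mgr_a.acquire() is mgr_b.acquire())  # True when the cache entry is shared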


rabernat commented on May 8, 2024

I'm enjoying the conversation with myself, so I'll just keep going... 🙃

I figured out a bizarre workaround that makes it work

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            pass
    ds1 = pickle.loads(pickle.dumps(ds))
    return ds1
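
For context, a hypothetical sketch of how such an opener might plug into a pipeline (the wiring below is illustrative and not taken from the issue):

import apache_beam as beam
import fsspec

urls = ['https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc']

with beam.Pipeline() as p:
    (
        p
        | beam.Create([fsspec.open(u, mode='rb') for u in urls])
        | beam.Map(open_fsspec_openfile_with_xarray)  # the workaround opener above
        | beam.Map(lambda ds: ds.load())
    )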


martindurant commented on May 8, 2024

Wait, what!?

I would have thought that pickle.dumps() is exactly what was being done anyway, so roundtripping with pickle a second time really shouldn't be making any difference. Can you see any difference between the __dict__ of ds and ds1 or their binary pickled representations? What about if you pickle with protocol=1?

By the way, copy.deepcopy should do about the same thing, without first making the bytestring.


rabernat commented on May 8, 2024

I have not tried to introspect the pickle objects. I don't know much about pickle details and internals. #49 (comment) is a fully isolated, copy-pasteable reproducer for this, so if you wanted to dig in, that would be the place to start.

My best guess is that there is an is_closed parameter somewhere deep in the h5py.File object, and that pickling / unpickling triggers it to be reset. For HTTP, it doesn't really mean anything for a file to be "open", so this notion of open / closed files can only cause problems.

This could also be interacting with Xarray in some unexpected ways. See pydata/xarray#4242 for some discussion of that.


martindurant commented on May 8, 2024

I got some weird conversion error

~/conda/envs/py38/lib/python3.8/site-packages/xarray/conventions.py in _update_bounds_attributes(variables)
    399     for v in variables.values():
    400         attrs = v.attrs
--> 401         has_date_units = "units" in attrs and "since" in attrs["units"]
    402         if has_date_units and "bounds" in attrs:
    403             if attrs["bounds"] in variables:

TypeError: argument of type 'Empty' is not iterable

Perhaps I need later versions of things.


rabernat commented on May 8, 2024

Just try with a different remote file, e.g.

url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'


rabernat commented on May 8, 2024

Thanks for the suggestion Stephan. I tried as follows

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions(pickle_library="cloudpickle")
with TestPipeline(options=options) as p:
    ...

However, it appears to have no effect. I can specify pickle_library="foobar" with no consequences. I am on Beam 2.38.0, so it should support the cloudpickle option, which was introduced in 2.36.0. Maybe I am not passing the option correctly...


rabernat commented on May 8, 2024

I can specify pickle_library="foobar" with no consequences.

This is probably a red herring. The code to set the pickler doesn't raise any errors if you pass an invalid option. So I am going to assume that that option does work, and that it doesn't solve the problem.


rabernat commented on May 8, 2024

Some more deep introspection into these objects.

import xarray as xr
import fsspec
from cloudpickle import dumps, loads
from pprint import pprint as print

url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'
with fsspec.open(url) as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds0:
        pass
    
ds_pk = dumps(ds0)
ds1 = loads(ds_pk)

# go deep inside Xarray's array wrappers to get out the `xarray.backends.h5netcdf_.H5NetCDFArrayWrapper` objects
wrapper0 = ds0.T_ZONES.variable._data.array.array.array.array.array
wrapper1 = ds1.T_ZONES.variable._data.array.array.array.array.array

# now go inside those and get the actual `fsspec.implementations.http.HTTPFile` objects 
fobj0 = wrapper0.datastore._manager._args[0]
fobj1 = wrapper1.datastore._manager._args[0]

print(fobj0.__dict__)
print(fobj1.__dict__)
{'_closed': True,
 '_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
              'size': 1183063,
              'type': 'file'},
 'asynchronous': False,
 'autocommit': True,
 'blocksize': 5242880,
 'cache': None,
 'end': None,
 'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
 'kwargs': {},
 'loc': 1183063,
 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
 'mode': 'rb',
 'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
 'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
 'size': 1183063,
 'start': None,
 'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}
{'_closed': False,
 '_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
              'size': 1183063,
              'type': 'file'},
 'asynchronous': False,
 'autocommit': True,
 'blocksize': 5242880,
 'cache': <fsspec.caching.BaseCache object at 0x18d4a32e0>,
 'end': None,
 'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
 'kwargs': {},
 'loc': 0,
 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
 'mode': 'rb',
 'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
 'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
 'size': 1183063,
 'start': None,
 'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}

I tried taking fobj0 and manually setting

fobj0._closed = False
fobj0.loc = 0

ds0.load()

However, this led to the error

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1586         """
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data
   1590         return len(data)

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    572         else:
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 
    576     async def async_fetch_all(self):

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1576             # don't even bother calling fetch
   1577             return b""
-> 1578         out = self.cache._fetch(self.loc, self.loc + length)
   1579         self.loc += len(out)
   1580         return out

AttributeError: 'NoneType' object has no attribute '_fetch'
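
A hedged guess at the missing piece (attribute names taken from fsspec's AbstractBufferedFile and may differ between versions): close() also discards the read cache, so it would have to be rebuilt alongside the flags reset above.

from fsspec.caching import BaseCache

# rebuild the read cache that close() dropped, then reset the open/position state
fobj0.cache = BaseCache(fobj0.blocksize, fobj0._fetch_range, fobj0.size)
fobj0._closed = False
fobj0.loc = 0

ds0.load()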


martindurant commented on May 8, 2024

So we need a file-like whose close() is a no-op? It seems natural to discard read buffers when close gets called, and I suppose that is expected to happen on context exit, whether or not xarray does it explicitly.
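
A minimal sketch of that idea (purely hypothetical, not an existing fsspec class): a thin wrapper whose close() does nothing, so a context-manager exit cannot drop the buffer.

class NonClosingFile:
    """Forward everything to the wrapped fsspec file, but ignore close()."""

    def __init__(self, f):
        self._f = f

    def close(self):
        pass  # deliberately a no-op

    def __getattr__(self, name):
        # delegate reads, seeks, etc. to the real file object
        return getattr(self._f, name)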


rabernat commented on May 8, 2024

Perhaps we are barking up the wrong tree. Once the dataset is passed through pickle or cloudpickle, it becomes loadable again. In #49 (comment) @shoyer suggested we should be able to force beam to use cloudpickle to serialize things. So it should be working without any changes to our libraries. I am currently trying to dig deeper into the dill vs. cloudpickle issue.


shoyer commented on May 8, 2024

The fundamental issue seems to be simply this

with fsspec.open(URL) as fp:
    ds = xr.open_dataset(fp, engine='h5netcdf')
ds.load()  # -> ValueError: I/O operation on closed file.

I'm finally looking into this a little in detail.

Why do you use context managers in the functions that you're passing into beam.Map? I would generally not expect something like this to work -- the context manager is explicitly closing the file object.

Objects passed between transforms in a Beam pipeline are not necessarily serialized via pickle (which, as I understand it, would fix this by reopening the file), because it's unnecessary overhead if the separate map stages are evaluated on the same machine.

So anyway, if I were to do this, I would not use a context manager in the opener function.


rabernat commented on May 8, 2024

Thanks a lot Stephan! I appreciate your time.

Why do you use context managers in the functions that you're passing into beam.Map?

Because that's what fsspec seems to require! I went on an extremely deep dive on this in fsspec/filesystem_spec#579. In the end, the recommendation from @martindurant was to always use the context manager when opening a file-like object (see fsspec/filesystem_spec#579 (comment)). However, that requirement seems incompatible with serialization, as you noted.

I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.


rabernat commented on May 8, 2024

I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.

Ok, so I actually did test this case in fsspec/filesystem_spec#579 (comment). The following works with HTTP

def open_fsspec_openfile_with_xarray(of):
    return xr.open_dataset(of.open(), engine='h5netcdf')

@martindurant, is that kosher?


martindurant commented on May 8, 2024

Yes, it's fine - the file will "close" (meaning drop the buffer) when garbage collected. Local file instances made this way also pickle.


rabernat commented on May 8, 2024

But if I do that, I again hit the problem (from fsspec/filesystem_spec#579 (comment)) that fs.open and fsspec.open have different behavior! 😫

This works

open_file = fsspec.open(url)
ds = xr.open_dataset(open_file.open())

# but not
ds = xr.open_dataset(open_file)
# -> AttributeError: 'HTTPFile' object has no attribute '__fspath__'

or this works

from fsspec.implementations.http import HTTPFileSystem
fs = HTTPFileSystem()
open_file = fs.open(url)
ds = xr.open_dataset(open_file)

# but not
ds = xr.open_dataset(open_file.open())
# -> AttributeError: 'HTTPFile' object has no attribute 'open'

