Comments (19)
Just a guess, but have you tried asking Beam to use cloudpickle rather than dill? I believe this is a recently added option. Cloudpickle is used for Dask, so this might work a bit more consistently.
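For what it's worth, a minimal way to compare the two serializers on the problematic object would be something like this sketch (where ds stands for the dataset opened from the fsspec file object):

import dill
import cloudpickle

# Round-trip the dataset with each serializer; whichever copy can still
# .load() afterwards tells you which pickler handles the file object.
for lib in (dill, cloudpickle):
    ds_copy = lib.loads(lib.dumps(ds))
    ds_copy.load()  # raises if the round-tripped dataset cannot be read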
from xarray-beam.
Ok I am satisfied with my workaround in pangeo-forge/pangeo-forge-recipes@c20f3fd, which is basically
if hasattr(open_file, "open"):
    open_file = open_file.open()
ds = xr.open_dataset(open_file)
This seems to reliably serialize with whatever fsspec can throw at us.
Sorry for the noise here. I appreciate the help.
from xarray-beam.
One other weird datapoint that I have discovered is that I can pickle the un-loadable dataset, load it from the pickle, and then call .load(), and it works!
import pickle

import fsspec
import xarray as xr

url = 'https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc'  # a NetCDF file served over HTTP
open_file = fsspec.open(url, mode='rb')
with open_file as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds:
        pass

ds_pk = pickle.dumps(ds)
ds1 = pickle.loads(ds_pk)
ds1.load()
This suggests that the dataset is serializable in some way. But something is happening in Beam that is preventing that path from being taken.
from xarray-beam.
Ok the mystery deepens even further. After running the code just above, you can then call ds.load() on the original dataset, and it works. This suggests that the two datasets are somehow sharing some state, perhaps via Xarray's CachingFileManager. @shoyer does that seem plausible?
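If the CachingFileManager theory is right, it might also explain why pickling "fixes" things: the manager drops its live file handle when pickled and reopens it lazily on unpickle. A minimal sketch of that behavior (example.txt is just a placeholder for a local file):

import pickle
from xarray.backends import CachingFileManager

manager = CachingFileManager(open, 'example.txt', mode='r')
manager.acquire()  # opens the file and caches the handle
# Pickling discards the live handle; the restored manager reopens
# the file on the next acquire().
restored = pickle.loads(pickle.dumps(manager))
print(restored.acquire().read())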
from xarray-beam.
I'm enjoying the conversation with myself, so I'll just keep going... 🙃
I figured out a bizarre workaround that makes it work:
def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            pass
    ds1 = pickle.loads(pickle.dumps(ds))
    return ds1
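Wiring that into a pipeline would look something like this (hypothetical usage; urls is a placeholder list of remote NetCDF URLs):

import apache_beam as beam
import fsspec

with beam.Pipeline() as p:
    datasets = (
        p
        | beam.Create([fsspec.open(u, mode='rb') for u in urls])
        | beam.Map(open_fsspec_openfile_with_xarray)
    )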
from xarray-beam.
Wait, what!?
I would have thought that pickle.dumps() is exactly what was being done anyway, so roundtripping with pickle a second time really shouldn't be making any difference. Can you see any difference between the __dict__ of ds and ds1, or their binary pickled representations? What about if you pickle with protocol=1?
By the way, copy.deepcopy should do about the same thing, without first making the bytestring.
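Something like this sketch is what I have in mind (using ds and ds1 from your snippet above):

import copy
import pickle

# Compare the binary pickled representations directly.
print(pickle.dumps(ds, protocol=1) == pickle.dumps(ds1, protocol=1))
# deepcopy should behave like a pickle round-trip without the bytestring.
ds2 = copy.deepcopy(ds)
ds2.load()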
from xarray-beam.
I have not tried to introspect the pickle objects. I don't know much about pickle details and internals. #49 (comment) is a fully isolated, copy-pasteable reproducer for this, so if you wanted to dig in, that would be the place to start.
My best guess is that there is an is_closed attribute somewhere deep in the h5py.File object, and that pickling / unpickling triggers it to be reset. For HTTP, it doesn't really mean anything for a file to be "open", so this notion of open / closed files can only cause problems.
This could also be interacting with Xarray in some unexpected ways. See pydata/xarray#4242 for some discussion of that.
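For anyone who wants to poke at that guess: h5py files expose their open/closed state through their truth value, so a quick probe could look like this (example.nc is a placeholder for a local copy of the file):

import h5py

f = h5py.File('example.nc', 'r')
print(bool(f))  # True while the file is open
f.close()
print(bool(f))  # False once the file is closed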
from xarray-beam.
I got some weird conversion error:
~/conda/envs/py38/lib/python3.8/site-packages/xarray/conventions.py in _update_bounds_attributes(variables)
399 for v in variables.values():
400 attrs = v.attrs
--> 401 has_date_units = "units" in attrs and "since" in attrs["units"]
402 if has_date_units and "bounds" in attrs:
403 if attrs["bounds"] in variables:
TypeError: argument of type 'Empty' is not iterable
Perhaps I need later versions of things.
from xarray-beam.
Just try with a different remote file, e.g.:
url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'
from xarray-beam.
Thanks for the suggestion, Stephan. I tried as follows:
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline

options = PipelineOptions(pickle_library="cloudpickle")
with TestPipeline(options=options) as p:
    ...
However, it appears to have no effect. I can specify pickle_library="foobar" with no consequences. I am on Beam 2.38.0, so it should support the cloudpickle option, which was introduced in 2.36.0. Maybe I am not passing the option correctly...
from xarray-beam.
I can specify pickle_library="foobar" with no consequences.
This is probably a red herring. The code to set the pickler doesn't raise any errors if you pass an invalid option. So I am going to assume that that option does work, and that it doesn't solve the problem.
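One sanity check is to pass the option in flag form and read it back from the parsed options (a sketch; this only confirms the option is parsed, not that the runner honors it):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(['--pickle_library=cloudpickle'])
print(options.view_as(SetupOptions).pickle_library)  # should echo 'cloudpickle'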
from xarray-beam.
Some more deep introspection into these objects:
import xarray as xr
import fsspec
from cloudpickle import dumps, loads
from pprint import pprint as print
url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'
with fsspec.open(url) as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds0:
        pass
ds_pk = dumps(ds0)
ds1 = loads(ds_pk)
# go deep inside Xarray's array wrappers to get out the `xarray.backends.h5netcdf_.H5NetCDFArrayWrapper` objects
wrapper0 = ds0.T_ZONES.variable._data.array.array.array.array.array
wrapper1 = ds1.T_ZONES.variable._data.array.array.array.array.array
# now go inside those and get the actual `fsspec.implementations.http.HTTPFile` objects
fobj0 = wrapper0.datastore._manager._args[0]
fobj1 = wrapper1.datastore._manager._args[0]
print(fobj0.__dict__)
print(fobj1.__dict__)
{'_closed': True,
'_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
'size': 1183063,
'type': 'file'},
'asynchronous': False,
'autocommit': True,
'blocksize': 5242880,
'cache': None,
'end': None,
'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
'kwargs': {},
'loc': 1183063,
'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
'mode': 'rb',
'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
'size': 1183063,
'start': None,
'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}
{'_closed': False,
'_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
'size': 1183063,
'type': 'file'},
'asynchronous': False,
'autocommit': True,
'blocksize': 5242880,
'cache': <fsspec.caching.BaseCache object at 0x18d4a32e0>,
'end': None,
'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
'kwargs': {},
'loc': 0,
'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
'mode': 'rb',
'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
'size': 1183063,
'start': None,
'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}
I tried taking fobj0 and manually setting:
fobj0._closed = False
fobj0.loc = 0
ds0.load()
However, this led to the error:
~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
1586 """
1587 out = memoryview(b).cast("B")
-> 1588 data = self.read(out.nbytes)
1589 out[: len(data)] = data
1590 return len(data)
~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
572 else:
573 length = min(self.size - self.loc, length)
--> 574 return super().read(length)
575
576 async def async_fetch_all(self):
~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
1576 # don't even bother calling fetch
1577 return b""
-> 1578 out = self.cache._fetch(self.loc, self.loc + length)
1579 self.loc += len(out)
1580 return out
AttributeError: 'NoneType' object has no attribute '_fetch'
from xarray-beam.
So we need a file-like where close() is a no-op? It seems like a natural thing to discard read buffers if close gets called, and I suppose that is expected to happen on context end, whether or not explicitly done by xarray.
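If we did want that, a minimal proxy along these lines would express it (a sketch, not existing fsspec API):

class NoOpCloseFile:
    # Forwards everything to a wrapped file-like object, except close(),
    # which deliberately does nothing, so buffers survive context exit.
    def __init__(self, f):
        self._f = f

    def close(self):
        pass  # keep the buffer alive

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        pass  # do not close on context exit either

    def __getattr__(self, name):
        return getattr(self._f, name)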
from xarray-beam.
Perhaps we are barking up the wrong tree. Once the dataset is passed through pickle or cloudpickle, it becomes loadable again. In #49 (comment) @shoyer suggested we should be able to force beam to use cloudpickle to serialize things. So it should be working without any changes to our libraries. I am currently trying to dig deeper into the dill vs. cloudpickle issue.
from xarray-beam.
The fundamental issue seems to be simply this:
with fsspec.open(URL) as fp:
    ds = xr.open_dataset(fp, engine='h5netcdf')
ds.load()  # -> ValueError: I/O operation on closed file.
I'm finally looking into this a little in detail.
Why do you use context managers in the functions that you're passing into beam.Map? I would generally not expect something like this to work -- the context manager is explicitly closing the file object.
Objects passed between transforms in a Beam pipeline are not necessarily serialized via pickle (which, as I understand it, would fix this by reopening the file), because it's unnecessary overhead if the separate map stages are evaluated on the same machine.
So anyway, if I were to do this I would not use a context manager in the opener function.
from xarray-beam.
Thanks a lot, Stephan! I appreciate your time.
Why do you use context managers in the functions that you're passing into beam.Map?
Because that's what fsspec seems to require! I went on an extremely deep dive on this in fsspec/filesystem_spec#579. In the end, the recommendation from @martindurant was to always use the context manager when opening a file-like object (see fsspec/filesystem_spec#579 (comment)). However, that requirement seems incompatible with serialization, as you noted.
I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.
from xarray-beam.
I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.
Ok, so I actually did test this case in fsspec/filesystem_spec#579 (comment). The following works with HTTP:
def open_fsspec_openfile_with_xarray(of):
    return xr.open_dataset(of.open(), engine='h5netcdf')
@martindurant, is that kosher?
from xarray-beam.
Yes, it's fine - the file will "close" (meaning dropping the buffer) when garbage collected. Local file instances made this way also pickle.
from xarray-beam.
But if I do that, I again hit the problem (from fsspec/filesystem_spec#579 (comment)) that fs.open and fsspec.open have different behavior! 😫
This works:
open_file = fsspec.open(url)
ds = xr.open_dataset(open_file.open())
# but not
ds = xr.open_dataset(open_file)
# -> AttributeError: 'HTTPFile' object has no attribute '__fspath__'
Or this works:
from fsspec.implementations.http import HTTPFileSystem
fs = HTTPFileSystem()
open_file = fs.open(url)
ds = xr.open_dataset(open_file)
# but not
ds = xr.open_dataset(open_file.open())
# -> AttributeError: 'HTTPFile' object has no attribute 'open'
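So the hasattr workaround from earlier in the thread is really just a shim over this asymmetry (a sketch; to_file_like is my own name for it):

def to_file_like(maybe_open_file):
    # fsspec.open() returns an OpenFile that must be .open()-ed first,
    # while fs.open() already returns a file-like object with no .open().
    if hasattr(maybe_open_file, 'open'):
        return maybe_open_file.open()
    return maybe_open_file

ds = xr.open_dataset(to_file_like(open_file), engine='h5netcdf')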
from xarray-beam.