Comments (18)
from @jgrss
I don't think the issue is with the individual workers because, as you say, the arrays are not that big. From what I have read, the problem is with multiprocessing and its memory management of queue workers. It might be more of an issue with Python <3.7. Which version are you running?
Possible short term solutions:
Pass keyword arguments to the pool executor so that you can try things like memory mapping or maxtasksperchild.
Try using concurrent.futures instead of multiprocessing by passing scheduler='processes'.
from geowombat.
@mmann1123 Can you show an example of what your input ds looks like so I can try and replicate it?
How about something like this?
from pathlib import Path
import dask.array as da
import geowombat as gw
class WriteDaskArray(object):
    def __init__(self,
                 filename,
                 overwrite=False,
                 **kwargs):
        self.filename = filename
        self.overwrite = overwrite
        self.kwargs = kwargs
        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix
    def __setitem__(self, key, item):
        # `key` is a tuple of slices locating this chunk in the full array
        if len(key) == 3:
            index_range, y, x = key
            # Convert 0-based band positions to 1-based band indexes (rasterio convention)
            indexes = list(range(index_range.start + 1, index_range.stop + 1, index_range.step or 1))
        else:
            indexes = 1
            y, x = key
        out_filename = self.parent / f'{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}'
        if self.overwrite:
            if out_filename.is_file():
                out_filename.unlink()
        # TODO: write stuff here
        with open(out_filename, mode='w') as f:
            pass
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        pass
with gw.open('image.tif') as src:
    # Transform data here
    data = ...
    with WriteDaskArray('out.file') as dst:
        res = da.store(da.squeeze(data.data),
                       dst,
                       lock=False,
                       compute=False)
        res.compute()
Thanks, I will play with that. At the moment I am using parquet files because they allow a multi-index, which is helpful. Here's what I have:
<xarray.DataArray (time: 7, band: 39, y: 5169, x: 6680)>
dask.array<concatenate, shape=(7, 39, 5169, 6680), dtype=float64, chunksize=(1, 38, 799, 799), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) int64 2010 2011 2012 2013 2014 2015 2016
  * y        (y) float64 3.403 3.403 3.406 3.406 ... 9.202 9.204 9.207 9.209
  * x        (x) float64 33.0 33.0 33.0 33.0 33.01 ... 40.49 40.5 40.5 40.5 40.5
  * band     (band) object 1 2 3 4 5 6 7 8 9 ... 32 33 34 35 36 37 38 'r_code'
Attributes:
    transform:      (0.002245788210298804, 0.0, 32.99736617392033, 0.0, -0.00...
    crs:            +init=epsg:4326
    res:            (0.002245788210298804, 0.002245788210298804)
    is_tiled:       1
    nodatavals:     (nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, n...
    scales:         (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1...
    offsets:        (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...
    AREA_OR_POINT:  Area
    filename:       ['y000002586_x000000000_h000002586_w000003341.tif', 'y000...
    resampling:     nearest
I have been having some success with the following, although only with the "threads" scheduler. It is not very efficient, but it works.
import itertools
import os
from datetime import datetime
from xr_fresh.transformers import Stackerizer
# `out_folder`, `block`, and `downcast_pandas` are defined elsewhere in the script
def user_func(*args):
    # Gather function arguments
    data, window_id, num_workers = list(itertools.chain(*args))  # num_workers=1
    # Send the computation to Dask
    X = Stackerizer(stack_dims=('y', 'x', 'time'), direction='stack').fit_transform(data)  # NOTE stack y before x!!!
    # compute() returns a new object, so reassign X
    X = X.compute(scheduler='threads', num_workers=num_workers)
    with open(out_folder + "/stacked_data_Xy_downcast_parts_window/log.txt", 'a') as the_file:
        the_file.write('working on %08d \t' % window_id)
        the_file.write(str(datetime.now()) + '\n')
    X = X.to_pandas()
    X.columns = X.columns.astype(str)
    # Downcast to save memory
    X = downcast_pandas(X)
    # Save (note: this approach won't write files in order)
    X.to_parquet(out_folder + '/stacked_data_Xy_downcast_parts_window/NDVI_2010-2016_Xy_croploss_' +
                 os.path.splitext(block)[0] + '_%05d.parquet' % window_id)
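The downcast_pandas helper called above is not shown in the thread. As a rough sketch only, assuming it simply narrows numeric columns to the smallest dtype that still holds their values, a hypothetical version built on pandas.to_numeric(downcast=...) could look like this. For the float64 data in the repr above, that usually means float32 where the values allow it.

```python
import pandas as pd


def downcast_pandas(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for the thread's `downcast_pandas` helper.

    Narrows each numeric column to the smallest dtype pandas considers safe:
    float64 -> float32 where the values allow it, int64 -> the smallest
    integer type that holds the values.
    """
    out = df.copy()
    for col in out.columns:
        if pd.api.types.is_float_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast="float")
        elif pd.api.types.is_integer_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast="integer")
    return out


df = pd.DataFrame({"ndvi": [0.25, 0.5, 0.75], "year": [2010, 2011, 2012]})
small = downcast_pandas(df)
```

Here "year" ends up as int16 (2012 does not fit in int8) and "ndvi" as float32. Float downcasting can lose precision beyond roughly 7 significant digits, so it is worth checking against the values you actually store.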
@mmann1123 Where is Stackerizer imported from?
Never mind: from xr_fresh.transformers import Stackerizer, right?
The following worked on a small array of random numbers. If it works on your large dataset, it might be good to add a more generic version of chunked writing with an optional user function for transforming data.
A user function that takes a numpy array and returns a pandas DataFrame
import xarray as xr
from xr_fresh.transformers import Stackerizer
def user_func(data):
    # The data from `da.store` are numpy arrays, so wrap them in a DataArray
    # with placeholder 1-based coordinates (the local name `da` only shadows dask.array inside this function)
    da = xr.DataArray(data=data,
                      coords={'time': range(1, data.shape[0]+1),
                              'band': range(1, data.shape[1]+1),
                              'y': range(1, data.shape[2]+1),
                              'x': range(1, data.shape[3]+1)},
                      dims=('time', 'band', 'y', 'x'))
    X = Stackerizer(stack_dims=('y', 'x', 'time'),
                    direction='stack')\
        .fit_transform(da)
    X = X.to_pandas()
    X.columns = X.columns.astype(str)
    return X
The class to write individual dask chunks to parquet files
from pathlib import Path
class WriteDaskArray(object):
    def __init__(self,
                 filename,
                 overwrite=False):
        self.filename = filename
        self.overwrite = overwrite
        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix
    def __setitem__(self, key, item):
        # `key` is a tuple of slices locating this chunk; only y and x go into the filename
        if len(key) == 4:
            t, b, y, x = key
        elif len(key) == 3:
            b, y, x = key
        else:
            y, x = key
        out_filename = self.parent / f'{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}'
        if self.overwrite:
            if out_filename.is_file():
                out_filename.unlink()
        # `item` is the chunk as a numpy array; transform it to a DataFrame and write it
        item = user_func(item)
        item.to_parquet(out_filename)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        pass
Write each chunk to file
import dask.array as da
import xarray as xr
ntime = 7
nbands = 40
nrows = 100
ncols = 100
# Keep in mind that `da.store` iterates over each chunk, whereas the geowombat parallel method only iterates over the row/column chunks
chunks = (7, 40, 50, 50)
src = xr.DataArray(data=da.random.random((ntime, nbands, nrows, ncols), chunks=chunks),
                   coords={'time': range(1, ntime+1),
                           'band': range(1, nbands+1),
                           'y': range(1, nrows+1),
                           'x': range(1, ncols+1)},
                   dims=('time', 'band', 'y', 'x'))
# This could be used within a client context
with WriteDaskArray('test.parquet') as dst:
    res = da.store(src.data,
                   dst,
                   lock=False,
                   compute=False)
    res.compute()
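WriteDaskArray names each file only from the y/x slices, so it relies on time and band each being a single chunk, as with chunks = (7, 40, 50, 50) here and ds.chunk({"time": -1, "band": -1, ...}) later in the thread. If time or band were split, several chunks would map to the same filename and the later write would replace the earlier file. A small sketch, using hypothetical helper names and only the standard library, that checks a chunk layout given in the same tuple-of-tuples format as a Dask array's .chunks attribute:

```python
from itertools import accumulate, product
from pathlib import Path


def chunk_slices(chunks):
    """Turn per-dimension chunk sizes, e.g. ((7,), (40,), (50, 50), (50, 50)),
    into one tuple of slices per chunk (the kind of key `da.store` passes to __setitem__)."""
    per_dim = []
    for sizes in chunks:
        stops = list(accumulate(sizes))
        starts = [0] + stops[:-1]
        per_dim.append([slice(a, b) for a, b in zip(starts, stops)])
    return list(product(*per_dim))


def chunk_filename(filename, key):
    """Same naming scheme as WriteDaskArray: only the last two (y, x) slices are used."""
    fpath = Path(filename)
    y, x = key[-2], key[-1]
    return fpath.parent / (
        f"{fpath.stem}_y{y.start:09d}_x{x.start:09d}"
        f"_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{fpath.suffix}"
    )


def has_name_collisions(filename, chunks):
    """True if two chunks would be written to the same file (e.g. time or band is split)."""
    names = [chunk_filename(filename, key) for key in chunk_slices(chunks)]
    return len(names) != len(set(names))
```

With the layout above, has_name_collisions("test.parquet", ((7,), (40,), (50, 50), (50, 50))) is False, while splitting time as ((4, 3), (40,), (50, 50), (50, 50)) makes it True. Adding a time/band offset to the filename would be one way around it.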
Sorry, how are you applying user_func here to create da?
When da.store is called with a dask.array as the first argument, Dask creates one store task per chunk and runs it on a worker; each task calls WriteDaskArray.__setitem__. In def __setitem__(self, key, item), key is a tuple of slices giving the chunk's position in the full array, and item is the chunk as a numpy array. The numpy array is passed to the user function in item = user_func(item), where it is converted to an xarray.DataArray and transformed.
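To see that protocol in isolation, here is a minimal sketch with a hypothetical RecordingTarget class standing in for WriteDaskArray. It only records the key and chunk it receives, and it uses Dask's single-threaded "synchronous" scheduler so the recorded list lives in the calling process. It assumes only numpy and dask.array.

```python
import dask.array as da
import numpy as np


class RecordingTarget:
    """Hypothetical stand-in for WriteDaskArray: records every chunk da.store hands it."""

    def __init__(self):
        self.writes = []

    def __setitem__(self, key, item):
        # key:  tuple of slices giving the chunk's position in the full array
        # item: the chunk itself, already computed to a NumPy array
        self.writes.append((key, np.array(item)))


arr = np.arange(48).reshape(2, 4, 6)
src = da.from_array(arr, chunks=(2, 2, 3))  # 1 x 2 x 2 = 4 chunks
target = RecordingTarget()

res = da.store(src, target, lock=False, compute=False)
# The synchronous scheduler runs every task in this process, so `target.writes`
# is filled here; with a process-based scheduler each worker gets its own copy
res.compute(scheduler="synchronous")

for key, chunk in target.writes:
    # Each key indexes back into the source array to the same values
    assert np.array_equal(chunk, arr[key])
```

Four writes are recorded, one per chunk, each with a key such as (slice(0, 2), slice(2, 4), slice(3, 6)). This is why the y/x slices in WriteDaskArray can be turned directly into filenames or used to slice coordinate arrays.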
OK, this is very interesting: your WriteDaskArray is more than two orders of magnitude faster than the parallel task approach (1 second per chunk vs. 7 minutes). I just figured out how to retain the coords and band names (I think; I would appreciate you looking it over to make sure you don't see any stupid mistakes). Note the use of .chunk() to iterate across row chunks.
from pathlib import Path
import dask.array as da
import geowombat as gw
import xarray as xr
from xr_fresh.transformers import Stackerizer
def user_func(data, src, t, b, y, x):
    # The data from `da.store` are numpy arrays; label them with this chunk's
    # coordinates by slicing the source coordinates with the chunk's key
    da = xr.DataArray(
        data=data,
        coords={
            "time": src.time.values[t],
            "band": src.band.values[b],
            "y": src.y.values[y],
            "x": src.x.values[x],
        },
        dims=("time", "band", "y", "x"),
    )
    X = Stackerizer(stack_dims=("y", "x", "time"), direction="stack").fit_transform(da)
    X = X.to_pandas()
    X.columns = X.columns.astype(str)
    return X
class WriteDaskArray(object):
    def __init__(self, filename, src, overwrite=True):
        self.filename = filename
        self.overwrite = overwrite
        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix
        self.src = src
    def __setitem__(self, key, item):
        # user_func expects a 4-D (time, band, y, x) chunk; only the first branch defines t and b
        if len(key) == 4:
            t, b, y, x = key
        elif len(key) == 3:
            b, y, x = key
        else:
            y, x = key
        out_filename = (
            self.parent
            / f"{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}"
        )
        if self.overwrite:
            if out_filename.is_file():
                out_filename.unlink()
        item = user_func(item, self.src, t, b, y, x)
        item.to_parquet(out_filename)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        pass
# `vrts` and `time_names` are defined elsewhere in the script
with gw.open(
    vrts, time_names=time_names, chunks=400
) as ds:
    ds = ds.chunk({"time": -1, "band": -1, "y": "auto", "x": -1})  # one chunk per block of rows (all times, bands, and columns)
    print(ds)
    # This could be used within a client context
    with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:
        res = da.store(ds.data, dst, lock=False, compute=False)
        res.compute()
I also pass the xr array to WriteDaskArray, which seems a bit sloppy. Is there a better way to do this?
with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:
I think for my use case it would be easier to label each output with a chunk_id, similar to window_id in the parallel task approach. Do you think this is possible?
For the chunk id, since I have it chunked by row, I guess I could just use the row number from y.
Hi @mmann1123, the code looks pretty good. The speedup isn't too surprising since it is using Dask to manage the task scheduling. The reason I have the other parallel option in geowombat is to provide an interface for large images. As long as the task overhead is manageable, Dask should be faster.
Perhaps you could set up a simple dictionary for the chunk id. Something like:
with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:
    dst.attrs['chunk_ids'] = dict(zip([f'{i:09d}-{j:09d}' for i in range(0, dst.gw.row_chunks) for j in range(0, dst.gw.col_chunks)], range(0, dst.gw.row_chunks*dst.gw.col_chunks)))
And then index the chunk_ids dictionary inside WriteDaskArray, like self.src.attrs['chunk_ids'][f'{y.start:09d}-{x.start:09d}'].
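One thing to watch in the dictionary above: range(0, ...) with a step of 1 produces consecutive integers, while y.start and x.start in __setitem__ are pixel offsets (0, 400, 800, ...), so most lookups would miss. A minimal sketch, assuming the chunk sizes are read from the Dask array itself (for example ds.data.chunks[-2] and ds.data.chunks[-1]), keys the dictionary on the actual chunk start offsets. chunk_id_lookup is a hypothetical helper name, not a geowombat function.

```python
from itertools import accumulate, product


def chunk_id_lookup(row_chunks, col_chunks):
    """Map 'ystart-xstart' (zero-padded pixel offsets) to a sequential chunk id.

    `row_chunks` / `col_chunks` are per-dimension chunk sizes in the format of
    a Dask array's `.chunks`, e.g. (400, 400, 369) for three row blocks.
    """
    row_starts = [0] + list(accumulate(row_chunks))[:-1]
    col_starts = [0] + list(accumulate(col_chunks))[:-1]
    keys = [f"{i:09d}-{j:09d}" for i, j in product(row_starts, col_starts)]
    return dict(zip(keys, range(len(keys))))


ids = chunk_id_lookup((400, 400, 369), (6680,))
```

The lookup should be built after the final ds.chunk(...) call, e.g. ds.attrs['chunk_ids'] = chunk_id_lookup(ds.data.chunks[-2], ds.data.chunks[-1]), so that its keys match the chunks da.store actually sends. Inside __setitem__, self.src.attrs['chunk_ids'][f'{y.start:09d}-{x.start:09d}'] then returns 0, 1, 2, ... for successive row blocks.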
I also pass the xr array to WriteDaskArray which seems a bit sloppy, is there a better way to do this?
I don't think so, other than passing the attributes you need individually. I would probably also pass the xarray.DataArray.gw object.
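Following the "pass only what you need" idea, one hedged option is to hand WriteDaskArray plain NumPy coordinate arrays and slice them with the same key that __setitem__ receives, instead of holding the whole DataArray. ChunkCoords below is a hypothetical helper, not part of geowombat or xarray. It is also likely a smaller object to serialize if a process-based scheduler is used.

```python
import numpy as np


class ChunkCoords:
    """Hypothetical helper: keep only the coordinate arrays WriteDaskArray needs."""

    def __init__(self, dims, coords):
        self.dims = tuple(dims)
        # Store plain NumPy arrays so nothing lazy (Dask) is carried along
        self.coords = {dim: np.asarray(coords[dim]) for dim in self.dims}

    def for_key(self, key):
        """Return {dim: labels} for the chunk at `key` (a tuple of slices from da.store)."""
        return {dim: self.coords[dim][sl] for dim, sl in zip(self.dims, key)}


cc = ChunkCoords(("time", "y"), {"time": [2010, 2011, 2012], "y": [3.4, 3.5, 3.6, 3.7]})
labels = cc.for_key((slice(1, 3), slice(0, 2)))
```

In the thread's setting, this could be built once with ChunkCoords(ds.dims, {d: ds[d].values for d in ds.dims}), and user_func could then receive coords=chunk_coords.for_key(key) when rebuilding the xarray.DataArray.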
I just realized that the example above should have been:
with gw.open(vrts, time_names=time_names, chunks=400) as ds:
    ds.attrs['chunk_ids'] = dict(zip([f'{i:09d}-{j:09d}' for i in range(0, ds.gw.row_chunks) for j in range(0, ds.gw.col_chunks)], range(0, ds.gw.row_chunks*ds.gw.col_chunks)))
@mmann1123 I also remembered the chunk_grid property.
with gw.open('...') as src:
    # df is a GeoDataFrame with unique ids for the chunks ('chunk' column)
    df = src.gw.chunk_grid
@mmann1123 Did you use the dask solution? Can we close this issue?
Resolved. Closing issue.