Comments (18)

mmann1123 commented on May 25, 2024

from @jgrss

I don't think the issue is with the individual workers because, as you say, the arrays are not that big. From what I read, the problem is with multiprocessing and its memory management of queue workers. It seems that it might be more of an issue with Python <3.7. Which version are you running?

Possible short term solutions:

Pass keyword arguments to the pool executor so that you can try things like memory mapping or maxtasksperchild.

Try using concurrent.futures instead of multiprocessing by passing scheduler='processes' (a rough sketch of both options follows).
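
A rough sketch of those two options, assuming data is the dask-backed array in question (exact keywords can vary by Dask version; maxtasksperchild is a standard multiprocessing.Pool argument):

import multiprocessing as mp
import dask

# Option 1: recycle each pool worker after one task so leaked memory is released
with dask.config.set(pool=mp.Pool(4, maxtasksperchild=1)):
    res = data.compute(scheduler='processes')

# Option 2: switch to the process-based scheduler with a worker cap
res = data.compute(scheduler='processes', num_workers=4)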

jgrss commented on May 25, 2024

@mmann1123 Can you show an example of what your input ds looks like so I can try and replicate it?

jgrss commented on May 25, 2024

How about something like this?

from pathlib import Path

import dask.array as da
import geowombat as gw


class WriteDaskArray(object):

    def __init__(self,
                 filename,
                 overwrite=False,
                 **kwargs):

        self.filename = filename
        self.overwrite = overwrite
        self.kwargs = kwargs

        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix

    def __setitem__(self, key, item):

        if len(key) == 3:

            index_range, y, x = key

            # 1-based band indexes (unused in this sketch, but useful when writing rasters)
            indexes = list(range(index_range.start + 1, index_range.stop + 1, index_range.step or 1))

        else:

            indexes = 1
            y, x = key

        out_filename = self.parent / f'{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}'
            
        if self.overwrite:

            if Path(out_filename).is_file():
                Path(out_filename).unlink()
                
        # TODO: write stuff here
        with open(out_filename, mode='w') as f:
            pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass


with gw.open('image.tif') as src:
    
    # Transform data here
    data = ...

    with WriteDaskArray('out.file') as dst:

        res = da.store(da.squeeze(data.data),
                       dst,
                       lock=False,
                       compute=False)
        
        res.compute()

mmann1123 commented on May 25, 2024

Thanks, I will play with that. At the moment I am using parquet files because they allow a multi-index, which is helpful. Here's what I have.

<xarray.DataArray (time: 7, band: 39, y: 5169, x: 6680)>
dask.array<concatenate, shape=(7, 39, 5169, 6680), dtype=float64, chunksize=(1, 38, 799, 799), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) int64 2010 2011 2012 2013 2014 2015 2016
  * y        (y) float64 3.403 3.403 3.406 3.406 ... 9.202 9.204 9.207 9.209
  * x        (x) float64 33.0 33.0 33.0 33.0 33.01 ... 40.49 40.5 40.5 40.5 40.5
  * band     (band) object 1 2 3 4 5 6 7 8 9 ... 32 33 34 35 36 37 38 'r_code'
Attributes:
    transform:      (0.002245788210298804, 0.0, 32.99736617392033, 0.0, -0.00...
    crs:            +init=epsg:4326
    res:            (0.002245788210298804, 0.002245788210298804)
    is_tiled:       1
    nodatavals:     (nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, n...
    scales:         (1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1...
    offsets:        (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...
    AREA_OR_POINT:  Area
    filename:       ['y000002586_x000000000_h000002586_w000003341.tif', 'y000...
    resampling:     nearest
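
On the multi-index point: pandas can round-trip a MultiIndex through parquet with the pyarrow engine. A minimal check (file name hypothetical):

import pandas as pd

df = pd.DataFrame(
    {'ndvi': [0.2, 0.3]},
    index=pd.MultiIndex.from_tuples(
        [(3.4, 33.0, 2010), (3.4, 33.0, 2011)], names=('y', 'x', 'time')
    ),
)
df.to_parquet('check.parquet')                 # requires pyarrow
print(pd.read_parquet('check.parquet').index)  # the MultiIndex is preserved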

I have been having some success with the following, although with scheduler='threads'. Not very efficient, but working.

import itertools
import os
from datetime import datetime

from xr_fresh.transformers import Stackerizer


def user_func(*args):

    # Gather function arguments
    data, window_id, num_workers = list(itertools.chain(*args))    # num_workers=1

    # Send the computation to Dask
    X = Stackerizer(stack_dims=('y', 'x', 'time'), direction='stack').fit_transform(data)   # NOTE: stack y before x!
    X = X.compute(scheduler='threads', num_workers=num_workers)

    # out_folder and block are defined in the enclosing script
    with open(out_folder + "/stacked_data_Xy_downcast_parts_window/log.txt", 'a') as the_file:
        the_file.write('working on %08d \t' % window_id)
        the_file.write(str(datetime.now()) + '\n')

    X = X.to_pandas()
    X.columns = X.columns.astype(str)

    # Downcast to save memory (downcast_pandas is a user-defined helper)
    X = downcast_pandas(X)

    # Save; this approach won't write the files in order
    X.to_parquet(out_folder + '/stacked_data_Xy_downcast_parts_window/NDVI_2010-2016_Xy_croploss_' +
                 os.path.splitext(block)[0] + '_%05d.parquet' % window_id)

jgrss commented on May 25, 2024

@mmann1123 Where is Stackerizer imported from?

jgrss commented on May 25, 2024

Never mind: from xr_fresh.transformers import Stackerizer, right?

jgrss commented on May 25, 2024

The following worked on a small array of random numbers. If it works on your large dataset, it might be good to add a more generic version of chunked writing with an optional user function for transforming data.

A user function that takes a numpy array and returns a pandas DataFrame

from pathlib import Path

import xarray as xr
from xr_fresh.transformers import Stackerizer


def user_func(data):

    # The data from `da.store` are numpy arrays
    da = xr.DataArray(
        data=data,
        coords={'time': range(1, data.shape[0] + 1),
                'band': range(1, data.shape[1] + 1),
                'y': range(1, data.shape[2] + 1),
                'x': range(1, data.shape[3] + 1)},
        dims=('time', 'band', 'y', 'x'),
    )
    
    X = Stackerizer(stack_dims=('y','x','time'), 
                    direction='stack')\
            .fit_transform(da)

    X = X.to_pandas() 
    X.columns = X.columns.astype(str)    
    
    return X

The class to write individual dask chunks to parquet files

class WriteDaskArray(object):

    def __init__(self,
                 filename,
                 overwrite=False):

        self.filename = filename
        self.overwrite = overwrite

        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix

    def __setitem__(self, key, item):

        if len(key) == 4:
            t, b, y, x = key
        elif len(key) == 3:
            b, y, x = key
        else:
            y, x = key

        out_filename = self.parent / f'{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}'
            
        if self.overwrite:

            if out_filename.is_file():
                out_filename.unlink()

        item = user_func(item)

        item.to_parquet(out_filename)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

Write each chunk to file

import dask.array as da

ntime = 7
nbands = 40
nrows = 100
ncols = 100

# Keep in mind that `da.store` iterates over every chunk, whereas the geowombat
# parallel method only iterates over the row/column chunks
chunks = (7, 40, 50, 50)

src = xr.DataArray(
    data=da.random.random((ntime, nbands, nrows, ncols), chunks=chunks),
    coords={'time': range(1, ntime + 1),
            'band': range(1, nbands + 1),
            'y': range(1, nrows + 1),
            'x': range(1, ncols + 1)},
    dims=('time', 'band', 'y', 'x'),
)

# This could be used within a client context
with WriteDaskArray('test.parquet') as dst:

    res = da.store(src.data,
                   dst,
                   lock=False,
                   compute=False)

    res.compute()
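
If that works, the per-chunk parquet files can be read back as one lazy dataframe; a sketch, assuming the file naming scheme produced above:

import dask.dataframe as dd

# The glob matches the test_y..._x..._h..._w....parquet chunk files
ddf = dd.read_parquet('test_y*.parquet')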

mmann1123 commented on May 25, 2024

Sorry, how are you applying user_func here to create da?

jgrss commented on May 25, 2024

When da.store is called with a dask.array passed as the first argument, WriteDaskArray sends each chunk to a worker. In def __setitem__(self, key, item), item is a numpy array. The numpy array is passed to the user function in item = user_func(item), where it is converted to an xarray.DataArray and transformed.
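
To make the mechanics concrete, here is a tiny standalone sketch (a toy class, not part of geowombat) that only prints what each __setitem__ call receives:

import dask.array as da

class KeyPrinter(object):

    def __setitem__(self, key, item):
        # key is a tuple of slices locating the chunk; item is a numpy array
        print(key, type(item).__name__, item.shape)

arr = da.random.random((4, 6), chunks=(2, 3))
da.store(arr, KeyPrinter(), lock=False)
# (slice(0, 2, None), slice(0, 3, None)) ndarray (2, 3)
# ... one call per chunk (4 calls in total here)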

mmann1123 commented on May 25, 2024

OK, this is very interesting: your WriteDaskArray is many orders of magnitude faster than the parallel task approach (1 sec per chunk vs. 7 minutes). I just figured out how to retain the coords and band names (I think; I would appreciate you looking it over to make sure you don't see any stupid mistakes). Note the use of chunk to iterate across row chunks.

def user_func(data, src, t, b, y, x):

    # The data from `da.store` are numpy arrays

    da = xr.DataArray(
        data=data,
        coords={
            "time": src.time.values[t],
            "band": src.band.values[b],
            "y": src.y.values[y],
            "x": src.x.values[x],
        },
        dims=("time", "band", "y", "x"),
    )

    X = Stackerizer(stack_dims=("y", "x", "time"), direction="stack").fit_transform(da)

    X = X.to_pandas()
    X.columns = X.columns.astype(str)

    return X


class WriteDaskArray(object):
    def __init__(self, filename, src, overwrite=True):

        self.filename = filename
        self.overwrite = overwrite

        fpath = Path(self.filename)
        self.parent = fpath.parent
        self.stem = fpath.stem
        self.suffix = fpath.suffix

        self.src = src

    def __setitem__(self, key, item):

        if len(key) == 4:
            t, b, y, x = key
        elif len(key) == 3:
            # No time slice in the key; default to the full extent
            t = slice(None)
            b, y, x = key
        else:
            # No time or band slices in the key
            t = b = slice(None)
            y, x = key

        out_filename = (
            self.parent
            / f"{self.stem}_y{y.start:09d}_x{x.start:09d}_h{y.stop - y.start:09d}_w{x.stop - x.start:09d}{self.suffix}"
        )

        if self.overwrite:

            if out_filename.is_file():
                out_filename.unlink()

        item = user_func(item, self.src, t, b, y, x)

        item.to_parquet(out_filename)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

with gw.open(vrts, time_names=time_names, chunks=400) as ds:

    # Rechunk so that time and band are each a single chunk
    ds = ds.chunk({"time": -1, "band": -1, "y": "auto", "x": -1})
    print(ds)

    # This could be used within a client context
    with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:

        res = da.store(ds.data, dst, lock=False, compute=False)

        res.compute()

I also pass the xarray array to WriteDaskArray, which seems a bit sloppy; is there a better way to do this?

with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:

I think for my use case it would be easier to label with a chunk_id similar to window_id from the parallel task approach; do you think this is possible?

mmann1123 commented on May 25, 2024

For the chunk id, since I have it chunked by row, I guess I could just use the row number from y.
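
That should work; with row-only chunking, a tiny helper along these lines (hypothetical, assuming a fixed row chunk size) maps the y slice passed to __setitem__ to a sequential chunk id:

def row_chunk_id(y, row_chunk_size):
    # Chunks are uniform except possibly the last, so integer division
    # over the slice start gives a sequential row-chunk id
    return y.start // row_chunk_size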

jgrss commented on May 25, 2024

Hi @mmann1123, the code looks pretty good. The speedup isn't too surprising, since it is using Dask to manage the task scheduling. The reason I have the other parallel option in geowombat is to provide an interface for large images. As long as the task overhead is manageable, Dask should be faster.

Perhaps you could set up a simple dictionary for the chunk id. Something like:

with WriteDaskArray("/home/mmann1123/Desktop/test.parquet", ds) as dst:
    dst.attrs['chunk_ids'] = dict(
        zip(
            [f'{i:09d}-{j:09d}'
             for i in range(0, dst.gw.row_chunks)
             for j in range(0, dst.gw.col_chunks)],
            range(0, dst.gw.row_chunks * dst.gw.col_chunks),
        )
    )

And then index the chunk ids inside WriteDaskArray like self.src.attrs['chunk_ids'][f'{y.start:09d}-{x.start:09d}'].

jgrss commented on May 25, 2024

I also pass the xr array to WriteDaskArray which seems a bit sloppy, is there a better way to do this?

I don't think so, other than passing the attributes you need individually. I would probably also pass the xarray.DataArray.gw object.
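
For reference, a sketch of the pass-attributes-individually alternative (parameter names hypothetical), so the writer holds only the coordinate arrays it needs rather than the full DataArray:

class WriteDaskArray(object):

    def __init__(self, filename, time_coords, band_coords, y_coords, x_coords, overwrite=True):
        self.filename = filename
        self.overwrite = overwrite
        # Plain numpy arrays, e.g. ds.time.values, ds.band.values, etc.
        self.time_coords = time_coords
        self.band_coords = band_coords
        self.y_coords = y_coords
        self.x_coords = x_coords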

jgrss commented on May 25, 2024

I just realized that the example above should have been:

with gw.open(vrts, time_names=time_names, chunks=400) as ds:
    ds.attrs['chunk_ids'] = dict(
        zip(
            [f'{i:09d}-{j:09d}'
             for i in range(0, ds.gw.row_chunks)
             for j in range(0, ds.gw.col_chunks)],
            range(0, ds.gw.row_chunks * ds.gw.col_chunks),
        )
    )

jgrss commented on May 25, 2024

@mmann1123 I also remembered the chunk_grid property.

with gw.open('...') as src:
    # df is a GeoDataFrame with unique ids for the chunks ('chunk' column)
    df = src.gw.chunk_grid

jgrss commented on May 25, 2024

@mmann1123 Did you use the dask solution? Can we close this issue?

mmann1123 commented on May 25, 2024

Resolved. Closing issue.
