
Comments (6)

jorisvandenbossche commented on June 17, 2024

Thanks for trying out and the feedback!

In that case, df is a dask dataframe and doesn't have an attribute/method like set_geometry, raising: AttributeError: 'DataFrame' object has no attribute 'set_geometry'.

Hmm, yes, it seems that we didn't yet add this set_geometry method to the dask dataframe class. What GeoPandas does is "monkeypatch" the normal pandas DataFrame to add a set_geometry method which then returns a GeoDataFrame (so the above example would work with plain pandas/geopandas). But we didn't do that here yet. We should probably do it for consistency (although monkeypatching is also not the nicest solution).
But in any case, we should update the example, as it is not working right now. What you did instead is the correct solution at the moment.
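
For reference, one workaround that should work today looks roughly like this (just a sketch: the CSV path, the long/lat column names and the CRS are placeholders, and it builds the geometry with plain geopandas before handing the frame to dask_geopandas.from_geopandas):

import pandas as pd
import geopandas
import dask_geopandas

# placeholders: adapt the path, column names and CRS to your data
# note: this reads the whole CSV into memory first
df = pd.read_csv("really_huge_geocsv.csv")
gdf = geopandas.GeoDataFrame(
    df, geometry=geopandas.points_from_xy(df.long, df.lat), crs="EPSG:32640"
)
ddf = dask_geopandas.from_geopandas(gdf, npartitions=8)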

I'm not able to export it as a GeoPackage. Am I missing something?

No, that's not yet implemented. Only writing to Parquet files is currently implemented.
I have a POC for reading GIS file formats (https://github.com/jsignell/dask-geopandas/issues/11 / https://nbviewer.jupyter.org/gist/jorisvandenbossche/c94bfc9626e622fda7285ed88a4d771a), but I haven't tried writing files yet. The main thing I am unsure about (and would need to research / experiment with) is how to let the different partitions write to a single file without concurrency / locking issues (as I suppose you will want a single GeoPackage file and not a directory of GeoPackage files?).
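
In the meantime, a rough sketch of what you can already do (assuming ddf is a dask_geopandas.GeoDataFrame; the output paths are placeholders): write a partitioned Parquet dataset, or sidestep the single-file locking question by writing one GIS file per partition:

# ddf: an existing dask_geopandas.GeoDataFrame; output paths are placeholders

# writing to a (partitioned) Parquet dataset is implemented
ddf.to_parquet("output_parquet/")

# stop-gap for GIS formats: one file per partition instead of a single file
for i, part in enumerate(ddf.to_delayed()):
    part.compute().to_file(f"part_{i}.gpkg", driver="GPKG")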


Guts commented on June 17, 2024

Thanks for your quick reply.

Thanks for your explanation. Until now, I used to chunk huge files (mostly CSV) and stream them into files (mode="a") or a database.
GPKG being a SQLite wrapper, if I were to implement a complete workflow, I would try to come up with something like this:

import dask.dataframe as dd
import dask_geopandas

# columns_definition: a dict mapping column names to dtypes (defined elsewhere)
chunked_df = dd.read_csv(
    "really_huge_geocsv.csv",
    usecols=list(columns_definition.keys()),
    dtype=columns_definition,
)

# hypothetical API: turn the dask dataframe into a dask-geopandas one
chunked_geodf = dask_geopandas.from_dask_df(chunked_df)

# hypothetical API: stream each chunk into the same GeoPackage layer
for chunk in chunked_geodf:
    chunk.to_gpkg(
        layername="Table",
        if_exists="append",
        geometry_type="point",
        x=chunked_df.long,
        y=chunked_df.lat,
        crs=32640,
    )

I'm still too new to these tools to really help you. Maybe by using PyGEOS' capabilities? There is a small GPKG writer that uses it: https://github.com/brendan-ward/pgpkg, but I haven't tried it myself yet.


jorisvandenbossche commented on June 17, 2024

Thanks for that link to pgpkg! I wasn't aware of that.
Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster / parallelizable.
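
(If you want to verify that the PyGEOS backend is actually being picked up, recent GeoPandas versions expose an option for it; a quick sanity check, assuming GeoPandas >= 0.8:)

import geopandas

# True means GeoPandas is using the PyGEOS-backed vectorized geometry engine
print(geopandas.options.use_pygeos)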

I was just experimenting with a to_file for dask_geopandas:

import dask
from dask.delayed import delayed, tokenize


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    return func(*args, **kwargs)


def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):
    """
    Write to single file.
    
    Parameters
    ----------
    df : dask_geopandas.GeoDataFrame
    path : str
        Filename.
    parallel : bool, default False
        When true, have each partition append itself to the file concurrently. This can result in rows being written
        in a different order than in the source GeoDataFrame. When false, write each partition to the file in
        sequence.
    compute : bool, default True
        When true, call dask.compute and perform the write; otherwise, return a Dask delayed object that can be
        computed later.
    
    """
    # based on dask.dataframe's to_sql
    def make_meta(meta):
        return meta.to_file(path, driver=driver, mode="w", **kwargs)

    make_meta = delayed(make_meta)
    meta_task = make_meta(df._meta)

    # Partitions should always append to the empty file created from `meta` above
    worker_kwargs = dict(kwargs, driver=driver, mode="a")

    if parallel:
        # Perform the meta insert, then one task that inserts all blocks concurrently:
        result = [
            _extra_deps(
                d.to_file,
                path,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the "meta" insert and each block's insert
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    d.to_file,
                    path,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
                )
            )
            last = result[-1]
    result = delayed(result)

    if compute:
        dask.compute(result, scheduler="processes")
    else:
        return result

And then you can use it like this:

to_file(gdf, "test.gpkg")


jorisvandenbossche commented on June 17, 2024

Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released
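
(A quick way to check whether your Fiona build already has it, using the fiona.supported_drivers mapping: the mode string for "GPKG" needs to contain "a" for append mode.)

import fiona

# e.g. "raw" = read / append / write; without the patch this is typically "rw"
print(fiona.supported_drivers["GPKG"])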


Guts commented on June 17, 2024

Thanks for that link to pgpkg! I wasn't aware of that.

You're welcome!

Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster / parallelizable.

Yes, it's well documented in the GeoPandas docs, great tip!

I just was experimenting with a to_file for dask_geopandas:

to_file(gdf, "test.gpkg")

Nice! I'll give it a try if you need me to.

Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released

Too bad a new version hasn't been released; it's not always possible to install from GitHub in a professional context.


mziminski commented on June 17, 2024

import dask
from dask.delayed import delayed, tokenize


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    return func(*args, **kwargs)


def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):  # replaced "GPKG" with "ESRI Shapefile"
    """
    Write to single file.
    
    Parameters
    ----------
    df : dask_geopandas.GeoDataFrame
    path : str
        Filename.
    parallel : bool, default False
        When true, have each partition append itself to the file concurrently. This can result in rows being written
        in a different order than in the source GeoDataFrame. When false, write each partition to the file in
        sequence.
    compute : bool, default True
        When true, call dask.compute and perform the write; otherwise, return a Dask delayed object that can be
        computed later.
    
    """
    # based on dask.dataframe's to_sql
    def make_meta(meta):
        return meta.to_file(path, driver=driver, mode="w", **kwargs)

    make_meta = delayed(make_meta)
    meta_task = make_meta(df._meta)

    # Partitions should always append to the empty file created from `meta` above
    worker_kwargs = dict(kwargs, driver=driver, mode="a")

    if parallel:
        # Perform the meta insert, then one task that inserts all blocks concurrently:
        result = [
            _extra_deps(
                d.to_file,
                path,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the "meta" insert and each block's insert
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    d.to_file,
                    path,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
                )
            )
            last = result[-1]
    result = delayed(result)

    if compute:
        dask.compute(result, scheduler="processes")
    else:
        return result

And then you can use it like this:

to_file(gdf, "test.gpkg")  # replaced .gpkg with .shp

I understand this code is most definitely still experimental, so I tried to modify it slightly to work with ESRI Shapefiles (please refer to the comments in the above code), but I got the following error for both the GPKG and SHP versions of the above code:

---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
~/../some_dir/test.py in <module>
     183 
     184 
---> 185 to_file(gdf, 'test.gpkg')
     186 # to_file(gdf, 'test.shp')

~/../some_dir/test.py in to_file(df, path, driver, parallel, compute, **kwargs)
     178 
     179     if compute:
---> 180         dask.compute(result, scheduler="processes")
     181     else:
     182         return result

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, chunksize, **kwargs)
    228             raise_exception=reraise,
    229             chunksize=chunksize,
--> 230             **kwargs
    231         )
    232     finally:

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429 
    430             self._condition.wait(timeout)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I just started looking into Dask recently and it's still new to me, so I don't really understand this error message. Is there an "easy" fix/workaround to make the to_file function work with ESRI Shapefiles?

Please let me know.
