geopandas / dask-geopandas
Parallel GeoPandas with Dask
Home Page: https://dask-geopandas.readthedocs.io/
License: BSD 3-Clause "New" or "Revised" License
What would it look like to parallelize/distribute GeoPandas with Dask?
Dask has created parallel variants of NumPy arrays and Pandas Dataframes. I think it may be sensible to do the same with Geopandas. I would like to solicit thoughts from Geopandas developers and people familiar with Dask on this idea. Is there a significant need for this? How would we arrange the in-memory geopandas components? What operations would it need to support to be pragmatic? What is a minimal feature set necessary to attract early-adopters? What performance issues are we likely to come across?
I am familiar with Dask but my experience with GeoPandas is light.
I suspect that one dask geopandas dataframe (dask-gdf) would be composed of many lazily computed geopandas dataframes (gdfs) with known bounding polygons, stored in another overarching geopandas dataframe (gdf). That is to say that we would partition the full dataset into localized regions, and would centrally maintain a mapping of those regions to pointers to gdfs to help guide computation. Dask-gdf operations would consult this mapping of regions before dispatching appropriate operations to the in-memory gdfs. For example a spatial join with an in-memory gdf would first join against the regions to find which regions have any interaction and then perform many in-memory joins with all of the relevant regions. A spatial join between two dask-gdfs would involve a spatial join on the region-mapping to find all intersections followed by many in-memory spatial joins on the gdfs.
I've convinced myself (perhaps foolishly) that this is sensible for spatial join operations and possible to accomplish lazily (which is important for larger-than-memory use). However I don't know if this metadata is sufficient to implement most other relevant operations for GeoPandas.
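As a minimal sketch of that dispatch idea, assume regions is a geopandas.GeoSeries holding one bounding polygon per partition and partitions is a list of the corresponding in-memory geopandas GeoDataFrames (all names here are illustrative, not an actual API):

import pandas as pd
import geopandas

def sjoin_against_in_memory(regions, partitions, other):
    # consult the region mapping first: which partitions can interact at all?
    hits = regions.intersects(other.unary_union)
    # then dispatch the in-memory joins only to the relevant partitions
    joined = [geopandas.sjoin(partitions[i], other) for i, hit in enumerate(hits) if hit]
    return pd.concat(joined) if joined else None  # None if no region interacts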
For background, here is how we design dask.arrays and dask.dataframes:
Almost all dask.array/dataframe operations are closed under this metadata (they know everything they need to do with this metadata and can generate the metadata of the output) without looking at the underlying values.
Often when parallelizing in-memory libraries we start becoming concerned with things like releasing the GIL and fast serialization. Are these an issue for GeoPandas?
An initial naive test shows that yes, serialization is an issue. We're only able to serialize GeoPandas dataframes at around 2MB/s (although this example may not be representative).
In [1]: import geopandas as gpd
In [2]: cities = gpd.read_file(gpd.datasets.get_path('naturalearth_cities'))
In [3]: import pickle
In [4]: %time len(pickle.dumps(cities))
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 5.59 ms
Out[4]: 11583
In [5]: 11583 / 0.0059 / 1e6 # MB/s
Out[5]: 1.9632203389830507
I haven't tested GIL-releasing friendliness yet.
cc @kjordahl @jorisvandenbossche @r-shekhar @kuanb @jalessio
This recent change dask/dask#7586 removed make_meta from dataframe.core, resulting in breakage of our implementation - https://github.com/geopandas/dask-geopandas/runs/2678080728?check_suite_focus=true
I've asked in that PR what is the best way to get around it but I also wanted to flag it here.
This is just locally on my laptop, but I can't get a distributed client to work, while the default dask threaded scheduler works, and a plain dask.dataframe also works with distributed.
What I am doing to test is:
import geopandas
import dask.dataframe as dd
import dask_geopandas
from distributed import Client
client = Client()
client
# loading a geopandas GeoDataFrame
df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
# first creating a pure dask DataFrame (getting non-geometry columns)
ddf = dd.from_pandas(df[['pop_est', 'continent', 'name']], npartitions=4)
# this works fine
ddf.continent.value_counts().compute()
# now creating a dask_geopandas.GeoDataFrame
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
# the same thing now fails (only getting the column as a dask.Series from the GeoDataFrame)
ddf.continent.value_counts().compute()
and this fails with:
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<ipython-input-32-f1c611ca664d> in <module>
----> 1 ddf.continent.value_counts().compute()
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
163 dask.base.compute
164 """
--> 165 (result,) = compute(self, traverse=False, **kwargs)
166 return result
167
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
434 keys = [x.__dask_keys__() for x in collections]
435 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436 results = schedule(dsk, keys, **kwargs)
437 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
438
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2593 should_rejoin = False
2594 try:
-> 2595 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2596 finally:
2597 for f in futures.values():
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1885 else:
1886 local_worker = None
-> 1887 return self.sync(
1888 self._gather,
1889 futures,
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
777 return future
778 else:
--> 779 return sync(
780 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
781 )
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/envs/geo-dev/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1750 exc = CancelledError(key)
1751 else:
-> 1752 raise exception.with_traceback(traceback)
1753 raise exc
1754 if errors == "skip":
KilledWorker: ("('from_pandas-35d27145a3c0b4d3be6e6daa6537420f', 0)", <Worker 'tcp://127.0.0.1:46111', name: 2, memory: 0, processing: 4>)
Any ideas on where to look to debug this?
We already preserve the spatial partitioning information (the spatial_partitions attribute) in several places (eg when selecting a subset of the columns in __getitem__, in the boundary attribute, with a _propagate_spatial_partitions helper method).
But there are more places where it could either be preserved as is, or preserved in a slightly modified form.
Methods where it can be preserved as is:
Methods where it might be relatively straightforward to preserve it in a slightly modified form:
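For illustration, the propagation step itself is small; a minimal sketch (the real helper is _propagate_spatial_partitions, but this body is only an assumption of roughly what it does):

def _propagate_spatial_partitions(self, new_object):
    """Copy the spatial_partitions of this collection onto a derived collection."""
    new_object.spatial_partitions = self.spatial_partitions
    return new_object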
Using the dask set_index method results in an "invalid" dask_geopandas.GeoDataFrame, where the partitions are no longer GeoDataFrames but DataFrames (which then results in errors when computing spatial operations).
import geopandas
import dask_geopandas
gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)
>>> type(ddf.partitions[0].compute())
geopandas.geodataframe.GeoDataFrame
>>> ddf2 = ddf.set_index("continent")
# still a dask_geopandas GeoDataFrame
>>> type(ddf2)
dask_geopandas.core.GeoDataFrame
# but partitions under the hood no longer a geopandas.GeoDataFrame
>>> type(ddf2.partitions[0].compute())
pandas.core.frame.DataFrame
# which results in errors when doing actual computations
>>> ddf2.area.compute()
....
AttributeError: 'DataFrame' object has no attribute 'area'
(Mis)using our issue tracker here for a moment, but I thought this was the easiest way to notify some people who have been trying out dask-geopandas / reporting issues / doing fixes, as this could potentially be interesting for you: @RichardScottOZ, @gansanay, @bilelomrani1, @njanakiev, @Rhiyo, @daviddemeij, @Guts, @deeplook
During the Dask Summit, we have a 2-hour workshop scheduled about scaling geospatial vector data on Thursday May 20th at 11-13:00 UTC (https://summit.dask.org/schedule/presentation/22/scaling-geospatial-vector-data/). All welcome to join!
See also geopandas/community#4 for more details.
I think that we should expand CI to test on windows and mac and to include master versions of key packages. Since testing on windows/mac is not supported on Travis (maybe mac is, if I remember correctly), I'd suggest following the geopandas model and switching to GitHub Actions.
Happy to do a PR.
If you want to measure anything which has to do with the spatial relationship of features, you eventually run into the issue of needing to go across chunk boundaries (assuming a spatially chunked ddf). A typical example is a spatial lag, i.e. the mean of values on neighbouring (touching) features.
The way raster analysis deals with it is using dask.array.map_overlap. That essentially copies a bit of neighbouring data to each chunk to create overlaps so you can do your spatial lag fully within a chunk. See https://docs.dask.org/en/latest/array-overlap.html
I believe that we need the analogue of map_overlap for vector data. It is naturally a significantly more complex issue since we do not know what the data look like (it is not a grid). But I believe it is doable and could be a massive game-changer. For example, I would be able to base 90% of momepy on dask-geopandas.
The trick is to define which features should be overlapping. For that, you need to know how far you have to go for each particular operation but we can specify:
I have actually already tested this approach with a topological threshold, with custom single-core functions, and it works well.
We obviously first need spatial re-chunking and spatial indexing, but this is something I'd like to put on a roadmap (maybe for GSoC?).
dask.array implementation - https://docs.dask.org/en/latest/array-overlap.html?highlight=map_overlap#dask.array.map_overlap
dask.dataframe implementation - https://docs.dask.org/en/latest/dataframe-api.html?highlight=map_overlap#dask.dataframe.DataFrame.map_overlap
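For illustration, here is a rough sketch of what a vector map_overlap could look like, assuming ddf has spatial_partitions computed, func operates on a plain geopandas.GeoDataFrame and returns one row per input row in the same order, and distance defines how far the overlap has to reach (all assumptions; this is not an existing dask-geopandas API):

import dask
import pandas as pd

def map_overlap_vector(ddf, func, distance):
    parts = ddf.to_delayed()          # one delayed GeoDataFrame per partition
    regions = ddf.spatial_partitions  # GeoSeries, one geometry per partition

    @dask.delayed
    def run(core, neighbours):
        combined = pd.concat([core, *neighbours]) if neighbours else core
        # assumes func returns one row per input row, in the same order
        return func(combined).iloc[: len(core)]

    results = []
    for i, region in enumerate(regions.geometry):
        halo = region.buffer(distance)  # how far the overlap has to reach
        hits = regions.intersects(halo)
        neighbours = [parts[j] for j, hit in enumerate(hits) if hit and j != i]
        results.append(run(parts[i], neighbours))
    return results  # list of delayed GeoDataFrames, one per original partition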
In certain situations, dask_geopandas.points_from_xy only gets applied to the first partition. I am not sure when exactly this happens, as I also had cases where this error didn't seem to occur. But below I have a minimal example that causes this problem.
import pandas as pd
import dask_geopandas
import dask.dataframe as dd
import numpy as np
N = 9
df = pd.DataFrame({'x': np.random.randn(N), 'y': np.random.randn(N)})
ddf = dd.from_pandas(df, npartitions=3)
ddf["geometry"] = dask_geopandas.points_from_xy(ddf, "x", "y")
ddf = dask_geopandas.from_dask_dataframe(ddf)
ddf.compute()
In principle, we know the shape of the returned array in total_bounds. It'd be nice if we could specify that to dask (now you get a shape of (nan,)).
Copied from: https://github.com/jsignell/dask-geopandas/pull/1/files/2953fe474021e53a364a58bc6f660c462a2d3c98#r394208103
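One way to hand dask the known shape, as a sketch: build the reduction with delayed and wrap the final result in da.from_delayed, since we know the answer is always a length-4 float array (the helper name here is made up):

import numpy as np
import dask.array as da
from dask.delayed import delayed

def total_bounds_with_shape(ddf):
    @delayed
    def combine(bounds_list):
        b = np.asarray(bounds_list)
        return np.array([b[:, 0].min(), b[:, 1].min(), b[:, 2].max(), b[:, 3].max()])

    parts = [delayed(lambda df: df.total_bounds)(p) for p in ddf.to_delayed()]
    # the shape is known up front, so the resulting dask array has chunks (4,)
    return da.from_delayed(combine(parts), shape=(4,), dtype="float64")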
Currently, using set_crs doesn't preserve the spatial partitions (similar to copy, see #56):
import geopandas
import dask_geopandas
gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
# remove the crs for purpose of the reproducible example
gdf.crs = None
# create partitioned dataframe + calculate spatial partitions
ddf = dask_geopandas.from_geopandas(gdf, npartitions=4)
ddf.calculate_spatial_partitions()
>>> ddf.spatial_partitions
0 POLYGON ((-68.149 -55.612, -68.640 -55.580, -6...
1 POLYGON ((18.465 -29.045, 16.345 -28.577, -77....
2 POLYGON ((166.740 -22.400, -8.899 36.869, -9.5...
3 POLYGON ((-180.000 -90.000, -180.000 -84.713, ...
dtype: geometry
>>> ddf = ddf.set_crs("EPSG:4326")
>>> ddf.spatial_partitions is None
True
@tastatham has an implementation of Morton distance mirroring the one we have for Hilbert distance (#70).
The idea is to push it and have it as another repartitioning method. It provides slightly worse results but can be significantly faster in some cases (@tastatham will also do the performance comparison).
I've encountered behaviour which makes sense, but it is a bit confusing. Consider the following example (mirroring my real data).
df = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
df.iloc[:50].to_parquet('test/test_1.pq')
df.iloc[50:100].to_parquet('test/test_2.pq')
df.iloc[100:].to_parquet('test/test_3.pq')
ddf = dask_geopandas.read_parquet("test/test_*.pq")
ddf['pop_est'].max().compute()
This raises the ValueError below because dask is not reading the geometry column. In that case, geopandas complains and tells you that you should use pandas.read_parquet. I guess this is again something to be fixed in geopandas, but it does not happen when using geopandas only, and if it does, the error is meaningful, which is not the case for dask_geopandas.
The workaround is to read the parquet files using dask.dataframe, but dask_geopandas should be able to resolve this under the hood.
import dask.dataframe as dd
ddf = dd.read_parquet("test/test_*.pq")
The error from dask_geopandas:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-59-35ac7a0f8ffc> in <module>
----> 1 ddf['pop_est'].max().compute()
/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
--> 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449
/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2686 should_rejoin = False
2687 try:
-> 2688 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2689 finally:
2690 for f in futures.values():
/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1986 direct=direct,
1987 local_worker=local_worker,
-> 1988 asynchronous=asynchronous,
1989 )
1990
/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
831 else:
832 return sync(
--> 833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
835
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1845 exc = CancelledError(key)
1846 else:
-> 1847 raise exception.with_traceback(traceback)
1848 raise exc
1849 if errors == "skip":
/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part()
271 This function is used by `read_parquet`."""
272 if isinstance(part, list):
--> 273 dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
274 df = concat(dfs, axis=0)
275 else:
/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>()
271 This function is used by `read_parquet`."""
272 if isinstance(part, list):
--> 273 dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
274 df = concat(dfs, axis=0)
275 else:
/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in read_partition()
579
580 arrow_table = cls._parquet_piece_as_arrow(piece, columns, partitions, **kwargs)
--> 581 df = cls._arrow_table_to_pandas(arrow_table, categories, **kwargs)
582
583 # Note that `to_pandas(ignore_metadata=False)` means
/opt/conda/lib/python3.7/site-packages/dask_geopandas/io/parquet.py in _arrow_table_to_pandas()
40
41 # TODO support additional keywords
---> 42 return _arrow_to_geopandas(arrow_table)
43
44 @classmethod
/opt/conda/lib/python3.7/site-packages/geopandas/io/arrow.py in _arrow_to_geopandas()
338 """No geometry columns are included in the columns read from
339 the Parquet/Feather file. To read this file without geometry columns,
--> 340 use pandas.read_parquet/read_feather() instead."""
341 )
342
ValueError: No geometry columns are included in the columns read from
the Parquet/Feather file. To read this file without geometry columns,
use pandas.read_parquet/read_feather() instead.
For making spatial joins or overlays, spatial predicates, reading from spatially partitioned datasets, etc more efficient, we can have spatially partitioned dataframes: the bounds of each partition are known, and thus it can be checked based on those bounds whether an operation needs to involve that partition or not.
And then geodataframes can also be re-partitioned to optimize the bounds (minimize the overlap) as much as possible (an initially costly shuffle operation, but it can pay off later).
This complicates the implementation (we need to keep track of the spatial partitioning, the partitions can change during spatial operations, ..), but I think it will also be critical for improving performance on large datasets.
How can we add this?
In the previous iteration at https://github.com/mrocklin/dask-geopandas, the dataframes had an additional _regions attribute, which was a geopandas.GeoSeries with the "regions" of each partition (so len(regions) == npartitions).
I think one advantage of using a GeoSeries is that this makes it easy to work with (eg it is easy to check which partitions would intersect with a given geometry).
In spatialpandas (https://github.com/holoviz/spatialpandas), there is a combo of partition_bounds and partition_sindex.
The partition_bounds is basically the total_bounds of each partition (so you could see it as the _regions but limited to a rectangular box and stored as the 4 (minx, miny, maxx, maxy) numbers). And then partition_sindex is a spatial index built on the partition_bounds.
See https://github.com/holoviz/spatialpandas/blob/master/spatialpandas/dask.py
I suppose starting with a basic "partition bounds" should be fine, and allows us to later expand it with a spatial index or with more fine-grained shapes.
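A small sketch of the "partition bounds" variant, assuming the per-partition total_bounds have already been collected into an (npartitions, 4) array (illustrative only):

import numpy as np
import geopandas
from shapely.geometry import box

def partitions_intersecting(partition_bounds, geom):
    """Return the indices of partitions whose bounding box intersects geom."""
    regions = geopandas.GeoSeries([box(*bounds) for bounds in partition_bounds])
    return np.flatnonzero(regions.intersects(geom))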
#54 added a basic spatial join function. It's a naive cross product of all partitions of both left and right (if no spatial partitioning information is available) or all partition combinations that intersect (based on the spatial partitioning information).
This works fine for an inner join and when your spatial partitions are already reasonable. There are however several aspects still to consider.
Supporting a left join
So currently we only allow how="inner", as for this case the naive version works nicely. However, a "left" join becomes more complicated.
Suppose we have a certain partition of the left frame that intersects with two partitions of the right frame. We perform a normal spatial join (using geopandas.sjoin) for each of those two combinations, resulting in two output partitions. But when doing a left join, geopandas.sjoin preserves the rows of the left dataframe that don't match with a geometry of the right dataframe. So if we do this multiple times, rows of the left dataframe can end up in multiple output partitions (instead of only in one of the output partitions, as is the case for an inner join). As a result, we would end up with a dask GeoDataFrame with duplicated rows.
Improving the output (spatial) partitions
Because the current implementation combines each partition of the left frame with each (intersecting) partition of the right dataframe, the result inherently has more and smaller partitions in the output.
Suppose again we have a certain partition of the left frame that intersects with two partitions of the right frame. Each of those two combinations is currently a task in the graph that ends up as a new partition in the output dataframe.
Overall (and depending on the exact (spatial) partitioning of the input), this can lead to a "fragmented" output dask GeoDataFrame with many smaller partitions, which could negatively impact (the performance of) further computations.
It might be interesting to look into "merging" chunks again (eg all output partitions coming from a certain partition of the left input could be combined (concatted) into a single output partition).
Repartition the input before performing a spatial join?
For pre-partitioned left and right datasets, the currently implemented approach (joining of geometries by partition resulting from intersection of partitions) is probably fine. But there will also be cases where it could be interesting to repartition input before performing the join.
One example case (copying from a mail conversation with @brendan-ward): for a target dataset (assume this has the greater number of features) that has already been partitioned, partition the query dataset to match the target (which, I guess, is the same as above, just that the query dataset has a partition size of 1) - which may involve cutting the query dataset by the bounding boxes of the intersecting partitions.
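To make the left-join problem concrete, here is a per-left-partition sketch of how the duplicated unmatched rows could be removed after joining against every intersecting right partition (a hypothetical helper, not the #54 implementation; it assumes the left partition's index values are unique):

import pandas as pd
import geopandas

def left_sjoin_one_partition(left, intersecting_rights):
    parts = [geopandas.sjoin(left, right, how="left") for right in intersecting_rights]
    out = pd.concat(parts)
    matched = out["index_right"].notna()
    has_match_somewhere = out.index.isin(out.index[matched])
    # keep every matched row, plus a single unmatched copy of the left rows
    # that did not match in any of the right partitions
    unmatched = out[~matched & ~has_match_somewhere]
    unmatched = unmatched[~unmatched.index.duplicated()]
    return pd.concat([out[matched], unmatched])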
Adding read_parquet/read_feather and to_parquet/to_feather functions, so we can work with large data from disk (and not just from existing geopandas.GeoDataFrames).
An initial version could be a relatively simple wrapper of the dask + geopandas functionalities, I think.
(and later we should look into spatial partitioned datasets, but that first requires spatial partitioned dataframes)
For set_crs, we probably want to check that the crs is None (not already set), and raise a warning or error if it isn't (maybe with a keyword to force it).
Otherwise I think people will accidentally call this assuming their geometries get transformed.
(we should also align with a similar method in GeoPandas)
Copied from: https://github.com/jsignell/dask-geopandas/pull/1/files/2953fe474021e53a364a58bc6f660c462a2d3c98#r394216055
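A small sketch of that guard (geopandas' own set_crs uses an allow_override keyword for the "force it" case, so something similar here would keep the two aligned; the helper name is made up):

def check_crs_not_set(current_crs, allow_override=False):
    """Guard before assigning a CRS without reprojecting the geometries."""
    if current_crs is not None and not allow_override:
        raise ValueError(
            "The CRS is already set. Use to_crs() to reproject the geometries, "
            "or pass allow_override=True to replace the CRS without reprojecting."
        )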
Once there is a spatial partitioning backend in place, we could read chunks from PostGIS via SQL (ST_Intersects or something).
We may also be able to link it to the chunksize parameter and read into partitions directly (I think that adapting dask.dataframe.read_sql_table to handle geometries would do it), but spatially constrained reading would be better.
Running dask-geopandas with geopandas master raises the following warning: /opt/conda/lib/python3.7/site-packages/dask_geopandas/backends.py:54: FutureWarning: Assigning CRS to a GeoDataFrame without a geometry column is now deprecated and will not be supported in the future.
It does that on every step.
ddf = dask_geopandas.from_geopandas(df, npartitions=14)
ddf['area'] = ddf.buildings.area
In GeoPandas, we have moved the CRS attribute to the GeometryArray level. Since we do not really want to have a GeoDataFrame without a geometry but with a CRS, we aim to deprecate that option. I guess that here it is better to also store the copy at the dask GeoDataFrame level, but we have to figure out if it is better to keep the existing support in geopandas or change the implementation here.
Currently the spatial_partitions is a plain attribute, meaning you can assign anything to it, etc. We should probably turn it into a property, so we can control the actual getter/setter, and store the value itself in _spatial_partitions. Something like:
@property
def spatial_partitions(self):
    return self._spatial_partitions

@spatial_partitions.setter
def spatial_partitions(self, value):
    # ...
    # ensure value is a GeoSeries, has the correct crs, has a default index, ...
    self._spatial_partitions = value
This would also easily allow storing the actual underlying value as something other than a GeoSeries (eg only bounding boxes, if we want that at some point), while keeping the public property returning a GeoSeries for convenience.
Assuming you have an attribute in the GeoDataFrame that describes some form of spatial extent or entity (eg the country name, the zip code, district, ..., or a calculation such as a geohash), this column could also be used to repartition the dataframe spatially.
Dask already somewhat supports this with the set_index and repartition methods. But in practice they don't necessarily do exactly what we want, and a helper function that uses those methods under the hood will probably be useful.
In my sjoin demo notebook, I repartitioned a GeoDataFrame based on a geohash column, i.e. a column with unique values, and I wanted a new partition for each value in this column.
I ended up doing something like:
df_points_sorted = df_points.set_index("geohash1").sort_index()
n_partitions = df_points.index.nunique()
ddf_points_sorted = dask_geopandas.from_geopandas(df_points_sorted, npartitions=n_partitions)
ddf_points_sorted = ddf_points_sorted.repartition(
    divisions=('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'b', 'c', 'd', 'e', 'f', 'g', 'g')).persist()
So dask sorts the input on the index and determines partition divisions from that in from_(geo)pandas (or if you already start with a dask dataframe, set_index will do the same). However, that tries to create more or less equal-sized partitions, so to ensure we actually have one partition per unique value of the attribute, I repartitioned the result again with the partition bounds explicitly given to repartition(..) (these are the unique values in the "geohash1" column). This second step is relatively cheap since it's only changing some start/stop slices, and not actually shuffling data.
I suppose we could write a custom version (maybe based on how those methods in dask are implemented) that makes this a bit simpler (e.g. we also don't need to have the partitions sorted).
I think the main question is: do we need two passes over the data to first determine all unique values in the column, and then the actual repartitioning in a second step?
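As a possible shape for such a helper, wrapping the set_index + repartition recipe above with two passes over the data (one to collect the unique values, one for the actual shuffle); only a sketch, not a proposed final API:

def repartition_by_column(ddf, column):
    """One output partition per unique value of `column` (requires two passes)."""
    values = sorted(ddf[column].unique().compute())  # pass 1: find the divisions
    ddf = ddf.set_index(column)                      # pass 2: sort/shuffle on the column
    # dask divisions need npartitions + 1 entries, so the last value is repeated
    return ddf.repartition(divisions=values + [values[-1]])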
I may be doing something wrong, but it seems that there's no way to apply a function to a GeoSeries. In the following example, I am just using a lambda and area, but the situation is the same no matter which function you apply.
df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
ddf.geometry.apply(lambda geom: geom.area, meta='float').compute()
It always ends with TypeError: apply() takes from 2 to 3 positional arguments but 4 were given.
Full traceback:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-83-5841353d9afc> in <module>
----> 1 ddf.geometry.apply(lambda geom: geom.area, meta='float').compute()
/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
/opt/conda/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
445 postcomputes.append(x.__dask_postcompute__())
446
--> 447 results = schedule(dsk, keys, **kwargs)
448 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
449
/opt/conda/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2686 should_rejoin = False
2687 try:
-> 2688 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2689 finally:
2690 for f in futures.values():
/opt/conda/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1986 direct=direct,
1987 local_worker=local_worker,
-> 1988 asynchronous=asynchronous,
1989 )
1990
/opt/conda/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
831 else:
832 return sync(
--> 833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
835
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]
/opt/conda/lib/python3.7/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/opt/conda/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1845 exc = CancelledError(key)
1846 else:
-> 1847 raise exception.with_traceback(traceback)
1848 raise exc
1849 if errors == "skip":
/opt/conda/lib/python3.7/site-packages/dask/optimization.py in __call__()
1020 if not len(args) == len(self.inkeys):
1021 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1022 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
1023
1024 def __reduce__(self):
/opt/conda/lib/python3.7/site-packages/dask/core.py in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
/opt/conda/lib/python3.7/site-packages/dask/core.py in _execute_task()
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
/opt/conda/lib/python3.7/site-packages/dask/utils.py in apply()
29 def apply(func, args, kwargs=None):
30 if kwargs:
---> 31 return func(*args, **kwargs)
32 else:
33 return func(*args)
/opt/conda/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce()
5256 func = kwargs.pop("_func")
5257 meta = kwargs.pop("_meta")
-> 5258 df = func(*args, **kwargs)
5259 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
5260 if not len(df):
/opt/conda/lib/python3.7/site-packages/dask/utils.py in __call__()
893
894 def __call__(self, obj, *args, **kwargs):
--> 895 return getattr(obj, self.method)(*args, **kwargs)
896
897 def __reduce__(self):
TypeError: apply() takes from 2 to 3 positional arguments but 4 were given
The hilbert_distance function will shortly be merged into Dask-GeoPandas.
There were several enhancements discussed as follow-up items, which include:
- Add a total_bounds argument to the hilbert_distance function if this is already pre-computed: #145
- Use spatial_partitions if present. -> #161
- _continuous_to_discrete_coords is not necessary:
  - the _continuous_to_discrete function clips discrete integer values using base Python because numpy.clip is not currently supported by Numba: switch to numpy.clip when it's supported by Numba - v0.54
  - _continuous_to_discrete - use numpy.int32 instead of numpy.int64
- p parameter that controls the precision of the resulting Hilbert curve; need to help the user understand how to set this themselves / when this can be useful to set themselves. (also check hackmd notes)
Note: Will add permalinks when merged.
For the spatial predicates (intersects, contains, within, etc), at the moment we simply perform the geopandas operation for each partition, using dask's elemwise as a helper method (which eg will ensure that partitions are aligned if the argument is another dask dataframe).
If spatial partitioning information is available, this can be optimized: if we know that some partition does not intersect at all with the passed geometry, we can avoid doing the computation (as we know it will be all False).
So at least for the simple case (and most prevalent, I think), where a single geometry is passed to the predicate method, such as:
gdf.intersects(polygon)
In such a case, we can first check whether that polygon intersects with gdf.spatial_partitions, then perform the actual intersects(..) predicate operation only for the intersecting partitions, and for the others directly create a Series of False values.
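A sketch of that idea with delayed partitions (illustrative only; the real implementation would build this into the predicate methods themselves):

import dask
import pandas as pd

def intersects_optimized(ddf, polygon):
    hits = ddf.spatial_partitions.intersects(polygon)
    out = []
    for hit, part in zip(hits, ddf.to_delayed()):
        if hit:
            # only partitions whose extent touches the polygon do real work
            out.append(dask.delayed(lambda df: df.intersects(polygon))(part))
        else:
            # the answer is known to be all False; skip the geometry computation
            out.append(dask.delayed(lambda df: pd.Series(False, index=df.index))(part))
    return out  # list of delayed boolean Series, one per partition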
Not sure if this is expected, but the homepage listed in setup.py (https://geopandas.dask.org/) doesn't seem to exist (setup.py, line 32 at 610dd69).
From #28 (review). That PR added the ability to load the spatial_partitions from the Parquet metadata in read_parquet. But currently that only loads bounding boxes (as that is what is stored in the metadata), and thus cannot roundtrip the exact (potentially more detailed) geometries in the spatial partitioning information.
We currently fully rely on the metadata saved in the Parquet file as done by GeoPandas / defined in https://github.com/geopandas/geo-arrow-spec, and this currently only has a bbox (and not a generic extent). So we could start a discussion there to expand this, or in the short term add some custom metadata for dask_geopandas in to_parquet (this should be easy to do, but of course custom to dask-geopandas).
This also depends on the discussion in #8 on how we want to store partitions.
I needed geopandas.clip functionality the other day and I utilized map_partitions to that end successfully.
Reading #54, I learned how one might be able to perform clip in a "smarter" way, so I gave it a try.
import numpy as np
import pandas as pd
import geopandas as gpd
from dask.highlevelgraph import HighLevelGraph
import dask_geopandas
# Create data
df = gpd.read_file(gpd.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(pd.concat([df]*5), npartitions=40)
mask = df.loc[df['iso_a3'].eq('CZE')]
mask['geometry'] = mask.geometry.buffer(1)
# Create more localized spatial partitions
ddf = ddf.set_crs('epsg:4326').set_index('continent').reset_index().persist()
ddf.calculate_spatial_partitions()
ddf.spatial_partitions.set_crs('epsg:4326', inplace=True)
# ddf.spatial_partitions.plot(color='none', edgecolor='black')
# Naive version (~2.75s on my machine)
result_naive = ddf.map_partitions(lambda x: gpd.clip(x, mask)).compute()
# Smarter version (~1.37s on my machine)
new_spatial_partitions = gpd.clip(ddf.spatial_partitions.to_frame('geometry'), mask)
intersecting_partitions = np.asarray(new_spatial_partitions.index)
name = 'clip-test'
dsk = {(name, i): (gpd.clip, (ddf._name, l), mask) for i, l in enumerate(intersecting_partitions)}
divisions = [None] * (len(dsk) + 1)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
result = dask_geopandas.core.GeoDataFrame(graph, name, ddf._meta, tuple(divisions))
result.spatial_partitions = new_spatial_partitions
result_smart = result.compute()
pd.testing.assert_frame_equal(result_naive, result_smart)
Does this look like a reasonable approach? If so, then I'm happy to test & PR.
Not all, but many of the spatial file formats supported by GDAL/fiona allow efficiently reading chunks of the file (eg Shapefile, GeoPackage).
We could have a dask_geopandas.read_file variant of geopandas.read_file that constructs a partitioned dataframe from such files.
A small prototype:
import geopandas
import dask.dataframe as dd
import dask_geopandas
from dask.delayed import delayed
import fiona

def read_file(path, npartitions):
    with fiona.open(path) as collection:
        total_size = collection.session.get_length()
    # TODO smart inference for a good default partition size
    batch_size = (total_size // npartitions) + 1
    row_offset = 0
    dfs = []
    while row_offset < total_size:
        rows = slice(row_offset, min(row_offset + batch_size, total_size))
        df = delayed(geopandas.read_file)(path, rows=rows)
        dfs.append(df)
        row_offset += batch_size
    # TODO this could be inferred from fiona's collection.meta["schema"]
    meta = geopandas.read_file(path, rows=5)
    return dd.from_delayed(dfs, meta)
And a small experiment with it: https://nbviewer.jupyter.org/gist/jorisvandenbossche/c94bfc9626e622fda7285ed88a4d771a
Since reading those files (currently) doesn't release the gil much, this is mostly useful in a multi-processing setting (and not the default threading scheduler).
I think a function like the above would already be interesting anyway, but for further optimization, using pyogrio might be an interesting path: geopandas/pyogrio#1
I would expect the spatial_partitions to be copied when using df.copy().
import geopandas
import dask
import dask_geopandas
dask.config.set(scheduler='single-threaded')
df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
ddf.calculate_spatial_partitions()
ddfc = ddf.copy()
ddfc.spatial_partitions # is None
Is there a reason to not copy spatial partitions that I didn't think of?
I thought about how this could work and didn't think of anything better than overriding dask's copy method. Something like
def copy(self):
    self_copy = super().copy()
    self_copy.spatial_partitions = self.spatial_partitions
    return self_copy
It would be great if spatial_partitions could get copied automatically, as crs for example does. However, crs is actually a property of _meta, which does get automatically copied by dask. Not sure how spatial partitions could be attached to the _meta, or if there's another mechanism that could act similarly.
If I'm headed in the right direction, then I'm happy to unit test & PR.
I have a list of GeoPackages, one per urban area, that I need to read into a dask GeoDataFrame. Since they are already essentially spatially partitioned, the optimal way would be to read each as a chunk directly. Now I have to read them one by one via GeoPandas, concatenate, and then create the dask GeoDataFrame from the geopandas.GeoDataFrame, which loses the spatial partitions.
For cases like this, it may be useful to have a dask_geopandas.read_files(list) function which would call geopandas.read_file for each chunk and create the chunked GeoDataFrame directly. It would be helpful to be able to pass both a list and a path to a folder (like we do with parquet), since in the list you can specify a path inside a zip, for example (my case).
This is the existing code I am using:
paths = ["foo/bar/one.zip!data/file.gpkg", "foo/bar/two.zip!data/file.gpkg"]
gdfs = []
for file in paths:
gdf = gpd.read_file(file)
gdfs.append(gdf)
gdf = pd.concat(gdfs)
ddf = dask_geopandas.from_geopandas(gdf, npartitions=2) # non spatial chunks
And this would be optimal:
paths = ["foo/bar/one.zip!data/file.gpkg", "foo/bar/two.zip!data/file.gpkg"]
ddf = dask_geopandas.read_files(paths) # one chunk per file
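A minimal sketch of what such a read_files could look like under the hood, built on dask.delayed (the function name and the schema-inference shortcut are assumptions, not an existing API):

import geopandas
import dask.dataframe as dd
from dask.delayed import delayed

def read_files(paths, **kwargs):
    parts = [delayed(geopandas.read_file)(path, **kwargs) for path in paths]
    meta = geopandas.read_file(paths[0], rows=1, **kwargs)  # infer schema from the first file
    return dd.from_delayed(parts, meta=meta)

With the dask-geopandas backend registered, the GeoDataFrame meta should make this come out as a chunked GeoDataFrame with one partition per path.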
As a spatial join workaround, I wanted to use map_partitions and run the spatial join (geopandas.tools.sjoin) on each partition and aggregate later down the line. Each partition produces an invalid GeoDataFrame and the geometry becomes a LINESTRING.
Versions used (from conda-forge):
dask 2021.04.0
pandas 1.2.4
geopandas 0.9.0
dask_geopandas v0.1.0a2+1.g69a4073
pygeos 0.9
Here it is reproduced in a minimal example:
import dask.dataframe as dd
import dask_geopandas
ddf = dd.read_csv("data/random_points.csv")
ddf = dask_geopandas.from_dask_dataframe(ddf)
ddf = ddf.set_geometry(
dask_geopandas.points_from_xy(ddf, 'lon', 'lat')
)
def spatial_join(gdf):
print(gdf.head())
# spatial join omitted for clarity
return gdf
ddf.map_partitions(spatial_join)
Which prints the following output:
lon lat label geometry
0 1.0 1.0 foo LINESTRING (0.00000 0.00000, 0.00000 1.00000)
1 1.0 1.0 foo LINESTRING (1.00000 1.00000, 1.00000 2.00000)
Data was generated like this:
import numpy as np
import pandas as pd
import string
n = 100000
X = np.random.random((n, 2))
X[:, 0] = 360 * X[:, 0] - 180
X[:, 1] = 170 * X[:, 1] - 85
df = pd.DataFrame(X, columns=['lon', 'lat'])
df['label'] = np.random.choice(list(string.ascii_letters), n)
df.to_csv("data/random_points.csv", index=False)
While attempting to import dask_geopandas, I encountered the following error:
>>> import dask_geopandas
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask_geopandas\__init__.py", line 3, in <module>
    from . import backends
  File "C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask_geopandas\backends.py", line 4, in <module>
    from dask.dataframe.utils import (
ImportError: cannot import name '_nonempty_index' from 'dask.dataframe.utils' (C:\Users\user\Miniconda3\envs\envname\lib\site-packages\dask\dataframe\utils.py)
Apparently this name has moved from dask.dataframe.utils to dask.dataframe.backends. The initial underscore suggests it was not meant for use by other packages.
Thank you
I have multiple CSV files opened with dask as is:
import dask.dataframe as dd
import dask_geopandas
df = dd.read_csv('csv/*_timeseries.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')
When invoking gdf.compute(), the following exception is raised:
------------------------------------------------------------------------
ProjError Traceback (most recent call last)
<ipython-input-251-f87bb3768545> in <module>
----> 1 gdf.compute()
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2723 should_rejoin = False
2724 try:
-> 2725 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2726 finally:
2727 for f in futures.values():
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1984 else:
1985 local_worker = None
-> 1986 return self.sync(
1987 self._gather,
1988 futures,
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
830 return future
831 else:
--> 832 return sync(
833 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
834 )
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
338 if error[0]:
339 typ, exc, tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:
342 return result[0]
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in f()
322 if callback_timeout is not None:
323 future = asyncio.wait_for(future, callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1849 exc = CancelledError(key)
1850 else:
-> 1851 raise exception.with_traceback(traceback)
1852 raise exc
1853 if errors == "skip":
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/optimization.py in __call__()
959 if not len(args) == len(self.inkeys):
960 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 961 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
962
963 def __reduce__(self):
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in _execute_task()
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in apply()
27 def apply(func, args, kwargs=None):
28 if kwargs:
---> 29 return func(*args, **kwargs)
30 else:
31 return func(*args)
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
5296 func = kwargs.pop("_func")
5297 meta = kwargs.pop("_meta")
-> 5298 df = func(*args, **kwargs)
5299 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
5300 if not len(df):
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in __call__()
891
892 def __call__(self, obj, *args, **kwargs):
--> 893 return getattr(obj, self.method)(*args, **kwargs)
894
895 def __reduce__(self):
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geodataframe.py in to_crs()
814 else:
815 df = self.copy()
--> 816 geom = df.geometry.to_crs(crs=crs, epsg=epsg)
817 df.geometry = geom
818 df.crs = geom.crs
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geoseries.py in to_crs()
541 transformer = Transformer.from_crs(self.crs, crs, always_xy=True)
542
--> 543 new_data = vectorized.transform(self.values.data, transformer.transform)
544 return GeoSeries(
545 GeometryArray(new_data), crs=crs, index=self.index, name=self.name
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/_vectorized.py in transform()
877 if compat.USE_PYGEOS:
878 coords = pygeos.get_coordinates(data)
--> 879 new_coords = func(coords[:, 0], coords[:, 1])
880 result = pygeos.set_coordinates(data.copy(), np.array(new_coords).T)
881 return result
~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/pyproj/transformer.py in transform()
428 intime = None
429 # call pj_transform. inx,iny,inz buffers modified in place.
--> 430 self._transformer._transform(
431 inx,
432 iny,
pyproj/_transformer.pyx in pyproj._transformer._Transformer._transform()
ProjError: x, y, z, and time must be same size
The exception disappears when df is a single CSV file: df = dd.read_csv('csv/2019_timeseries.csv')
I know that there's still a lot to do before a first proper release, but I was thinking that it would be helpful to do a tagged alpha release of the current state. Something like 0.0.1alpha1. We are already using dask-geopandas in some of our research code, and it would be helpful to pin a tagged version for reproducibility rather than install from git@master/commit.
Would it be okay for you? I am happy to prepare a GitHub action to automate releases and a conda recipe to have it on conda-forge as well.
From reading the documentation it seems you have to convert a geodataframe object into the dask-geopandas object. If you would like to use a csv or other file, this would require loading the entire dataset into memory, correct? Is there a way to be more memory efficient when loading in datasets? I originally wanted to use dask for this benefit.
First of all, thanks for this work!
Then, I'm trying to convert a Dask DataFrame into a dask-geopandas GeoDataFrame, setting the geometry from 2 columns (x, y), and I think there is a problem in the README instructions:
import dask.dataframe as dd
import dask_geopandas
df = dd.read_csv('...')
df = df.set_geometry(
dask_geopandas.points_from_xy(df, 'latitude', 'longitude')
)
In that case, df is a dask dataframe and doesn't have an attribute/method like set_geometry, raising: AttributeError: 'DataFrame' object has no attribute 'set_geometry'.
Digging into the source code, I finally did:
geodf = dask_geopandas.from_dask_dataframe(df)
geodf = geodf.set_geometry(
dask_geopandas.points_from_xy(
df=geodf,
x=geodf.x,
y=geodf.y,
)
)
But then, I'm unable to export it as a geopackage. Am I missing something?
As discussed on Stack Overflow, could you update the library or suggest an alternative solution, please?
>>> df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_cities"))
>>> ddf = dask_geopandas.from_geopandas(df, npartitions=4)
>>> ddf.total_bounds
dask.array<from_pandas-09ff8835d3deb87b6565b111dcf696cd-total_bounds-agg, shape=(nan,), dtype=float64, chunksize=(nan,), chunktype=numpy.ndarray>
This results in a dask array with unknown shape and chunksize, while we know this is always a 1D length 4 array, so I suppose we should be able to tell dask this is the case.
In backends, we import _nonempty_index from dask. Since it is a private function, this is not very wise. We should look into another way of doing it.
A method to repartition a GeoDataFrame along new partitions, i.e. to conform the geodataframe to given spatial partitions.
There is a basic implementation for this at mrocklin/dask-geopandas, which includes both a version for repartitioning a geopandas.GeoDataFrame (which is done eagerly, but could in principle also be extended to do the actual selection lazily) and one for repartitioning a dask_geopandas.GeoDataFrame.
I updated the version for a geopandas.GeoDataFrame already a bit in my sjoin demo notebook:
import geopandas
import dask_geopandas
from dask.base import tokenize

def repartition_pandas(gdf, partitions):
    partitions = geopandas.GeoDataFrame({'geometry': partitions.geometry},
                                        crs=partitions.crs)
    joined = geopandas.sjoin(partitions, gdf, how='inner', op='intersects').index_right
    name = 'from-geopandas-' + tokenize(gdf, partitions)
    dsk = {}
    new_partitions = []
    # TODO warn if certain rows are dropped (not intersecting with any of the partitions)
    j = 0
    for i, partition in enumerate(partitions.geometry):
        try:
            ind = joined.loc[i]
        except KeyError:
            continue
        subset = gdf.loc[ind]
        subset2 = subset[subset.geometry.representative_point().intersects(partition)]
        dsk[name, j] = subset2
        j += 1
        if (subset2.geometry.type == 'Point').all():
            new_partitions.append(partition)
        else:
            new_partitions.append(subset2.geometry.convex_hull.unary_union.convex_hull)
    divisions = tuple([None] * (j + 1))
    result = dask_geopandas.GeoDataFrame(dsk, name, gdf.head(0), divisions, new_partitions)
    return result
I think there are generally 2 cases to look out for:
(it will probably be useful to already add an initial version (like the above) that only supports non-overlapping spatial partitions as specified by the user (the resulting partitions of the output can still overlap))
Current spatial partitioning in Dask-GeoPandas is limited to computing convex hulls for a set of n arbitrarily defined partitions.
Whilst a Dask-GeoDataFrame can be re-partitioned based on an attribute (column contained within the data) before computing the convex hull, this is dependent;
When dealing with Point data, an alternative approach is to encode each Point using the Geohash - which is a string that is both sortable and searchable based on prefixing.
In the next couple of weeks I will create a pull request with these features, as a potential partitioning technique.
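As a rough illustration of the idea for point data, assuming the third-party pygeohash package for the encoding (not part of dask-geopandas; the helper name is made up):

import pygeohash
import dask_geopandas

def partition_points_by_geohash(gdf, precision=1):
    gdf = gdf.copy()
    # encode each point; shorter geohashes mean coarser, larger partitions
    gdf["geohash"] = [
        pygeohash.encode(geom.y, geom.x, precision=precision) for geom in gdf.geometry
    ]
    gdf = gdf.sort_values("geohash").set_index("geohash")
    return dask_geopandas.from_geopandas(gdf, npartitions=gdf.index.nunique())

Because the geohash is sortable and prefix-searchable, sorting on it places spatially close points into the same partition.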
Hi,
My use case is to study only the density distribution of population in the neighborhood of a given point. The source data is the global shapefile for a 200x200m grid of France: https://www.insee.fr/fr/statistiques/4176290?sommaire=4176305
What I was doing with a smaller shapefile (1000x1000m grid) was:
With the more precise grid, it doesn't fit in memory and I can't finish the first step. I would like to do those first loading / reducing steps in dask-geopandas before continuing with geopandas.
Let me know if you already see a smart solution or how I can help!
Guillaume
Inside the CoordinateIndexer (for .cx[...]), we access xmin, ymin, xmax, ymax = obj.spatial_partitions.total_bounds, which only works if spatial_partitions is not None. We should add another code path for when it is not set.
We should have a top-level method on dask_geopandas.GeoDataFrame and dask_geopandas.GeoSeries that allows shuffling data into spatially coherent partitions. (xref #8)
Let's assume we have hilbert_distance and geohash functions based on which we can spatially repartition the gdf. I would then like to have a single method (spatial_shuffle?) that can consume both.
# using hilbert
ddf = ddf.spatial_shuffle('hilbert')
# using geohash
ddf = ddf.spatial_shuffle('geohash')
This API can be extended by other methods without the necessity of having two methods for each algorithm (esp. since the repartition_ methods will be largely the same).
Originally posted by @martinfleis in #71 (comment)
note: moving to a separate specific issue to keep track of GSoC.
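A sketch of how such a single entry point could dispatch to the different orderings; hilbert_distance and geohash are assumed here to be per-row ordering functions, and the method name and keyword are just the proposal above, not a settled API:

def spatial_shuffle(ddf, by="hilbert", npartitions=None, **kwargs):
    if by == "hilbert":
        key = ddf.hilbert_distance(**kwargs)
    elif by == "geohash":
        key = ddf.geohash(**kwargs)
    else:
        raise ValueError(f"unknown spatial ordering method: {by}")
    # shuffle rows so that spatially close features end up in the same partition
    shuffled = ddf.assign(_spatial_key=key).set_index("_spatial_key")
    if npartitions is not None:
        shuffled = shuffled.repartition(npartitions=npartitions)
    return shuffled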
We should find a way of exposing the sindex attribute each partition has (and which e.g. sjoin uses) to allow custom queries without the necessity of writing a lot of code (rewriting the indexing logic we have in sjoin).
Ideally, we should find a way to expose both query and query_bulk in a way that can be mapped to geometries within partitions.
One use case can be found in momepy. This code aims to take building footprints and measure how long the portion of each footprint's perimeter shared with another footprint is. The code can be optimised a bit, but for the illustration it is good enough.
inp, res = gdf.sindex.query_bulk(gdf.geometry, predicate="intersects")
left = gdf.geometry.take(inp)
right = gdf.geometry.take(res)
intersections = left.intersection(right, align=False).length
results = intersections.groupby(inp).sum().reset_index(drop=True) - gdf.geometry.length.reset_index(drop=True)
1. Query the sindex. Since both the sindex and the input are the same geometry array, this returns at least one hit (itself) for each geometry, potentially more. Assuming polygons A, B, C adjacent as A-B-C: for polygon A it finds that it intersects [A, B], for polygon B [A, B, C] and for polygon C [B, C]. Both A-B and B-A are present (one in inp, the other in res and vice versa).
2. Create long arrays based on the integer indices. Here we have a lot of duplicates, which allows vectorised intersection later.
3. intersection between all pairs of geometries.
4. length of each intersection.
5. groupby over the lengths based on the original input geometry's integer index to get the sum of all shared perimeter sections per geometry.
6. Remove the length of itself (A-A is still there, but we don't want it in the result).
The A-A relationship could be eliminated to avoid the overhead it brings but I guess I was lazy when I wrote the code and it was good enough for the purpose :).
The same should be possible to do on top of dask_geopandas.GeoDataFrame. However, we need to figure out how to represent the output of sindex in a distributed manner.
edit: naturalearth_lowres will work well enough as footprints for test purposes.
From #78: some IO methods in dask (eg read_parquet) support "column projection pushdown", meaning that if you select a column (a "getitem" operation) after reading in a file, then it will actually only read that single column instead of the full file.
So for example, in the following cases
gdf = dask_geopandas.read_parquet(...)
# only the "attribute" column is read
gdf["attribute"].mean()
# only the geometry column is read (.geometry is equivalent to `gdf["geometry"]`)
gdf.geometry.x
But, in GeoPandas, most of the spatial methods/attributes are supported both on GeoSeries and GeoDataFrame, while often only needing the geometries in the GeoDataFrame case as well. So for example gdf.geometry.unary_union and gdf.unary_union give the same result. But with dask, the first would only read the geometry column while the second would read (unnecessarily) all columns from the file.
We should try to update those methods to automatically include a "getitem" operation in the dask task graph for the GeoDataFrame case.
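The fix itself can be as simple as making the geometry-only GeoDataFrame operations go through an explicit column selection, so the "getitem" lands in the task graph and dask's read_parquet column pruning can kick in. A tiny illustrative sketch (not the actual dask-geopandas code):

def unary_union_of(gdf):
    # same result as gdf.unary_union, but the getitem lets dask prune the other columns
    return gdf.geometry.unary_union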
Currently, for the GeoDataFrame, the notebook repr is the same as the one of dask. It would be nice that it says "GeoDataFrame" instead of "DataFrame".
The dask Series repr also shows the divisions, name of the series, etc, while the GeoSeries repr does not.
I'm trying to evaluate this package by writing a little example, but I'm running into some errors below. I try to write a small example to read a simple CSV file with lat/lon columns, add a geometry column, and turn this into a Dask-GeoDataFrame, but I'm running into errors like these:
AttributeError: 'Series' object has no attribute 'map_partitions'
AttributeError: 'DataFrame' object has no attribute 'geometry'
import dask_geopandas
import dask.dataframe as dd
import pandas as pd
import geopandas as gd
df = pd.read_csv('airport_volume_airport_locations.csv')
# ok
gdf = gd.GeoDataFrame(
df, geometry=gd.points_from_xy(df.Airport1Latitude, df.Airport1Longitude)
)
ddf = dask_geopandas.from_geopandas(df, npartitions=4)
# raises AttributeError: 'DataFrame' object has no attribute 'set_geometry'
ddf.set_geometry(
dask_geopandas.points_from_xy(ddf, "Airport1Latitude", "Airport1Longitude")
)
When doing a getitem operation after read_parquet, the column selection is pushed down. So for example, in the following cases
gdf = dask_geopandas.read_parquet(...)
# only the "attribute" column is read
gdf["attribute"].mean()
# only the geometry column is read (.geometry is equivalent to `gdf["geometry"]`)
gdf.geometry.x
But it seems that specifically for total_bounds, this doesn't work for some reason, and even gdf.geometry.total_bounds.compute() loads all columns of the Parquet file instead of only the geometry column (which makes total_bounds considerably slower than it could be).
(The reason I was looking into this was the realization that gdf.total_bounds (so where the user doesn't explicitly call .geometry first) might load all columns unnecessarily (which is relevant for all GeoDataFrame methods/attributes that only require the geometry column, and something we could fix I suppose; need to open a separate issue for that), but then when comparing with gdf.geometry.total_bounds it didn't improve.)