Comments (15)
It looks like the graph size is also a bit unnatural. All of the data actually stems from the large indices that we're now bypassing during unpacking, but this should be somewhat compensated for by pickle dedup + compression.
Indeed, when I manually dump the graph w/ compression, I get only a fifth of the payload size. I wonder if this is a bad side effect of dask/distributed#7768 or if it is related to how we're pickling the graph. I will create a follow-up ticket.
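For anyone who wants to reproduce this kind of measurement, here is a minimal sketch. The graph dict, the task names, and the repeated chunk-metadata tuple are all toy stand-ins for the real rechunk graph, not dask's actual structures:

```python
import pickle
import zlib

# Toy stand-in for a task graph: many tasks sharing one large, repeated
# tuple argument (similar to the chunking metadata in a rechunk graph).
chunk_meta = tuple(range(8760))
graph = {
    f"task-{i}": ("transfer", f"task-{i - 1}", chunk_meta) for i in range(1000)
}

raw = pickle.dumps(graph)
compressed = zlib.compress(raw)
print(len(raw), len(compressed), f"ratio={len(compressed) / len(raw):.2f}")
```

Note that pickle memoizes the repeated `chunk_meta` object, so it is serialized only once; that is the "pickle dedup" effect, and compression then acts on top of it.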
from dask.
I'll have to look into what's actually happening on the cluster, but let me explain what should happen in P2P rechunking:
- This P2P rechunk should be split into ~11 partial rechunks (we chop this up along the first axis). Hence, each partial rechunk should be worth around 37 GiB of data.
- If we schedule these partials more or less sequentially, this should cause
a. ...previous partials to get evicted from disk once all of their unpack tasks have transitioned to memory
b. ...P2P rechunking to take a look at all available workers at the time we start executing the first transfer task of each partial. Assuming that the adaptive cluster ramps up the worker count pretty quickly, we should benefit from this in the later partials.
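To illustrate how such a split along the first axis can be derived (a sketch of the idea, not dask's actual implementation): the rechunk decomposes into independent partials wherever the cumulative chunk sizes of the old and the new chunking coincide. The chunk sizes below are hypothetical:

```python
from itertools import accumulate

def partial_boundaries(old_chunks, new_chunks):
    """Positions along one axis where old and new chunking line up.

    Between two such cuts, the rechunk only moves data within that
    slab, so it can run as an independent "partial" rechunk.
    """
    old_cum = set(accumulate(old_chunks))
    new_cum = set(accumulate(new_chunks))
    return sorted(old_cum & new_cum)

# e.g. old: 12 slabs of 730 rows; new: 4 slabs of 2190 rows
print(partial_boundaries((730,) * 12, (2190,) * 4))
# cuts at 2190, 4380, 6570, 8760 -> 4 independent partials
```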
A) We should likely emit a warning (to the client) if a rechunk/shuffle is running on an adaptive cluster. Most users will not understand this, and the fact that new workers cannot be utilized for this operation is a limitation that was not present with the task-based shuffle/rechunk.
Sure, works for me.
B) I wonder if there is a way to break even such a non-trivial rechunk up into a couple of individual P2P operations. Maybe a hybrid tasks + p2p approach would do
As mentioned above, this is already being split up into partials. Do you want to split this up further?
A) We should likely emit a warning (to the client) if a rechunk/shuffle is running on an adaptive cluster. Most users will not understand this, and the fact that new workers cannot be utilized for this operation is a limitation that was not present with the task-based shuffle/rechunk.
Is there a good way for P2P to pre-emptively trigger upscaling? I wonder if, by looking at the target at the moment the P2P run is initialized, we could see that we're way underprovisioned and just wait for adaptive to give us more workers to work with.
- This P2P rechunk should be split into ~11 partial rechunks (we chop this up along the first axis). Hence, each partial rechunk should be worth around 37 GiB of data.
From what I see, we have only a single worker available at the moment the first P2P partial starts. Granted, with ~28 GiB of free disk space, that worker will have a pretty bad time trying to pull off a rechunk of 37 GiB of data.
FWIW, I ran this on a cluster with 50 workers booted up right away, and it runs flawlessly. 50 even seems overprovisioned, so apart from the super slow transmission, this is doing what it is supposed to.
Note: The first long CPU-time range on the scheduler was a run I cancelled; the second one is the actual run.
I think I'll open a dedicated issue for the adaptive P2P topic since there are many different things we could do now or later.
Regarding the slowness of this, I have good and bad news.
Good news: it's not dask.order
🎉
Bad news: It's unpack_remotedata recursing over the graph (for 8 min).
I'll have a brief look at whether there is something we can do to patch this. The long-term fix would be #9969.
As mentioned above, this is already being split up into partials. Do you want to split this up further?
No, if it is already split up, that's great.
RE: further splitting up
Even though I wrote it down, I wasn't really registering that we are dealing with one-year slices here. I was thinking about the case where the time slice is over the entire dataset. I wonder if these situations could be improved as well.
I was thinking about the case where the time slice is over the entire dataset. I wonder if these situations could be improved as well
The only case where I could see this help us is in the worst-case scenario of miniature shards. We'd still have to store all of the data somewhere, so I think we can only minimize metadata overhead.
Regarding pre-emptive upscaling: I saw the cluster suggest 4-7 workers during the first couple of minutes of the P2P rechunking. That should've been enough in this case. (Though it wouldn't have worked if we had only a single partial.)
Re: performance of unpacking the tasks
I can rewrite this code and get it faster by about 20-30%. That's not nothing (in this case, almost 2 min faster), but it is clearly not sufficient. The inherent problem is that we cannot distinguish tuples from "tuples that are task definitions which could reference other tasks", i.e. #9969.
The reason this is so bad for P2P rechunking is that we are including very large tuples as arguments to the tasks.
Consider this task https://github.com/dask/distributed/blob/7aea988722f285e9fc5a67a817f664bd14ef47a5/distributed/shuffle/_rechunk.py#L444-L452 which defines the transfer task. It includes partial_index, partial_new, and partial_old, which are all tuples that describe the old/new chunking to some extent (and are themselves nested tuples). In this case, however, partial_old contains one tuple that has 8760 elements. Therefore, unpacking this task alone takes about 50ms. Multiplying this by all transfer tasks... is a lot.
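To get a feeling for how this adds up, here is a toy benchmark (not dask's actual unpack code, and the task count is hypothetical): visiting every element of an 8760-element tuple once per transfer task quickly sums to minutes of pure traversal.

```python
import time

def walk(obj):
    # Minimal recursive traversal that visits every element,
    # mimicking how each task argument is inspected during unpacking.
    if isinstance(obj, tuple):
        for item in obj:
            walk(item)

partial_old = ((1,) * 8760,)  # e.g. one chunk per hour of a year

start = time.perf_counter()
for _ in range(1000):  # pretend there are 1000 transfer tasks
    walk(partial_old)
elapsed = time.perf_counter() - start
print(f"{elapsed:.2f}s for 1000 traversals")
```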
There are two quick fixes we can implement:
- Wrap the partial_* tuples in a tuple subclass to tell the unpack algorithm that these are not the droids it's looking for.
- Don't pass those to the task; instead, put them into the rechunk extension, since this is redundant information tied to the currently active rechunk (this would also save some redundancy in the graph).
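A minimal sketch of the first fix; NoUnpack and unpack are illustrative names for this note, not dask's actual API:

```python
class NoUnpack(tuple):
    """Marker: a plain-data tuple that is not a task definition."""

def unpack(obj):
    # Toy recursive traversal standing in for dask's task unpacking.
    if isinstance(obj, NoUnpack):
        return obj  # marked as data: returned wholesale, no recursion
    if isinstance(obj, tuple):
        return tuple(unpack(o) for o in obj)
    return obj

partial_old = NoUnpack(((730,) * 12,))
task = ("rechunk-transfer", "input-chunk-0", partial_old)
result = unpack(task)
assert result[2] is partial_old  # the large tuple was never traversed
```

Since NoUnpack subclasses tuple, it still behaves like one everywhere else; only the unpack path needs to learn about the marker.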
I wrote an issue suggesting a P2P restart if we detect significant upscaling; see dask/distributed#8673.
Oh, another thing that mildly concerns me is that the graph for this small subset is already 250 MB. I'll briefly check if this is expected... Might also be those tuples.
I think I have to step back from this statement a little. The graph is indeed unreasonably large and compression is not kicking in, but compression also doesn't help that much: after verifying this again, compression only shaves off about 10%, which is why we just end up sending the raw data.