
Comments (15)

fjetter commented on July 3, 2024

It looks like the graph size is also a bit unnatural. All of the data actually stems from the large indices that we're now bypassing during unpacking, but this should be somewhat compensated by pickle dedup + compression.

Indeed, when I manually dump the graph with compression I get only a fifth of the payload size. I wonder if this is a bad side effect of dask/distributed#7768 or if it is related to how we're pickling the graph. I will create a follow-up ticket.
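
For reference, a minimal sketch of that kind of manual measurement, using cloudpickle plus zlib as rough proxies for what the protocol does; the array shape and the `method` keyword usage are assumptions, not the actual workload from this issue:

```python
# Rough sketch: materialize the low-level graph, pickle it, and compare
# raw vs. compressed size. Shapes and compression choice are illustrative only.
import zlib

import cloudpickle
import dask.array as da

arr = da.random.random((8760, 100, 100), chunks=(1, 100, 100))
rechunked = arr.rechunk((8760, 10, 10), method="p2p")  # assumes the `method` kwarg

raw = cloudpickle.dumps(dict(rechunked.__dask_graph__()))
compressed = zlib.compress(raw)
print(
    f"raw: {len(raw) / 2**20:.1f} MiB, "
    f"compressed: {len(compressed) / 2**20:.1f} MiB "
    f"({len(compressed) / len(raw):.0%})"
)
```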

hendrikmakait commented on July 3, 2024

I'll have to look into what's actually happening on the cluster, but let me explain what should happen in P2P rechunking:

  1. This P2P rechunk should be split into ~11 partial rechunks (we chop this up along the first axis). Hence, each partial rechunk should hold around 37 GiB of data (see the sketch after this list).
  2. If we schedule these partials more or less sequentially, this should cause
    a. ...previous partials to get evicted from disk once all of their unpack tasks have transitioned to memory
    b. ...P2P rechunking to take a look at all the available workers at the time we start executing the first transfer task of each partial. Assuming that the adaptive cluster ramps up the worker count pretty quickly, we should benefit from this in the later partials.
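
A minimal sketch of the setup described in item 1, with made-up shapes (not the actual dataset from this issue) and assuming the "array.rechunk.method" config option:

```python
import dask
import dask.array as da

with dask.config.set({"array.rechunk.method": "p2p"}):
    # ~11 "years" of hourly slices along the first axis (illustrative shapes)
    arr = da.random.random((11 * 8760, 1000, 1000), chunks=(1, 1000, 1000))
    # The new time chunks align with year boundaries, so the rechunk never mixes
    # data across years and P2P can run ~11 independent partial rechunks.
    rechunked = arr.rechunk((8760, 100, 100))
```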

A) We should likely emit a warning (to the client) if a rechunk/shuffle is running on an adaptive cluster. Most users will not understand this, and the fact that new workers cannot be utilized for this operation is something that was not present with the task-based shuffle/rechunk.

Sure, works for me.

B) I wonder if there is a way to break up even such a non-trivial rechunk into a couple of individual P2P operations. Maybe a hybrid tasks + P2P approach would do.

As mentioned above, this is already being split up into partials. Do you want to split this up further?

hendrikmakait commented on July 3, 2024

A) We should likely emit a warning (to the client) if a rechunk/shuffle is running on an adaptive cluster. Most users will not understand this, and the fact that new workers cannot be utilized for this operation is something that was not present with the task-based shuffle/rechunk.

Is there a good way for P2P to pre-emptively trigger upscaling? I wonder if, by looking at the target at the moment the P2P run is initialized, we could see that we're way underprovisioned and just wait for adaptive to give us more workers to work with.

hendrikmakait commented on July 3, 2024
  1. This P2P rechunk should be split into ~11 partial rechunks (we chop this up along the first axis). Hence, each partial rechunk should hold around 37 GiB of data.

From what I see, we have only a single worker available at the moment the first P2P partial starts. Granted, with ~28 GiB of free disk space, that worker will have a pretty bad time trying to pull off rechunking 37 GiB of data.

fjetter commented on July 3, 2024

FWIW, I ran this on a cluster with 50 workers booted up right away and it ran flawlessly. 50 even seems overprovisioned, so apart from the super slow transmission, this is doing what it is supposed to.

[image]

Note: The first long CPU time range on the scheduler was a run I cancelled. The second one is the actual run.

https://cloud.coiled.io/clusters/488173/account/dask-engineering/information?scopes=%7B%22type%22%3A%22account%22%2C%22id%22%3A5147%2C%22name%22%3A%22dask-engineering%22%2C%22organizationId%22%3A7269%2C%22slug%22%3A%22dask-engineering%22%7D&tab=Code

I think I'll open a dedicated issue for the adaptive P2P topic since there are many different things we could do now or later.

fjetter commented on July 3, 2024

Regarding the slowness of this, I have good and bad news.

Good news: it's not dask.order 🎉

Bad news: It's unpack_remotedata recursing over the graph (for 8 min).

[image]

I'll have a brief look to see if there is something we can do to patch this. The long-term fix would be #9969.
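
A hedged sketch of how one could reproduce this kind of measurement on a smaller graph; it assumes unpack_remotedata is importable from distributed.utils_comm and callable with a single argument, and uses a made-up array rather than the workload from this issue:

```python
import cProfile
import pstats

import dask.array as da
from distributed.utils_comm import unpack_remotedata  # assumed import path/signature

arr = da.random.random((8760, 100, 100), chunks=(1, 100, 100))
rechunked = arr.rechunk((8760, 10, 10), method="p2p")
dsk = dict(rechunked.__dask_graph__())  # materialize the low-level graph

with cProfile.Profile() as prof:
    # Walk a sample of tasks the way graph submission does; the recursion over
    # the large chunking tuples is where the time goes.
    for task in list(dsk.values())[:500]:
        unpack_remotedata(task)
pstats.Stats(prof).sort_stats("cumulative").print_stats(10)
```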

fjetter commented on July 3, 2024

As mentioned above, this is already being split up into partials. Do you want to split this up further?

No, if it is already split up, that's great.

fjetter commented on July 3, 2024

RE: further splitting up

Even though I wrote it down, I wasn't really registering that we are dealing with one-year slices here. I was thinking about the case where the time slice spans the entire dataset. I wonder if these situations could be improved as well.

hendrikmakait commented on July 3, 2024

I was thinking about the case where the time slice spans the entire dataset. I wonder if these situations could be improved as well.

The only case where I could see this helping us is the worst-case scenario of miniature shards. We'd still have to store all of the data somewhere, so I think we can only minimize metadata overhead.

hendrikmakait commented on July 3, 2024

Regarding pre-emptive upscaling: I saw the cluster suggest 4-7 workers during the first couple of minutes of the P2P rechunking. That should've been enough in this case. (Though it wouldn't have worked if we had only a single partial.)

fjetter commented on July 3, 2024

Re: performance of unpacking the tasks

I can rewrite this code to make it about 20-30% faster. That's not nothing (in this case almost 2 min faster), but it is clearly not sufficient. The inherent problem is that we cannot distinguish tuples from "tuples that are task definitions which could reference other tasks", i.e. #9969.

The reason this is so bad for P2P rechunking is that we include very large tuples as arguments for the tasks.

Consider this task, which defines the transfer task: https://github.com/dask/distributed/blob/7aea988722f285e9fc5a67a817f664bd14ef47a5/distributed/shuffle/_rechunk.py#L444-L452

It includes partial_index, partial_new, and partial_old, which are all tuples (again containing nested tuples) that describe the old/new chunking to some extent. In this case, however, partial_old contains one tuple that has 8760 elements. Therefore, unpacking this task alone takes about 50 ms. Multiplied by all transfer tasks... that is a lot.
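
To make the per-task cost concrete, here is a hedged micro-benchmark using a synthetic task tuple shaped loosely like the transfer task linked above; the stub function and the exact argument layout are made up, and the unpack_remotedata import path/signature is an assumption:

```python
import timeit

from distributed.utils_comm import unpack_remotedata  # assumed import path/signature

def transfer_stub(*args):
    """Stand-in for the real rechunk transfer function."""

partial_old = ((1,) * 8760, (100,), (100,))     # one axis with 8760 chunk sizes
partial_new = ((8760,), (10,) * 10, (10,) * 10)  # illustrative new chunking
task = (transfer_stub, "input-key", "shuffle-id", (0, 0, 0), partial_new, partial_old)

per_call = timeit.timeit(lambda: unpack_remotedata(task), number=50) / 50
print(f"{per_call * 1e3:.2f} ms per task")  # multiply by thousands of transfer tasks
```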

There are two quick fixes we can implement:

  1. Wrap the partial_* tuples in a tuple subclass that tells the unpack algorithm that these are not the droids it's looking for (see the sketch after this list).
  2. Don't pass those tuples to the task at all, but put them into the rechunk extension instead, since this is redundant information specific to the currently active rechunk (this would also save some redundancy in the graph).
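
A rough sketch of what option 1 could look like; all names here are hypothetical, and the traversal function is a simplified stand-in for the real unpack logic, not distributed's implementation:

```python
class OpaqueTuple(tuple):
    """Hypothetical marker: plain data, never task definitions or futures."""

def unpack(obj, futures):
    """Simplified stand-in for the recursive unpack traversal."""
    if isinstance(obj, OpaqueTuple):
        return obj  # skip: not the droids we're looking for
    if isinstance(obj, (tuple, list)):
        return type(obj)(unpack(x, futures) for x in obj)
    # ... the real code would also handle dicts and collect Future objects
    return obj

# Wrapped once at graph-construction time, the 8760-element chunking tuples
# are never walked element by element during unpacking.
partial_old = OpaqueTuple(((1,) * 8760, (100,), (100,)))
```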

fjetter commented on July 3, 2024

I wrote an issue suggesting a P2P restart if we detect significant upscaling; see dask/distributed#8673.

fjetter commented on July 3, 2024

Oh, another thing that is mildly concerning me is that the graph for this small subset is already 250 MB. I'll briefly check if this is expected... Might also be those tuples.

fjetter commented on July 3, 2024

I think I have to step back from this statement a little. The graph is indeed unreasonably large, and compression is not kicking in, but compression also doesn't help that much: after verifying this again, it only shaves off about 10%, which is why we just end up sending the raw data.

fjetter commented on July 3, 2024

See dask/distributed#8676
