Comments (4)

jacobtomlinson commented on September 22, 2024

As far as I'm aware we aren't really doing this anywhere in the Dask deployment ecosystem. Most cluster managers have settings for node sizes and a way to configure the worker, and leave it as an exercise for the user to fit one into the other.

I expect that in the majority of cases folks configure the node size and leave the worker on auto, effectively not tuning the cluster to their problem.

The exception may be dask-kubernetes where the user configures the pod size and Kubernetes handles packing those pods into nodes. Often there is just a 1:1 mapping between the pod and worker settings in that case.

This topic is definitely interesting, but I feel like it would add a bunch of complexity that may be hard to wrangle.

guillaumeeb commented on September 22, 2024

Hi @keewis, I can confirm what @jacobtomlinson is saying: I'm not aware of anything like that, and if he says so, it must be true :).

Just to make this more concrete, with your example, currently you do the maths and come up with something like (here for PBS):

cluster = PBSCluster(cores=14, processes=7, memory="105GiB", resource_spec="ncpus=28:mem=115GB")
cluster.scale(14)
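
To spell out the maths here (a sketch assuming the 2 threads / 15 GiB per worker spec from your example, on a 28-core node with 115 GB of memory):

workers = 115 // 15    # 7 workers fit into one job by memory
cores = 2 * workers    # 14 cores actually used by Dask
memory = 15 * workers  # 105 GiB handed to the workers
# cluster.scale(14) then asks for 14 workers, i.e. 2 such jobs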

I understand that this is somewhat cumbersome, or even hard for some users to get right... And if you have different worker specs for different workflows, it gets even heavier to handle...

And what you would like is something like:

cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

Are we okay on this?

If yes, I'd say we don't want to change all the existing classes and APIs. But maybe this could be done at a higher level, with a new class on top of the others? The hard point is that not all JobQueueCluster implementations use the same signature to declare job resources... I've not thought about it for more than a few minutes, so maybe we could do something clever, but no nice idea comes to mind.

@jacobtomlinson would dask-ctl or any other tool help for this?

keewis commented on September 22, 2024

And what you would like is something like

essentially, though to be even more specific, what I meant was

cluster = PBSCluster(job_max_cores=28, job_max_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

But maybe this could be done at a higher level with a new class on top of others?

So something that translates the high-level spec to the API of the lower-level classes?

Without unifying the JobQueueCluster class signatures I guess that would involve maintaining a mapping of translation functions for each cluster implementation. Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations.

Draft implementation of the proposed API:
import dask_jobqueue
from dask.utils import parse_bytes, format_bytes


def convert_to_low_level(*, max_cores, max_memory, worker_threads, worker_memory):
    # fit as many workers into the job as its memory allows
    workers = max_memory // worker_memory

    if workers == 0:
        raise ValueError(
            f"can't use more than {format_bytes(max_memory)} per worker (got {format_bytes(worker_memory)})"
        )

    max_threads_per_worker = max_cores // workers
    if worker_threads is not None:
        if worker_threads > max_threads_per_worker:
            raise ValueError(
                f"can't use more than {max_threads_per_worker} threads per worker (total available: {max_cores}, with {workers} workers)"
            )
    else:
        worker_threads = max_threads_per_worker

    memory = worker_memory * workers
    cores = worker_threads * workers

    return {"memory": format_bytes(memory), "cores": cores, "processes": workers}


cluster_implementations = {
    "pbs": dask_jobqueue.PBSCluster,
    "slurm": dask_jobqueue.SLURMCluster,
}
# per-implementation tweaks of the generic kwargs; identity for now
parameter_translations = {
    "pbs": lambda x: x,
    "slurm": lambda x: x,
}


class WorkerSpecCluster:
    def __init__(
        self,
        *,
        kind,
        max_cores,
        max_memory,
        worker_memory,
        worker_threads=None,
        **additional_kwargs,
    ):
        main_kwargs = convert_to_low_level(
            max_cores=max_cores,
            max_memory=parse_bytes(max_memory),
            worker_memory=parse_bytes(worker_memory),
            worker_threads=worker_threads,
        )
        # adapt the generic kwargs to this implementation's expected signature
        translator = parameter_translations.get(kind, lambda x: x)
        kwargs = additional_kwargs | translator(main_kwargs)

        cluster_implementation = cluster_implementations.get(kind)
        if cluster_implementation is None:
            raise ValueError(
                f"unknown implementation {kind!r}, choose one of {{{', '.join(sorted(cluster_implementations))}}}"
            )

        self._cluster = cluster_implementation(**kwargs)
        
    def _repr_html_(self):
        # IPython's rich display hook is spelled _repr_html_
        return self._cluster._repr_html_()

    def __getattr__(self, name):
        # delegate everything else (scale, adapt, ...) to the wrapped cluster
        return getattr(self._cluster, name)

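
Hypothetical usage of the draft above, reusing the PBS numbers from earlier (the queue name is made up):

cluster = WorkerSpecCluster(
    kind="pbs",
    max_cores=28,
    max_memory="115GiB",
    worker_threads=2,
    worker_memory="15GiB",
    queue="regular",  # any extra kwargs are passed straight to PBSCluster
)
cluster.scale(14)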
but when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.

guillaumeeb commented on September 22, 2024

So something that translates the high-level spec to the API of the lower-level classes?

Yep, this is what I had in mind.

Without unifying the JobQueueCluster class signatures

This is also something that could or should be considered. I think we could achieve some level of unification if we wanted to (at least for how to specify job_cores, job_memory and worker_processes/memory/threads), although there were several discussions on whether to use total cores vs. threads per worker at the beginning of dask-jobqueue.

Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations

Yep, I agree with this part.

draft implementation of the proposed API

That looks like a good starting point!

but when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.

You mean your job scheduler requires you to always book a complete compute node, so dask-jobqueue must ask for the maximum number of cores?
I think that is what I had in mind with:

cluster = PBSCluster(job_cores=28, job_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)

Just book this number of cores and this amount of memory with the job scheduling system, and fit as many workers with my spec into it as possible.
