Comments (4)
As far as I'm aware we aren't really doing this anywhere in the Dask deployment ecosystem. Most cluster managers have settings for node sizes and a way to configure the worker, and leave it as an exercise to the user to fit one into the other.
I expect that in the majority of cases folks configure the node size and leave the worker on auto, effectively not tuning the cluster to their problem.
The exception may be dask-kubernetes where the user configures the pod size and Kubernetes handles packing those pods into nodes. Often there is just a 1:1 mapping between the pod and worker settings in that case.
This topic is definitely interesting, but I feel like it would add a bunch of complexity that may be hard to wrangle.
from dask-jobqueue.
Hi @keewis, I can confirm what @jacobtomlinson is saying: I'm not aware of anything like that, and if he says so, it must be true :).
Just to make this more concrete, with your example, currently you do the maths and come up with something like (here for PBS):
cluster = PBSCluster(cores=14, processes=7, memory="105GiB", resource_spec="ncpus=28:memory=115gb")
cluster.scale(14)
I understand that this is somewhat cumbersome or even hard for some users to do right... And if you have various worker specs depending on various workflows, this gets even heavier to handle...
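To spell out the maths behind that call, here is a minimal sketch in plain Python. The node size (28 cores, 115 GiB) and the per-worker spec (2 threads, 15 GiB) are the numbers used in this thread; the variable names are only illustrative and not part of any dask-jobqueue API.

```python
GIB = 2**30

# Node (job) limits and the desired per-worker spec, taken from the example above.
node_cores = 28
node_memory = 115 * GIB
worker_threads = 2
worker_memory = 15 * GIB

# How many workers fit into the node's memory.
processes = node_memory // worker_memory

# Totals to pass as `cores` and `memory` (dask-jobqueue takes per-job totals).
cores = processes * worker_threads
memory = processes * worker_memory

# The resulting thread count must not exceed the cores booked on the node.
assert cores <= node_cores

print(processes, cores, memory // GIB)  # 7 14 105
```

These are the values `processes=7`, `cores=14` and `memory="105GiB"` from the `PBSCluster` call.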
And what you would like is something like:
cluster = PBSCluster(job_cores=28, jobs_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
Are we okay on this?
If yes, I'd say we don't want to change all the existing classes and APIs. But maybe this could be done at a higher level with a new class on top of the others? The hard point is that not all JobQueueCluster implementations use the same signature to declare job resources... I haven't thought about it for more than a few minutes, so maybe we could do something clever, but no nice idea comes to mind yet.
@jacobtomlinson would dask-ctl or any other tool help with this?
And what you would like is something like
Essentially, though to be even more specific, what I meant was:
cluster = PBSCluster(job_max_cores=28, job_max_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
But maybe this could be done at a higher level with a new class on top of others?
So something that translates the high-level spec to the API of the lower-level classes?
Without unifying the JobQueueCluster class signatures, I guess that would involve maintaining a mapping of translation functions for each cluster implementation. Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations.
draft implementation of the proposed API
import dask_jobqueue
from dask.utils import parse_bytes, format_bytes


def convert_to_low_level(*, max_cores, max_memory, worker_threads, worker_memory):
    # memory values are expected in bytes (see parse_bytes in WorkerSpecCluster)
    # number of workers that fit into the job's memory
    workers = max_memory // worker_memory
    if workers == 0:
        raise ValueError(
            f"can't use more than {format_bytes(max_memory)} per worker (got {format_bytes(worker_memory)})"
        )

    max_threads_per_worker = max_cores // workers
    if worker_threads is not None:
        if worker_threads > max_threads_per_worker:
            raise ValueError(
                f"can't use more than {max_threads_per_worker} threads per worker (total available: {max_cores}, with {workers} workers)"
            )
    else:
        # no explicit thread count: spread all cores evenly over the workers
        worker_threads = max_threads_per_worker

    # dask-jobqueue expects per-job totals
    memory = worker_memory * workers
    cores = worker_threads * workers

    return {"memory": format_bytes(memory), "cores": cores, "processes": workers}


cluster_implementations = {
    "pbs": dask_jobqueue.PBSCluster,
    "slurm": dask_jobqueue.SLURMCluster,
}
# per-implementation adjustments of the general job spec (identity for now)
parameter_translations = {
    "pbs": lambda x: x,
    "slurm": lambda x: x,
}


class WorkerSpecCluster:
    def __init__(
        self,
        *,
        kind,
        max_cores,
        max_memory,
        worker_memory,
        worker_threads=None,
        **additional_kwargs,
    ):
        main_kwargs = convert_to_low_level(
            max_cores=max_cores,
            max_memory=parse_bytes(max_memory),
            worker_memory=parse_bytes(worker_memory),
            worker_threads=worker_threads,
        )

        translator = parameter_translations.get(kind, lambda x: x)
        kwargs = additional_kwargs | translator(main_kwargs)

        cluster_implementation = cluster_implementations.get(kind)
        if cluster_implementation is None:
            raise ValueError(
                f"unknown implementation {kind!r}, choose one of {{{', '.join(sorted(cluster_implementations))}}}"
            )

        self._cluster = cluster_implementation(**kwargs)

    # Jupyter looks for `_repr_html_` (not `_html_repr_`) for rich display
    def _repr_html_(self):
        return self._cluster._repr_html_()

    def __getattr__(self, name):
        # delegate everything else to the wrapped cluster
        return getattr(self._cluster, name)
But when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.
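The translators in the draft are identity functions. Below is a rough sketch of what non-identity ones might look like if the job itself should always book the full `max_cores` / `max_memory`, regardless of what the workers use. It assumes that `resource_spec` (PBS) and `job_cpu` / `job_mem` (SLURM) are the right keyword arguments for this and that the memory strings are in a format the scheduler accepts; both would need checking against each cluster class. The function names and the `job_cores` / `job_memory` parameters are hypothetical.

```python
def translate_pbs(spec, *, job_cores, job_memory):
    """Add a PBS resource request for the whole job to the general spec."""
    out = dict(spec)
    # Assumption: one chunk covering the full node; the exact string depends on the site.
    out["resource_spec"] = f"select=1:ncpus={job_cores}:mem={job_memory}"
    return out


def translate_slurm(spec, *, job_cores, job_memory):
    """Add SLURM job-level CPU and memory requests to the general spec."""
    out = dict(spec)
    # Assumption: these kwargs override the values derived from cores/memory.
    out["job_cpu"] = job_cores
    out["job_mem"] = job_memory
    return out


# Hypothetical registry, mirroring `parameter_translations` in the draft.
translators = {"pbs": translate_pbs, "slurm": translate_slurm}

general_spec = {"memory": "105GiB", "cores": 14, "processes": 7}
pbs_kwargs = translators["pbs"](general_spec, job_cores=28, job_memory="115GB")
slurm_kwargs = translators["slurm"](general_spec, job_cores=28, job_memory="115GB")
```

Note that these translators need the job-level limits in addition to the general spec, so the `translator(main_kwargs)` call in the draft would have to pass them along.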
So something that translates the high-level spec to the API of the lower-level classes?
Yep, this is what I had in mind.
Without unifying the JobQueueCluster class signatures
This is also something that could/should be considered. I think we could achieve some level of unification if we wanted to (at least in how to specify job_cores, job_memory, worker_processes/memory/threads), although there were several discussions at the beginning of dask-jobqueue on whether to use total cores vs. threads per worker.
Basically, we'd have a main translation function that would take the per-worker spec and convert it to a general job spec, and then the specific functions would adjust that to the individual implementations
Yep, I agree with this part.
draft implementation of the proposed API
That looks like a good starting point!
but when trying it on my local jobqueue, I immediately ran into the issue that min_cores == max_cores, so that is probably also going to have to be addressed somehow.
You mean your job scheduler wants you to always book a complete compute node, so dask-jobqueue must ask for the max cores?
I think that is what I had in mind with
cluster = PBSCluster(job_cores=28, jobs_memory="115GiB", worker_threads=2, worker_memory="15GiB")
cluster.scale(14)
Just book this number of cores and this memory with the job scheduling system, and fit as many workers with my spec in it as possible.
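As a minimal sketch of that "fit as many workers as possible" rule: the worker count is limited by both cores and memory, so it is the smaller of the two quotients. The function name and its parameters are hypothetical, and memory is given in whole GiB to keep the example dependency-free.

```python
def fit_workers(job_cores, job_memory_gib, worker_threads, worker_memory_gib):
    """Return how many workers with the given spec fit into one job."""
    by_cores = job_cores // worker_threads
    by_memory = job_memory_gib // worker_memory_gib
    workers = min(by_cores, by_memory)
    if workers == 0:
        raise ValueError("a single worker does not fit into the job")
    return workers


# Numbers from the example above: memory is the limiting resource.
n_memory_bound = fit_workers(28, 115, 2, 15)

# With 8 threads per worker, cores become the limiting resource instead.
n_core_bound = fit_workers(28, 115, 8, 15)
```

This differs slightly from the draft implementation, which derives the worker count from memory alone and then raises if the requested threads do not fit.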