Git Product home page Git Product logo

Comments (8)

wilkeber avatar wilkeber commented on July 23, 2024

I noticed you are working on this experimental feature in branch feature-dask, see #9.

These lines look a lot like what I am hoping for without the need for pickling anything. Essentially worker_wrapper_partial_pool (and thus eventually our own provided worker function) gets the dictionary directly, which fully avoids the need for the template run.py. Is this correct?

from psweep.

elcorto avatar elcorto commented on July 23, 2024

Thanks for your interest in the project.

Your question is spot-on and describes accurately the template workflow's limitations.

Yes, using dask (#9) will avoid all that and you just need to replace

ps.run(func, params)

by

from dask.distributed import Client

# Or SGECluster or ...
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    ... HPC settings here ...
)
client = Client(cluster)

ps.run(func, params, dask_client=client)

I have now pushed a branch develop, which ATM is the same as feature-dask. This won't see any force-push :) and you can experiment with the new dask support. Note that in main and newer branches, so develop and feature-dask, run_local() was renamed to run(), but we have backwards compatibility and you'll see a deprecation warning.

You need to install psweep manually as described here, e.g.

python -m venv psweep
. ./psweep/bin/activate
git clone ...
cd /path/to/psweep
git checkout develop
pip install -e ".[dask]"

I have extended the docs in that regard, adding a detailed comparison of dask vs templates and when to use which method. GitHub renders the markdown quite well but is missing things like included example files. There are new examples here. The improved docs also explain a pickle-based workflow using templates as you describe (since I did that in the past but haven't documented it).

If you want to render the docs locally (which needs a lot of stuff installed), do, in the same venv as above probably:

cd /path/to/psweep
pip install -e ".[doc]"
cd doc
sh generate-doc.sh

That's what we do in the gh pages CI as well.

If you use the dask support, feel free to report back any problems that you faced. Thanks a lot.

from psweep.

wilkeber avatar wilkeber commented on July 23, 2024

I have been reading about dask and testing the dask supported version. It took me a bit to get things roughly right, so I thought I'd leave some comments on the issues I came across. Note they aren't real psweep issues, just two lessons learned that might help others wanting to use the dask support.

1st: setting up environment
Above, you gave excellent instructions, which unfortunately I partially ignored at first. I did not install psweep via pip install -e as indicated above, since I had a local clone of the repo and thought I'd just add the path myself. This was a confusing mistake, for someone unexperienced with dask and distributed computing.

Everything seem to run without error, but my computations were not executed. I believe the issue was that the Callable called worker in psweep was defined in my main script, which imported psweep. The distributed tasks attempted to import psweep as well, but since it was not properly installed in the python environment shared by all machines, they exited with an error. This error was not propagated, but checking the log files eventually showed the import error.

The solution was to simply follow your guidance and install the local psweep version from the dev branch.

2nd: infinite loop re-computing same parameter sets
I was interested in the case of having more jobs than what could be finalized by all dask workers within their defined walltime. I came across this section in the documentation and the issue discussing this situation:

https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers

dask/dask-jobqueue#122

Summarized in my own words: the cluster should be set to scale adaptively using cluster.adapt(minimum=1, ...) instead of cluster.scale(), which allows to bring up dask workers after they have hit the walltime. Furthermore, the cluster must be instantiated with extra arguments, which indicate the lifetime of the jobs (a time somewhat shorter than the walltime) to allow the adapt() method to transfer the state of one worker to another before it hits the walltime and is killed. I was testing this simple script, which ended up being pretty much the same as those discussed in dask/dask-jobqueue#122.

from dask.distributed import Client
from distributed.worker import logger
from dask_jobqueue import SGECluster
import time
import psweep as ps

def local_analyzer(pset):
    logger.info(f'starting {pset}') # to get a report in the dask worker logs
    time.sleep(5)
    logger.info(f'finished {pset}')
    return {'b': pset['a']}

if __name__ == "__main__":
    # crate N points to 'sweep'
    N = 60
    a = ps.plist("a", range(N))
    params = ps.pgrid([a])

    with SGECluster(queue='queue.q',
                    cores=1,
                    memory='1GB',
                    walltime='00:01:00',
                    worker_extra_args=["--lifetime", "40s", "--lifetime-stagger", "10s"]) as cluster:
        # scale the cluster to more workers
        cluster.adapt(minimum_jobs=2, maximum_jobs=2)

        with Client(cluster) as client:
            # distrubte jobs
            futures = client.map(local_analyzer, params)

            # wait for results            
            results = client.gather(futures)
            for result in results:
                print(result)
                
            # using psweep
            # df = ps.run(local_analyzer, params, dask_client=client, calc_dir=CALC_DIR, tmpsave=True)
            # ps.df_print(df, cols=["_pset_id", "_exec_host"])
        print('out of client')
        
    # need to wait a bit for the cluster to resize due to its adaptive nature or we get an (unimportant) error
    time.sleep(10)
    print('out of cluster')

Using the suggested solution, I still got the KilledWorker error discussed in dask/dask-jobqueue#122 and the docs. When reading the logs of different dask-workers, it was clear that the dask-workers were being shut down as intended, but the logger.info() calls in local_analyzer() revealed that the same psets were being calculated over and over again. It seemed that the dask scheduler was not quite collecting all results and redistributing the jobs correctly. The log below is an example output, which shows the logging of one dask worker.

Warning: no access to tty (Bad file descriptor).
Thus no job control in this shell.
stty: standard input: Inappropriate ioctl for device
2023-10-11 11:22:22,641 - distributed.nanny - INFO -         Start Nanny at: 'tcp://XX.X.X.XXX:XXXXX'
2023-10-11 11:22:23,491 - distributed.worker - INFO -       Start worker at:    tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,491 - distributed.worker - INFO -          Listening to:    tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,491 - distributed.worker - INFO -           Worker name:               SGECluster-0
2023-10-11 11:22:23,492 - distributed.worker - INFO -          dashboard at:          XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,492 - distributed.worker - INFO - Waiting to connect to:    tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,492 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,492 - distributed.worker - INFO -               Threads:                          1
2023-10-11 11:22:23,492 - distributed.worker - INFO -                Memory:                   0.93 GiB
2023-10-11 11:22:23,492 - distributed.worker - INFO -       Local Directory: /tmp/3010181.1.queue.q/dask-scratch-space/worker-1dqww_9c
2023-10-11 11:22:23,492 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,850 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-10-11 11:22:23,850 - distributed.worker - INFO -         Registered to:    tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,851 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,851 - distributed.core - INFO - Starting established connection to tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,853 - distributed.worker - INFO - starting {'a': 6}
2023-10-11 11:22:28,853 - distributed.worker - INFO - finished {'a': 6}
2023-10-11 11:22:28,857 - distributed.worker - INFO - starting {'a': 5}
2023-10-11 11:22:33,857 - distributed.worker - INFO - finished {'a': 5}
2023-10-11 11:22:33,858 - distributed.worker - INFO - starting {'a': 8}
2023-10-11 11:22:38,859 - distributed.worker - INFO - finished {'a': 8}
2023-10-11 11:22:38,861 - distributed.worker - INFO - starting {'a': 10}
2023-10-11 11:22:43,861 - distributed.worker - INFO - finished {'a': 10}
2023-10-11 11:22:43,864 - distributed.worker - INFO - starting {'a': 12}
2023-10-11 11:22:48,864 - distributed.worker - INFO - finished {'a': 12}
2023-10-11 11:22:48,866 - distributed.worker - INFO - starting {'a': 9}
2023-10-11 11:22:53,866 - distributed.worker - INFO - finished {'a': 9}
2023-10-11 11:22:53,868 - distributed.worker - INFO - starting {'a': 11}
2023-10-11 11:22:58,868 - distributed.worker - INFO - finished {'a': 11}
2023-10-11 11:22:58,870 - distributed.worker - INFO - starting {'a': 13}
2023-10-11 11:23:03,870 - distributed.worker - INFO - finished {'a': 13}
2023-10-11 11:23:03,872 - distributed.worker - INFO - starting {'a': 14}
2023-10-11 11:23:03,878 - distributed.worker - INFO - Closing worker gracefully: tcp://XX.X.X.XXX:XXXXX. Reason: worker-lifetime-reached
2023-10-11 11:23:03,881 - distributed.worker - INFO - Stopping worker at tcp://XX.X.X.XXX:XXXXX. Reason: worker-lifetime-reached
2023-10-11 11:23:03,883 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://XX.X.X.XXX:XXXXX'. Reason: worker-lifetime-reached
2023-10-11 11:23:03,884 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name='execute(local_analyzer-cd7475212b8d587afc224856da19387e)' coro=<Worker.execute() done, defined at /path/to/python/python3.11/site-packages/distributed/worker_state_machine.py:3609>> ended with CancelledError
2023-10-11 11:23:03,885 - distributed.core - INFO - Connection to tcp://XX.X.X.XXX:XXXXX has been closed.
2023-10-11 11:23:08,872 - distributed.worker - INFO - finished {'a': 14}
2023-10-11 11:23:08,874 - distributed.nanny - INFO - Worker closed
2023-10-11 11:23:11,138 - distributed.nanny - INFO - Closing Nanny at 'tcp://XX.X.X.XXX:XXXXX'. Reason: nanny-close-gracefully
2023-10-11 11:23:11,139 - distributed.dask_worker - INFO - End worker

What solved the issue? I took the wild and naive guess, that perhaps my timescales are a bit off and started increasing the number of jobs, walltimes, lifetimes and lifetime-stagger. Eventually, with N=180, walltime='00:05:00' and worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"] it works for my example. I noticed that a lifetime-stagger of 0 never works and my guess is that for this highly deterministic workload, all workers are always killed at the same time and none can take care of the other workers' states. Either way, it seems like dask-jobqueue is not quite robust and I don't know what happens if our queues are full and we cannot bring up alternative workers quickly enough. My guess is that all intermediate results are lost and will be recomputed (even if psweep's tmpsave is set to True), which is not ideal. To mitigate the number of unnecessary recomputes, I believe a possible workaround for psweep would be do adapt the lines

https://github.com/elcorto/psweep/blob/3bac7c3a0676fc75eabf39f1874f2220355e0351/src/psweep/psweep.py#L1024C1-L1025C50

like this:

chunk_size = some_multiple_of_cluster_size  # I don't know what makes most sense here
results = []
for chunk in (params[pos:pos+chunk_size] for pos in range(0, len(params), chunk_size)):
    futures = dask_client.map(worker_wrapper_partial_pool, chunk)
    results.extend(dask_client.gather(futures))

I understand that you would not want to chase after strange dask-jobqueue effects, so the same lines above can also be done outside psweep, i.e., chunking the parameter list given to psweep.run(). This is most likely what I will be doing to be more robust against full queues and non-overlapping dask-workers without the need to do full recomputations.

TL;DR: scale your cluster with adapt() and provide the additional parameters lifetime and lifetime-stagger. They should not be too small and in the order of minutes for this solution to (most likely) work.

from psweep.

elcorto avatar elcorto commented on July 23, 2024

Thank you for this very detailed report, that's highly appreciated!

Some comments on your findings.

Environment

We don't have any HPC-related install docs yet. Would something like this have helped?

"Jobs on HPC systems usually inherit the environment from which they were started. For instance, you can have a venv in your $HOME with psweep and dask installed. This will end up in $HOME/.virtualenvs/psweep which is venv's default. Then submit from this environment, which will propagate all environment variables to batch jobs. $HOME on most if not all HPC machines is on a shared file system mounted at each node, which makes things work."

# Assuming that you (need to) use the HPC machine's module system to load python
hpc $ module load python

hpc $ python -m venv psweep
hpc $ . ./psweep/bin/activate
(psweep) hpc $ cd /path/to/psweep
(psweep) hpc $ pip install -e ".[dask]"
(psweep) hpc $ cd /path/to/work/dir
(psweep) hpc $ sbatch dask_control.job

Worker lifetime and cluster.adapt()

I wasn't aware of that part of dask_jobqueue's docs, thanks for sharing!

Robustness of dask_jobqueue and psweep's relation to workflow engines

HPC machines are a complex piece of tech, so I think that dask_jobqueue is a great help. It covers probably about 80% of the common situations, but it can't handle each setup for sure (+- a few bugs maybe).

Outside of the dask ecosystem and psweep's scope, there are many projects which deal with failed jobs on HPC and other infrastructure. There is a non-comprehensive list in the psweep docs which lists tools for parameter sweeps, experiment tracking and workflow orchestration, all of which relate in some way. Some tools fall into multiple categories, so the boundaries a blurry here. Workflow engines usually offer resilience measures against failing workers. I can't give recommendations since I haven't used them yet, but nevertheless here are docs related to failed jobs for aiida and fireworks. Both tools are built by the materials modeling community, which makes heavy use of HPC resources.

Those tools are super powerful, but also more complex to set up and use (so no free lunch here). Many require you to set up a database server (PostgreSQL, MongoDB) where they can store results and metadata. They can also manage complex workflows such as jobs that depend on each other as a DAG and much more.

In psweep we assume the simplest case, workloads are independent (embarrassingly parallel). Thus we just expose the dask_client arg to leverage dask's concurrent.futures API. psweep provides simple helpers to set up the params (plist, pgrid, stargrid) and gives you a DataFrame with useful metadata such as pset hashes and UUIDs for psets and runs.

One way to solve task dependencies with psweep is by running things in order manually, say 10prepare.py, 20production.py, 30eval.py, where the first two can use psweep to compute stuff, update the database and store intermediate data on disk, which the next script would pick up. The "workflow" is to run all scripts in order. This is super low tech, simple, but also a bit brittle. For more challenging dependencies and more reproducibility, I'd look into using one of the tools above.

In the context of psweep these tools may become interesting if one can use their job scheduling features (so what dask.distributed + dask_jobqueue do) without the need to buy into their DSLs or database setups.

Code changes to support specific use cases (chunking)

You are right, I probably wouldn't want to add features that solve a narrow use case which I haven't been facing and testing in production myself. However, if you have a well tested feature that doesn't break current functionality and comes with tests, please feel free to send a PR.

psweep notes

  • ps.run(..., tmpsave=True) will save results from each pset on disk to <calc_dir>/tmpsave/<run_id>/<pset_id>.pk. This won't be used by dask workers. This feature assumes that you collect those results manually into a DataFrame later by a post-processing script. The idea for this was that if some workers fail, we have at least some results saved. Also for long running workloads, one can start to analyze already finished results. But as mentioned, that requires a bit of scripting. Also I haven't used it all that much in production. See also this comment.
  • If you have only one plist, you can skip the pgrid call.

from psweep.

elcorto avatar elcorto commented on July 23, 2024

Dear @wilkeber, if you have more findings to add here, please don't hesitate to do so. The basic dask support is now in release 0.11.1 in the form discussed above, ps.run(..., dask_client=client). If you run into any issues with this, please open a new issue. I'm taking the liberty to close this one since I think the original question is answered. Again, thank you for your question and detailed report!

from psweep.

wilkeber avatar wilkeber commented on July 23, 2024

Sorry for the missing reply on your previous comment. It contained interesting information, but for now I have continued working with the setup as is with an occasional restart. Before such a restart I gathered the tmpsave results, created/expanded the database.pk file and re-ran the simulation. It is far from elegant but does the work for now.

I actually like the clarity and simplicity of psweep, so leaving it as is, is great! The problems I am facing are rather dask problems, but since psweep provides enough transparency and does not obfuscate those problems, I can work with them.

from psweep.

wilkeber avatar wilkeber commented on July 23, 2024

Even if the issue is closed, I'll add one more comment, because I believe it might be useful for people using psweep with dask, when they are actually developing their own applications. As you pointed out, jobs in HPC cluster usually inherit the environment. However, when starting a sweep from some local repository, in which you are currently developing and quickly changing things, I found it useful to know of the UploadDirectory plugin. It allows you to copy your files onto the workers environment, see https://distributed.dask.org/en/stable/plugins.html#built-in-nanny-plugins

import psweep as ps
from dask.distributed import Client
from dask_jobqueue import SOMECluster
from distributed.diagnostics.plugin import UploadDirectory
...
with Client(cluster) as client:
    # upload local repo to jobs
    client.register_plugin(UploadDirectory(path/to/my/project, update_path=True))
    ps.run(...)
    ...

from psweep.

elcorto avatar elcorto commented on July 23, 2024

Even if the issue is closed, I'll add one more comment, because I believe it might be useful for people using psweep with dask, when they are actually developing their own applications.

No worries, these notes are very much welcome. I may integrate some of your tips in the docs at some point. For now, I have linked from the docs to this issue at least.

As you pointed out, jobs in HPC cluster usually inherit the environment. However, when starting a sweep from some local repository, in which you are currently developing and quickly changing things, I found it useful to know of the UploadDirectory plugin. It allows you to copy your files onto the workers environment, see https://distributed.dask.org/en/stable/plugins.html#built-in-nanny-plugins

Interesting, thanks for sharing. I was not aware of this. If by repository you mean a git repo, then my way of dealing with this, if the code in the repo is a Python project, is

  • set up a venv, maybe with access to system libs, so python -m venv --system-site-packages ... (actually I use virtualenvwrapper)
  • cd /path/to/dev-repo; pip install -e . the dev-repo into that venv, which just says to import code from /path/to/dev-repo and not some install location
  • submit dask_control.slurm from within that venv

This makes sure (at least on our machine, using SLURM) that each worker has access to the current state in dev-repo. For non-python stuff, a shared file system should grant access to modified files, so I wonder which use case you cover by uploading, where I guess files end up in each worker's tmp dir (e.g. something like dask_tmp/dask-scratch-space/worker-f5qjbu83/ if SLURMCluster(..., local_directory="dask_tmp"))?

from psweep.

Related Issues (17)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.