colmena's Introduction

Colmena

Colmena simplifies building autonomous applications that steer large campaigns of simulations on supercomputers.

Such "high-throughput" campaigns were, historically, guided by humans identifying the which tasks to run next — a time-consuming process with a high latency between "new data" and "decisions."

Colmena was created to build applications which augment or replace human steering with Artificial Intelligence (AI).

Installation

Colmena is available via PyPI: pip install colmena

Consult our Installation Guide for further details.

Using Colmena

Colmena applications describe a computational campaign in two components: a "Thinker" that picks computations and a "Doer" which executes them.

Thinkers encode the logic for how to run new calculations as "agents." Complex strategies are simple to express when you decompose them into simple steps. For example, a distributed optimizer:

from random import random

from colmena.thinker import BaseThinker, result_processor, task_submitter, ResourceCounter
from colmena.queue import PipeQueues
from colmena.models import Result

# Build queues to connect Thinker and Doer
queues = PipeQueues()


class Thinker(BaseThinker):

    def __init__(self, queues, num_workers: int, num_guesses=100):
        super().__init__(queues, ResourceCounter(num_workers))
        self.best_result = None
        self.answer = -10  # A (bad) starting guess
        self.num_guesses = num_guesses

    @task_submitter()
    def submit_task(self):
        """Submit a new guess close to the current best whenever a node is free"""
        self.queues.send_inputs(self.answer - 1 + 2 * random(), method='simulate')

    @result_processor()
    def store_result(self, result: Result):
        """Update best guess whenever a simulation finishes"""
        assert result.success, result.failure_info
        # Update the best result
        if self.best_result is None or result.value > self.best_result:
            self.answer = result.args[0]
            self.best_result = result.value
        self.rec.release()  # Mark that a node is now free

        # Determine if we are done
        self.num_guesses -= 1
        if self.num_guesses <= 0:
            self.done.set()


thinker = Thinker(queues, 8)

Doers describe the types of computations and available compute resources. Colmena provides Task Servers backed by several workflow engines, such as those from the ExaWorks project. Building one using Parsl requires only that your computations are expressed as Python functions:

from parsl.configs.htex_local import config  # Configuration to run locally
from colmena.task_server.parsl import ParslTaskServer

# Define your function
def simulate(x: float) -> float:
    return - x ** 2 + 4

# Make the Doer
doer = ParslTaskServer([simulate], queues, config)

Once these are defined, launching the application involves starting both:

# Launch the Thinker and doer
doer.start()
thinker.start()

# Wait until it finishes
thinker.join()
queues.send_kill_signal()  # Stop the doer

# Done!
print(f'Answer: f({thinker.answer:.2f}) = {thinker.best_result:.2f}')

Tutorials

Visit the Quickstart to learn to build a full application.

More Examples

See the demo_apps for a variety of ways to use Colmena.

Learning More

Our Read-the-Docs provides the most up-to-date information about Colmena.

You can also learn more about Colmena in the papers we published about it:

  • Ward et al. "Colmena: Scalable Machine-Learning-Based Steering of Ensemble Simulations for High Performance Computing". 2021 IEEE/ACM Workshop on Machine Learning in High Performance Computing Environments (MLHPC) [doi] [ArXiv] [slides] [YouTube]

Acknowledgements

This project was supported in part by the Exascale Computing Project (17-SC-20-SC) of the U.S. Department of Energy (DOE) and by DOE's Advanced Scientific Computing Research (ASCR) program under contract DE-AC02-06CH11357.

colmena's People

Contributors

aymenfja, braceal, gpauloski, ianfoster, kylechard, ryanchard, tskluzac, wardlt, yadudoc, zhuozhaoli


colmena's Issues

Special-purpose agents that are called as functions from other workers

We have found at least one case where one agent changes the behavior of how another runs before triggering it. Those options are currently stored as shared state between agents, but could be better expressed as function arguments to cut down on the number of "global" variables visible to all agents (it makes me nervous and leads to large init functions).

  • Add a "queue_decorator" agent
  • Add an example that illustrates how to use it

Interference between Multiple Runs

Multiple runs of Colmena on the same node may interfere with each other by passing data to/from the same Redis queues. We should make this failure mode harder to trigger.

A few ideas:

  • Provide wrappers for spawning a new Redis server with a randomly-generated port
  • Generate unique queue base names (e.g., with a UUID) when creating queue objects (see the sketch below).
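
A sketch of the second idea, using the standard library's uuid module; the naming scheme shown here is hypothetical, not an existing Colmena option:

from uuid import uuid4

# Sketch: build a run-specific prefix so two Colmena runs sharing one
# Redis server never collide on queue names
prefix = f'colmena-{uuid4().hex[:8]}'
inputs_queue = f'{prefix}-inputs'    # e.g., 'colmena-3fa9c21d-inputs'
results_queue = f'{prefix}-results'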

Add documentation page of "common patterns"

We see many common patterns in our Colmena steering processes where more than one type of agent is used together. We should start a documentation page where we share these patterns to aid others in thinking about how to write a Thinker.

We could start with the "task submission" / "result processor" pair.

Support launching Task Server as Thread, not Process

Avoiding copies between the Task Server and Thinker may outweigh the need to avoid the GIL in some cases. I have the FuncXTaskServer clearly in mind here, where the only thing it does is communicate with another server.

As pointed out by @gpauloski , shared memory between the Task Server/Thinker will be especially useful as we can start resolving object proxies as soon as they are received by the task server.

We will need to implement Thread Queue support (#36) before addressing this issue.

Checkpointing of FuncX Task Server

FuncX keeps track of the state of a task in the cloud, which means that we could shut down a task server and then re-initialize it without losing any information. The problem is, we don't currently support this and need to work out a few things:

  • Whether we can associate a series of tasks with a single identifier (e.g., the name of a Colmena run)
  • If not, developing a mechanism to save task ids associated with a run locally

Fix FuncX task server documentation

Documentation for FuncX is absent because readthedocs doesn't install funcx. We should probably just make it a required dependency, as it is no longer incompatible with Parsl's more recent versions.

Value server: Removing large objects from task descriptions

Colmena applications may use large objects that are re-used between tasks (e.g., machine learning models). We could remove the need to send these objects with every task submission and open the door to other ways of reducing communication costs (e.g., collective communication between workers).

One model I have been thinking about is a method that lets client applications push objects to a store (Value Server) and then pass pointers to the data that are resolved by the method server. A basic flow would look like:

(Diagram: value-server data flow)

We would have a fair amount of work to do for an initial prototype:

  • Implement a value server that can be accessed by client and workers
  • Create a "data pointer" object
  • Allow method server or client to pass value server configuration to workers (or, alternatively)
  • Add logic to resolve data pointers to wrapper function. It should scan the inputs and outputs for data pointers, and then act to pull/push objects.

I think the largest opportunity for optimization is in the wrapper function. We could introduce...

  • caching and checking if a server has an updated version of the object before pulling
  • a pub/sub model to cache data objects ahead-of-time with collective communication
  • performing additional deserialization routines (e.g., compiling a tensorflow model) asynchronously. For example, see MPNNMessage's get_model function that actually builds the model from the weights and configuration stored in the message. At present, this is executed lazily but could be done eagerly when the model data is received by a worker.

Error Handling

If a task fails, the failure is never reported back to the client and can crash the method server.

We should send failures back to the client on a separate channel, or at least not have them bring down the method server.
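
A minimal sketch of the intended behavior, with a plain dict standing in for the fields the Result message carries (this is not the actual Colmena wrapper):

import traceback

def run_task_safely(func, *args, **kwargs):
    """Run a task, capturing failures instead of letting them propagate"""
    try:
        return {'success': True, 'value': func(*args, **kwargs)}
    except Exception:
        # Report the traceback back to the client rather than
        # crashing the method server
        return {'success': False, 'failure_info': traceback.format_exc()}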

Simplify writing "tail down" at end of run

As soon as any agent signals "done," all threads, including those which collect results, close. It would be better to have the "job collection" agents remain active until all jobs complete so that no compute is wasted.

Create a FuncXTaskServer

A funcX task server would make it easier to create multi-site Colmena applications.

After a discussion with the FuncX team and others, we've identified a few main steps to building the executor:

  • Write a "read from Redis and create FuncX future" function that starts with TaskServer
  • Write a "gather completed results and push back to Redis" function that starts with TaskServer
  • Decide where to serialize inputs/results after/before passing them to Redis. Deserialization at the TaskServer would allow for using FuncX's built-in serialization library, but will result in serializing each method twice (once before sending to Redis, once before sending to web service). For reference, the Parsl task server wraps functions in a shim that performs deserialization at the worker.
  • Figure out how to capture runtime statistics. These are captured by the serialization shim used in the Parsl Task Server (link above)

Add resources requirements to task requests

Add another field to the Result object which stores how many resources are required for a task.

  • Modify the result object (a sketch follows this list)
  • Document the use of these fields. E.g., do Task Servers need to follow them exactly? Is the information accessible to functions served by the task server?
  • Add some examples for how to use this feature
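
One possible shape for the new field, sketched below; the class and field names are hypothetical, not the actual Result model:

from dataclasses import dataclass

@dataclass
class ResourceRequirements:
    """Sketch of a resource-requirements record attached to each task"""
    node_count: int = 1      # Nodes to hold for the task
    cpu_processes: int = 1   # Processes (e.g., MPI ranks) per node
    gpus_per_rank: int = 0   # GPUs assigned to each process

# The Result model could then carry one, e.g., `resources: ResourceRequirements`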

Better configuration around whether inputs are autoevicted from ProxyStore

We currently default to never evicting inputs from ProxyStore, which has pros and cons.

The main con is that if you run a large number of tasks, the data store will become full of data that is no longer useful. This is a big problem if that data is stored in a Redis store in RAM. We automatically evict results from the store for this reason.

The pro is that keeping inputs makes it easy to re-use the same input across multiple tasks and lets workflow engines restart tasks without worry.

We should make it easier for users to choose their own behavior and, perhaps, have Colmena autoevict an input from a store only after its task completes.

Revise names and definitions of timings

Some of our timing measurements are timestamps, others are elapsed times, and both start with time_. Let's change the names so it's clearer which are deltas and which are timestamps.

We should review which metrics are missing while we're at it (e.g., time to submit to workflow engine)
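
One possible convention, sketched with hypothetical field names: timestamp_* for absolute times and time_* reserved for elapsed durations:

import time
from dataclasses import dataclass, field

@dataclass
class TaskTimings:
    # 'timestamp_*' fields are absolute times (seconds since the epoch)
    timestamp_created: float = field(default_factory=time.time)
    timestamp_result_received: float = 0.0
    # 'time_*' fields are elapsed durations (seconds)
    time_running: float = 0.0
    time_serialize_inputs: float = 0.0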

Do not de-serialize inputs/outputs in method server

We only need to serialize or de-serialize (e.g., pickle, json) the inputs and outputs to a method at the client and worker, but currently do so at the method server as well. In short, (de)serialization happens twice as often as is necessary.

To fix:

  • Remove deserialization logic from the Method Server queues. queue.py
  • Add deserialization to the Parsl adapter function. parsl.py#22
  • While we are at it, store the serialization method in messages (to prevent it from needing to be inferred) and store the serialization times as part of the Parsl adapter function. models.py

Thread vs Process Executor for Output Tasks in Parsl

The Parsl method server has a set of workers that push completed tasks to a Redis queue. These workers are currently using Parsl's ThreadPoolExecutor, but I am concerned that keeping them as threads on the main Parsl process is interfering with our task launch rate.

The original reasoning for keeping them as threads was to reduce latency by avoiding the need to copy large results to a separate process. However, given that our Parsl task creation rates are down to ~500/s from the several 1000/s reported on Theta under different conditions, it may be worth reconsidering this design decision.

  • Add a configuration option to the ParslMethodServer for using process vs threads for output workers
  • Evaluate latency vs throughput tradeoff for the two options under different synthetic workloads
  • If a tradeoff emerges, add documentation describing when threads or processes are better

Can we remove the Parsl "output threads"?

Parsl uses multiple types of output threads, and this complexity might be unnecessary. Instead of adding special Parsl tasks to a workflow and having a separate "scavenging" thread, could we just use a callback to write results to the queue (as in the FuncXTaskServer)?
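
A toy sketch of the callback approach, with a ThreadPoolExecutor standing in for the Parsl executor and a print call standing in for the Redis push:

from concurrent.futures import ThreadPoolExecutor

def send_result(result):
    """Stand-in for pushing a completed Result onto the outbound queue"""
    print('would push to queue:', result)

with ThreadPoolExecutor() as pool:  # stands in for a Parsl executor
    future = pool.submit(pow, 2, 10)
    # The callback fires when the task completes; no scavenger thread needed
    future.add_done_callback(lambda f: send_result(f.result()))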

Terminology Revision

We clarified the terminology for each component in our SC paper and need to update the source code accordingly.

Better support for executable "methods"

Methods that call external MPI codes are likely to be commonly used in Colmena.

We currently require packing all pre- and post-processing steps (e.g., reading output files) into a single task with the executable, which means many nodes from a multi-node task could sit idle while pre-/post-processing are completed.

Many workflow systems describe such tasks as a combination of preprocessor + executable + postprocessor subtasks (e.g., Fireworks, Balsam). We could develop a similar approach by allowing users to associate pre- and post-processing tasks with a "method" and using Parsl to execute them on different resources.

Have event-triggered actions automatically reset flag

We use event-triggered agents pretty frequently in our Colmena applications, and often find that these agents reset their flag after they complete. If more than one agent is launched by an event, we also wait until all agents complete before resetting the flag. Dealing with events and barriers is something we can hide from users.

  • Store information about agent options in the attributes for the functions. This will allow us to group agents together by which event was used to trigger them
  • Add logic to create barriers shared between threads when thinker is started
  • Augment "event_triggered" decorator code to use barriers/reset flags when complete.

Convert to proxy after serialization

"Do you know if sizeof works well with tensorflow models or other weird objects? If we run into problems, can we run this "to_proxy" after serializing an object?"

Originally posted by @WardLT in #20 (comment)

Will require:

  • something along the lines of an is_serialized flag in ValueServer.get() and to_proxy()
  • Changing serialization of inputs to serialize each value in the input tuple (*args, **kwargs) so that we can check the size of each serialized object and proxy as necessary.

Allow total number of available workers to change

Many applications, especially those that span multiple sites, may have compute resources that change in availability over time.
It would be beneficial to be able to write steering policies that are responsive to such changes (e.g., ruminating on best simulations to run until workers are actually available).

To allow this we need:

  • Task servers to monitor the amount of resources available
  • Task servers and thinkers to share the resource tracker
  • Event triggering based on changes in worker availability

Value Server Quality of Life Changes

Miscellaneous QoL changes regarding comments in #20.

  • Value Server documentation in ReadTheDocs
  • Add complete arg descriptions in docstrings
  • Add debug level logging where relevant
    • when an object is proxied
    • when the value server is initialized

Serialize Path to JSON when using FileStore

We get an "unserializable type" error if you use a Path when initializing a FileStore.

  File "/home/lward/miniconda3/envs/voc/lib/python3.10/site-packages/colmena/redis/queue.py", line 397, in send_inputs
    self.outbound.put(result.json(), topic=topic)
  File "/home/lward/miniconda3/envs/voc/lib/python3.10/site-packages/colmena/models.py", line 218, in json
    return json.dumps(data, default=proxy_json_encoder)
  File "/home/lward/miniconda3/envs/voc/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/lward/miniconda3/envs/voc/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/home/lward/miniconda3/envs/voc/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/lward/miniconda3/envs/voc/lib/python3.10/site-packages/colmena/proxy.py", line 46, in proxy_json_encoder
    raise TypeError(f'Unserializable type: {type(proxy)}')
TypeError: Unserializable type: <class 'pathlib.PosixPath'>
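
One possible fix, sketched below, is for the encoder to fall back to str() for Path objects before raising:

import json
from pathlib import Path

def path_aware_encoder(obj):
    """Sketch of a proxy_json_encoder fallback that handles Path objects"""
    if isinstance(obj, Path):
        return str(obj)  # Serialize paths as plain strings
    raise TypeError(f'Unserializable type: {type(obj)}')

print(json.dumps({'store_dir': Path('/tmp/store')}, default=path_aware_encoder))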

Create proxies of exception objects

Some of the exceptions from QCEngine exceed FuncX's result size limits. Good news: the exception is just replaced with a "FuncX object size" exception. Bad news: all of the original error information is lost.

Not sure if this is going to be a widespread issue, but should be an easy fix.

CPU/GPU Affinity for Parsl Workers

Tensorflow utilizes all resources on a node unless partitioned otherwise, and I suspect other NN frameworks are similar. Given the prevalence of NNs in target Colmena applications, we should bake in ways of running more than one ML task per node without having them compete for resources.

Note: Adding in resource pinning for workers might be better suited for Parsl (@yadudoc ?)

Implement a Threading/Multiprocessing Queue Alternative

Standing up Redis can be overkill for exchanging data between thinker and server for many applications. Instead, we could implement the same interface using Python's native multiprocessing queues, which would simplify applications by reducing the number of services/ports to be dealt with by one. If we implement a threading queue, we can reduce the amount of data copying too.
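
A minimal sketch of the same interface backed by multiprocessing queues; the class and method names here are illustrative only:

from multiprocessing import Queue

class ProcessQueues:
    """Sketch: the same send/receive interface, no Redis server required"""

    def __init__(self):
        self.inputs = Queue()   # Thinker -> Task Server
        self.results = Queue()  # Task Server -> Thinker

    def send_inputs(self, *args, method=None):
        self.inputs.put((method, args))

    def get_result(self):
        return self.results.get()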

API bug in synthetic-data example

In the synthetic-data example a value_server_threshold parameter is passed to make_queue_pairs, however, this parameter is not part of the function signature. Should it be updated to proxystore_threshold? In addition, is it necessary to specify the proxystore_name parameter for proxystore_threshold to have an effect? Thanks!

Code:
https://github.com/exalearn/colmena/blob/master/demo_apps/synthetic-data/synthetic.py#L159

client_queues, server_queues = make_queue_pairs(
        args.redis_host,
        args.redis_port,
        topics=["generate"],
        serialization_method="pickle",
        keep_inputs=False,
        value_server_threshold=value_server_threshold,
    )

Improve the testing suite for FuncX

The FuncXExecutor has some behavior around threads that we don't mock, which caused the FuncXTaskServer to break when FuncX updated to v1. We should make our mock executor wrap the FuncXExecutor (and, perhaps, mark the futures as complete once _batch_run returns them) so that we ensure Colmena is properly tested against FuncX.

See e5dbc04 for details

Perform Deserialization on Workers

We eagerly deserialize data when it is pulled from the Redis queue, which means it is serialized a second time before it gets sent to the worker. We should, instead, wait until it reaches the worker.

Create a Quick-start

Develop a simple demo that illustrates the key features of Colmena and verifies that the package is working correctly.

Should be built off of the example from #8

Better Value Server Eviction

A standard LRU policy or similar will not work for the value server because we guarantee that any proxy that has been created will always be resolved. However, we still need some way of cleaning up old objects.

Ideas:

  • Evict results objects once they are deserialized on the client. This should work well under the assumption that the output proxies are never duplicated (the only edge case I can imagine for this is something along the lines of the user manually proxying an object in the target function and writing the proxy out to persistent storage then accessing later).
  • Have a proxy flag that says "I should only be resolved once and once I am resolved, clear me from Redis".
  • A more complicated option where we reference count the proxies associated with a key.

Remove environment variable value server initialization

Originally posted by @WardLT in #20 (comment)

A couple ideas on where we put the responsibility for initializing the value server on remote workers:

  • Give the object proxy the responsibility. The Factory object in each proxy stores the value server address/port. Factory.__call__ and Factory.async_resolve() call init_value_server(self.address, self.port).
  • run_and_record_timing calls init_value_server(address, port) using the address/port that is passed in the results object. ClientQueues already knows the value server address/port and can provide it when a new Result is created.

Two-layer Value Server Cache

Problem: A current limitation of the value server caches is that they exist on a per-worker basis, i.e., multiple workers on a single node do not share caches.

Idea: Add a second cache layer at the node-level, implemented via Python's Shared Memory or Shared Memory Managers. The node-level cache will store serialized objects while the worker-level cache stores deserialized objects. While we still have to pay deserialization costs when accessing the node-level cache, we can avoid network communication with the remote value server.
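
A rough sketch of the lookup path, with a Manager dict standing in for the node-level shared memory:

import pickle
from multiprocessing import Manager

node_cache = Manager().dict()  # Node level: serialized objects, shared by workers
worker_cache = {}              # Worker level: deserialized objects, per process

def get(key, fetch_remote):
    """Check the worker cache, then the node cache, then the remote value server"""
    if key in worker_cache:
        return worker_cache[key]               # Fastest: already deserialized
    if key in node_cache:
        value = pickle.loads(node_cache[key])  # Pay deserialization, skip network
    else:
        value = fetch_remote(key)              # Slowest: remote value server
        node_cache[key] = pickle.dumps(value)
    worker_cache[key] = value
    return value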

Easier Parsl Configuration for Molecular Design

Make functions that generate configurations rather than importing fully-specified configurations from moldesign/config.py. This way, we need not edit the library for different applications and can run different configurations more easily.

Deadlock if two `ReallocatorThreads` request from the same pool

I believe the source of the deadlock is this line that executes when the thread is transferring its pool of resources before exiting:

self.resource_counter.reallocate(self.gather_to, self.disperse_to,
                                 self.resource_counter.allocated_slots(self.gather_to))

If the number of slots in the "gather to" pool changes between when they are counted (i.e., when self.resource_counter.allocated_slots(self.gather_to) is resolved) and when the reallocation is performed (i.e., when self.resource_counter.reallocate is called), the reallocation will hang because some of the slots are no longer in the pool.

This is especially problematic because the @event_responder agents wait until the resource allocator created with one event exits before responding to the next.
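
One possible fix is to count and move the slots under a single lock. An illustrative sketch, not the actual ResourceCounter internals:

import threading

class CounterSketch:
    def __init__(self):
        self._lock = threading.Lock()
        self._pools = {'gather': 4, 'disperse': 0}

    def reallocate_all(self, src, dst):
        # Read the slot count and move the slots atomically, so another
        # thread cannot drain the pool between the two steps
        with self._lock:
            n = self._pools[src]
            self._pools[src] -= n
            self._pools[dst] += n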
