covalent-awsbatch-plugin's Introduction

Covalent AWS Batch Plugin

Covalent is a Pythonic workflow tool used to execute tasks on advanced computing hardware.

This executor plugin interfaces Covalent with AWS Batch, allowing tasks in a Covalent workflow to be executed as AWS Batch jobs.

1. Installation

To use this plugin with Covalent, simply install it using pip:

pip install covalent-awsbatch-plugin

2. Usage Example

This example shows how a workflow can be adapted to use the AWS Batch executor. Here we train a simple Support Vector Machine (SVM) model and use an existing AWS Batch compute environment to run the train_svm electron as a Batch job. Note that a DepsPip dependency is required so that the Batch job installs the task's Python packages.

from numpy.random import permutation
from sklearn import svm, datasets
import covalent as ct

deps_pip = ct.DepsPip(
    packages=["numpy==1.23.2", "scikit-learn==1.1.2"]
)

executor = ct.executor.AWSBatchExecutor(
    s3_bucket_name = "covalent-batch-qa-job-resources",
    batch_queue = "covalent-batch-qa-queue",
    batch_execution_role_name = "ecsTaskExecutionRole",
    batch_job_role_name = "covalent-batch-qa-job-role",
    batch_job_log_group_name = "covalent-batch-qa-log-group",
    vcpu = 2, # Number of vCPUs to allocate
    memory = 3.75, # Memory in GB to allocate
    time_limit = 300, # Time limit of job in seconds
)

# Use executor plugin to train our SVM model.
@ct.electron(
    executor=executor,
    deps_pip=deps_pip
)
def train_svm(data, C, gamma):
    X, y = data
    clf = svm.SVC(C=C, gamma=gamma)
    clf.fit(X[90:], y[90:])
    return clf

@ct.electron
def load_data():
    iris = datasets.load_iris()
    perm = permutation(iris.target.size)
    iris.data = iris.data[perm]
    iris.target = iris.target[perm]
    return iris.data, iris.target

@ct.electron
def score_svm(data, clf):
    X_test, y_test = data
    return clf.score(
        X_test[:90],
        y_test[:90]
    )

@ct.lattice
def run_experiment(C=1.0, gamma=0.7):
    data = load_data()
    clf = train_svm(
        data=data,
        C=C,
        gamma=gamma
    )
    score = score_svm(
        data=data,
        clf=clf
    )
    return score

# Dispatch the workflow
dispatch_id = ct.dispatch(run_experiment)(
    C=1.0,
    gamma=0.7
)

# Wait for our result and get result value
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

print(result)

While the workflow is executing, you can navigate to the UI to see its status; once it completes, the script above should also print the score of our model.

0.9777777777777777

3. Configuration

There are many configuration options that can be passed to the ct.executor.AWSBatchExecutor class or set in the Covalent config file under the [executors.awsbatch] section.
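For example, the configuration can be inspected or overridden programmatically; a minimal sketch, assuming the get_config/set_config helpers exposed at the top level of the covalent package and the "awsbatch" short name for this plugin:

import covalent as ct

# Values omitted from the AWSBatchExecutor constructor fall back to the
# [executors.awsbatch] section of the Covalent config file.
print(ct.get_config("executors.awsbatch"))

# Override a single default, e.g. the Batch queue used for jobs.
ct.set_config("executors.awsbatch.batch_queue", "covalent-batch-qa-queue")

# With defaults set in the config, the executor can also be referenced by its
# short name instead of constructing it explicitly.
@ct.electron(executor="awsbatch")
def my_task():
    return 42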

For more information about all of the possible configuration values, visit our Read the Docs (RTD) guide for this plugin.

4. Required AWS Resources

In order to run your workflows with Covalent, a few notable AWS resources need to be provisioned first.

For more information regarding which cloud resources need to be provisioned, visit our Read the Docs (RTD) guide for this plugin.

The required AWS resources include a Batch Job Definition, Batch Job Role, Batch Queue, Batch Compute Environment, Log Group, Subnet, VPC, and an S3 Bucket.

Getting Started with Covalent

For more information on how to get started with Covalent, check out the project homepage and the official documentation.

Release Notes

Release notes for this plugin are available in the Changelog.

Citation

Please use the following citation in any publications:

W. J. Cunningham, S. K. Radha, F. Hasan, J. Kanem, S. W. Neagle, and S. Sanand. Covalent. Zenodo, 2022. https://doi.org/10.5281/zenodo.5903364

License

Covalent is licensed under the Apache License 2.0. See the LICENSE file or contact the support team for more details.

covalent-awsbatch-plugin's People

Contributors

alejandroesquivel, araghukas, filipbolt, fyzhsn, jkanem, kessler-frost, mpvgithub, pre-commit-ci[bot], scottwn, venkatbala, wingcode, wjcunningham7


covalent-awsbatch-plugin's Issues

Update AWS Batch Executor to use AWSExecutor

Acceptance Criteria

  • Pass any required attributes from credentials_file, region, s3_bucket_name, profile, execution_role, and log_group_name to the AWSExecutor super class
  • Re-structure the executor to match the abstract methods defined in RemoteExecutor, namely _upload_task, submit_task, get_status, _poll_task, _query_result, and cancel, where appropriate (see the sketch after this list)
  • Update unit tests for the executor
  • Update docstrings for the executor
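A minimal, hypothetical skeleton of the restructured class (the base-class import path, signatures, and bodies are placeholders to be reconciled with RemoteExecutor, not the plugin's actual implementation):

from typing import Any, Dict, Tuple

from covalent_aws_plugins import AWSExecutor  # assumed import path for the base class


class AWSBatchExecutor(AWSExecutor):
    def __init__(self, s3_bucket_name: str = None, batch_queue: str = None, **kwargs):
        # credentials_file, region, profile, execution_role, and log_group_name
        # are forwarded to the AWSExecutor super class.
        super().__init__(**kwargs)
        self.s3_bucket_name = s3_bucket_name
        self.batch_queue = batch_queue

    async def _upload_task(self, function, args, kwargs, task_metadata: Dict) -> None:
        ...  # pickle the function and its arguments, then upload them to S3

    async def submit_task(self, task_metadata: Dict, identity: Dict) -> str:
        ...  # register a job definition and submit the Batch job; return the job id

    async def get_status(self, job_id: str) -> Tuple[str, int]:
        ...  # return the Batch job state (and exit code once terminal)

    async def _poll_task(self, job_id: str) -> None:
        ...  # call get_status() until the job reaches a terminal state

    async def _query_result(self, task_metadata: Dict) -> Any:
        ...  # download and deserialize the result object from S3

    async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
        ...  # terminate the Batch job, when appropriate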

Prepare the batch executor for release

  • The batch executor should assume that infrastructure already exists. Infrastructure provisioning does not need to happen in this executor.
  • Test the existing executor and figure out what enhancements, fixes, tests, and docs need to be done for release.

Adding cleanup mechanism

Once a function finishes executing with this executor, there is no mechanism to delete the intermediate files created remotely and locally. We need a cleanup mechanism to take care of that, similar to some of the other executors such as SSH and AWSLambda.
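A minimal sketch of such a cleanup hook on the executor (the S3 key layout and attribute names here are hypothetical and only illustrate the idea):

import os

import boto3


async def _cleanup(self, task_metadata: dict) -> None:
    """Best-effort removal of intermediate artifacts once a task finishes."""
    dispatch_id = task_metadata["dispatch_id"]
    node_id = task_metadata["node_id"]

    # Delete the pickled function/result objects that were uploaded to S3.
    s3 = boto3.client("s3", region_name=self.region)
    for key in (f"func-{dispatch_id}-{node_id}.pkl", f"result-{dispatch_id}-{node_id}.pkl"):
        s3.delete_object(Bucket=self.s3_bucket_name, Key=key)

    # Delete the matching local temporary file staged in the cache directory.
    local_path = os.path.join(self.cache_dir, f"func-{dispatch_id}-{node_id}.pkl")
    if os.path.exists(local_path):
        os.remove(local_path)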

Update Batch executor to return stdout, stderr, and runtime exceptions

The purpose of this issue is to ensure that the Batch executor

  • returns any stdout and stderr printed by tasks, using the mechanism defined in #1380. To do this, the run() implementation needs to
    • retrieve the stdout and stderr from the executor backend -- in the case of AWS executors, by parsing CloudWatch logs (see how Braket does this)
    • print those strings to self.task_stdout and self.task_stderr.
  • distinguishes task runtime exceptions -- those raised in the executor backend -- from other exceptions originating from the interaction of Covalent and the executor backend. This is explained in more detail in #1390. When a runtime exception occurs, the run() implementation should:
    1. Retrieve whatever stdout and stderr messages have already been printed by the task.
    2. Ensure that the exception traceback is appended to the task's stderr.
    3. Print stdout and stderr to self.task_stdout and self.task_stderr, respectively.
    4. Raise a TaskRuntimeError.
       For examples, see how the Dask executor now deals with task runtime exceptions; a sketch of this flow follows the compatibility note below.

Note: These changes should be implemented in a backward-compatible manner -- so that the new AWSExecutors should work with Covalent 0.202.0post1 (the current stable AWSExecutors work with the latest develop).
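A minimal sketch of the intended run() flow; the helper names (_get_cloudwatch_logs, _get_remote_traceback) are hypothetical, and the TaskRuntimeError import location should be verified against the targeted Covalent version:

from covalent._shared_files.exceptions import TaskRuntimeError  # assumed location


async def run(self, function, args, kwargs, task_metadata):
    job_id = await self._submit_job(function, args, kwargs, task_metadata)
    status = await self._poll_task(job_id)

    # Always retrieve whatever the task printed, by parsing the CloudWatch logs.
    stdout, stderr = await self._get_cloudwatch_logs(job_id)  # hypothetical helper
    print(stdout, end="", file=self.task_stdout)
    print(stderr, end="", file=self.task_stderr)

    if status == "FAILED":
        # Append the remote traceback to the task's stderr and surface the
        # failure as a task runtime error rather than a Covalent-side error.
        traceback_str = await self._get_remote_traceback(job_id)  # hypothetical helper
        print(traceback_str, file=self.task_stderr)
        raise TaskRuntimeError(traceback_str)

    return await self._query_result(task_metadata)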

Acceptance criteria:

  • Any stdout and stderr printed by a task before raising an unhandled exception is retrieved and printed to self.task_stdout and self.task_stderr respectively, where self is the executor plugin instance.
  • If a task raises an exception:
    • The traceback is included in the task’s stderr.
    • The run() method raises a TaskRuntimeError.
  • The executor plugin remains compatible with Covalent core 0.202.0post1.

Executor fails in some cases

For the most part, the executor seems to be running fine but there are some cases where the workflow fails. One of these cases is:

import numpy as np

import covalent as ct
from covalent_awsbatch_plugin.awsbatch import AWSBatchExecutor

N = 1
STALL = 1e5
X = 20

executor = AWSBatchExecutor()

@ct.electron(executor=executor)
def matrix_workload(mat_1, mat_2, stall=STALL, x=X):

    i = 0
    while i < stall:
        x * x
        i += 1

    return mat_1

@ct.lattice
def inflate(mat_1, dim=3):
    mat_2 = np.random.default_rng().integers(3, size=(dim, dim))
    return matrix_workload(mat_1, mat_2)

mat_1 = np.random.default_rng().integers(10, size=(3, 3))
dispatch_id = ct.dispatch(inflate)(mat_1)

Not sure why it fails but the status returned in the logs is FAILED.

README Update: Hello world + Config table + Required cloud resources

Overview

  • The idea is to update the README to contain more information about required/optional config values, and to provide information about each cloud resource that needs to be provisioned prior to running a live example
  • Provide a more complete example: not just a single task, but a full workflow

Move IaC to plugin repo & Add Pydantic validation models

Overview

The changes required in the plugin repo are minimal. We add an assets/infra folder where the Terraform files are stored. For example, the repo structure would look like this:

- .github
- <PLUGIN_FOLDER>
    - assets
        - **infra**
            - main.tf
            - ...

We would also need to add Pydantic classes to validate the infra and executor arguments, for example:

from typing import Optional

from pydantic import BaseModel


class ExecutorPluginDefaults(BaseModel):
    """
    Default configuration values for the executor
    """

    credentials: str = ""
    profile: str = ""
    region: str = "us-east-1"
    ...
    retry_attempts: int = 3
    time_limit: int = 300
    poll_freq: int = 10

class ExecutorInfraDefaults(BaseModel):
    """
    Configuration values for provisioning AWS Batch cloud infrastructure
    """
    prefix: str
    aws_region: str = "us-east-1"
    ...
    credentials: Optional[str] = ""
    profile: Optional[str] = ""
    retry_attempts: Optional[int] = 3
    time_limit: Optional[int] = 300
    poll_freq: Optional[int] = 10


_EXECUTOR_PLUGIN_DEFAULTS = ExecutorPluginDefaults().dict()

EXECUTOR_PLUGIN_NAME = "<PLUGIN_NAME>"
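For example, the infra model would then reject a deployment that omits required values:

from pydantic import ValidationError

try:
    ExecutorInfraDefaults(aws_region="us-east-1")  # missing the required `prefix`
except ValidationError as err:
    print(err)  # reports that `prefix` is a required field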

Acceptance Criteria

  • Move infra to repo
  • Create pydantic validation models

Support for empty region values

Description

As part of a recent change for self-hosted support, the base AWSExecutor class is now solely responsible for resolving default values for the profile, credentials file, and region, instead of having individual AWS executors determine their own defaults. As a result, the region argument can be left unspecified, since the AWSExecutor will resolve it via boto3's default behavior (e.g. checking the AWS_DEFAULT_REGION environment variable).

However, there is still a need to explicitly obtain the region for log configuration, even if the region attribute is empty on the executor and the AWSExecutor. We therefore need a robust way to obtain the region without duplicating the resolution logic boto3 uses.

Proposed solution: use the region resolved by a boto3 session directly.
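A minimal sketch of that approach (assuming the executor's profile/region attributes, which may be empty), letting boto3 perform its usual resolution and reading the resolved value back:

import boto3

# Fall back to boto3's own resolution (explicit argument, AWS_DEFAULT_REGION,
# profile/config files) when profile/region are unset on the executor.
session = boto3.Session(
    profile_name=self.profile or None,
    region_name=self.region or None,
)
resolved_region = session.region_name  # use this for log configuration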

Fix tests for Python 3.10

  • Update the CI so that tests are run using the most recent Covalent prerelease, rather than the stable release
  • Re-add Python 3.10 to the list of automated test environments

Support task cancellation

Description

Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case the job id or arn) and implementing a cancel method. This method will be invoked by Covalent when the user requests that the task be cancelled.

Methods provided by Covalent

All executor plugins automatically provide the following methods, which can be invoked from the plugin's run() method.

async def set_job_handle(self, job_handle)

saves a job identifier (job_handle) in Covalent's database. The job_handle can be any JSONable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. For instance, AWS Batch's cancel_job method expects the job's jobId.

async def get_cancel_requested(self) -> bool

queries Covalent to check whether task cancellation has been requested. This can be called at various junctures in the run() method, if desired.

The run() method should raise a TaskCancelledError exception upon ascertaining that the backend job has been cancelled.

Methods to be implemented by each plugin:

Each plugin defines the following abstract method to be overridden with backend-specific logic:

async def cancel(self, task_metadata: Dict, job_handle: str) -> bool

Upon receiving a cancellation request, the Covalent server will invoke this with the following inputs:

  • task_metadata: currently contains the keys dispatch_id and node_id as for the run() method.
  • job_handle: the task's job identifier previously saved using set_job_handle().

In addition to querying Covalent for cancellation requests, the run() method may need to query the compute backend to determine whether the job is in fact cancelled and raise TaskCancelledError if that is the case.

The code below sketches how to use the above methods:

import asyncio
from typing import Any, Callable, Dict, List

from covalent._shared_files.exceptions import TaskCancelledError

...

async def proceed_if_task_not_cancelled(self, batch_job_name: str):
    if await self.get_cancel_requested():
        self._debug_log("Task cancelled")
        raise TaskCancelledError(f"Batch job {batch_job_name} cancelled")


async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
    ...
    await self.proceed_if_task_not_cancelled(batch_job_name)
    # upload pickled assets
    ...
    await self.proceed_if_task_not_cancelled(batch_job_name)
    # invoke job/task
    await self.set_job_handle(handle=job_handle)
    ...
    # await self.poll_job_status(task_metadata, job_handle)


async def poll_job_status(self, task_metadata: Dict, job_id: str):
    # Boto3 client invocations to check the job status, e.g.
    # while timeout_not_exceeded:
    #     job_state = client.describe_jobs(jobs=[job_id])
    #     if job_state == "CANCELLED":
    #         raise TaskCancelledError
    #     ...
    #     await asyncio.sleep(poll_freq)
    ...


async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
    """
    Cancel the batch job.

    Args:
        task_metadata: Dictionary with the task's dispatch_id and node_id
        job_handle: Unique job handle assigned to the task by Batch

    Returns:
        True/False indicating whether the job was cancelled
    """
    # boto3 client invocations to cancel the task
    ...
    if job_cancelled:
        return True
    # Job could not be cancelled for one reason or another
    return False
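Inside cancel(), the Batch call itself could be as simple as the following sketch (client construction and error handling would follow the plugin's existing conventions):

import boto3
from botocore.exceptions import ClientError

batch_client = boto3.client("batch", region_name=self.region)
try:
    # terminate_job cancels a queued job and terminates a running one.
    batch_client.terminate_job(jobId=job_handle, reason="Cancelled by Covalent")
    job_cancelled = True
except ClientError:
    job_cancelled = False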

Acceptance Criteria

  • In the run() method:
    • Save the job handle for the task once that is determined.
    • Check whether the job has been cancelled at various junctures
  • Implement cancel method
  • Test a workflow with cancellation to ensure the cancel functionality is correctly integrated

Revise AWSBatch infra vs. plugin help menu and defaults

Relevant PR: #88

Issue

This issue concerns the output of covalent deploy up awsbatch --help

Recommended Tasks

Also consider changes to Covalent OS:

  • distinguish or omit user-settable vs. non-user-settable parameters in/from the help table; some parameter defaults are uniquely auto-generated based on the prefix (see the relevant .tf files)

AWS Batch resource deployment UX improvements

See here for notes.

  • The following infra defaults are not used for resource creation:
    • vcpus
    • memory
    • num_gpus
    • retry_attempts
    • time_limit
    • poll_freq
    • container_image_uri
  • Empty defaults in the help menu should be replaced with:
    credentials: str = str(Path.home() / ".aws/credentials")
    profile: str = "default"
    ...
    vpc_id: str = "default"
    subnet_id: str = "default"
  • The parameter named vcpu here is named vcpus in GCP Batch. Neither should be an infra parameter, or else the names should be resolved.

  • The following names conflict across the plugin and infra defaults:

    ExecutorPluginDefaults    ExecutorInfraDefaults
    region                    aws_region
    s3_bucket_name            aws_s3_bucket
    batch_queue               aws_batch_queue

Tasks

GCP Batch Plugin Functionality

Acceptance Criteria

  • Create a file covalent_googlebatch_plugin/gcpbatch.py
  • Create the executor_plugin_name variable according to covalent_gcpbatch_plugin
  • The plugin GCPBatchExecutor should be importable with from covalent_gcpbatch_plugin.gcpbatch import GCPBatchExecutor
  • Add _EXECUTOR_PLUGIN_DEFAULTS with the values provided below
  • Implement the executor by using the google-cloud-batch & google-cloud-storage GCP python client libraries. The following guide can be used as a reference for the necessary objects required for the implementation
  • The executor methods outlined below should be implemented and used as a reference (function names can change at the issue owner's discretion); a minimal class skeleton is sketched after the method list

Note: Only implementation is in scope for this issue, unit/functional tests will come later.

Expected executor methods:

  • __init__()
  • run (async)
  • _create_job (async)
  • _upload_to_bucket (async)
  • _create_job_sync
  • _pickle_func (async)
  • _pickle_func_sync
  • _submit_job (async)
  • get_status (async)
  • _poll_task (async)
  • cancel (async)
  • _query_result (async)
  • _get_service_client
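A minimal, hypothetical skeleton showing how a few of these methods could fit together (the base-class import and signatures are assumptions, not the final implementation):

from typing import Any, Callable, Dict, List

from covalent.executor import AsyncBaseExecutor  # assumed base class


class GCPBatchExecutor(AsyncBaseExecutor):
    def __init__(self, storage_bucket_name: str = "", image_uri: str = "", **kwargs):
        super().__init__(**kwargs)
        self.storage_bucket_name = storage_bucket_name
        self.image_uri = image_uri

    async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
        # Pickle and upload the task, create/submit the Batch job, then poll and fetch the result.
        await self._pickle_func(function, args, kwargs, task_metadata)
        await self._upload_to_bucket(task_metadata)
        job = await self._create_job(task_metadata)
        await self._submit_job(job, task_metadata)
        await self._poll_task(task_metadata)
        return await self._query_result(task_metadata)

    async def get_status(self, task_metadata: Dict) -> str:
        ...  # query the job state via the google-cloud-batch client

    async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
        ...  # request deletion/cancellation of the Batch job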

Expected executor plugin defaults

_EXECUTOR_PLUGIN_DEFAULTS = {
	"storage_bucket_name": "",
	"image_uri": "",
	"service_account_email": "",
	"retries": 3,
	"time_limit": 300,
	"cache_dir": "/tmp/covalent",
	"poll_freq": 5,
	"vcpus": 1,
	"memory": 256
}

Update electron statuses for `AWSBatchExecutor`

Once the separation of workflow and electron statuses is done, the electron-level statuses need to be updated to accommodate executor-dependent statuses. In this case, the following status definitions will be updated:

  • REGISTERING - Uploading the task and registering the Batch job definition

  • PROVISIONING - Corresponds to the Batch job state Submitted

  • PENDING_BACKEND - Corresponds to the Batch job states Pending and Runnable

  • STARTING - Corresponds to the Batch job state Starting

  • RUNNING - Corresponds to the Batch job state Running

  • DEPROVISIONING - Corresponds to the Batch job states Deactivating, Stopping, and Deprovisioning

  • COMPLETING - The Batch job is in the Stopped state; result files are being retrieved and temporary files are being deleted

For the end states, there is a separate class called EndState that contains all possible end-state statuses, such as COMPLETED and FAILED, so these do not need to be added to the executor-dependent statuses (a mapping sketch follows).
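A sketch of the resulting mapping from Batch job states to the executor-dependent statuses described above (the string values are illustrative placeholders; the actual status objects would come from Covalent's status classes once the separation lands):

# State names follow the descriptions above.
BATCH_JOB_STATE_TO_STATUS = {
    "SUBMITTED": "PROVISIONING",
    "PENDING": "PENDING_BACKEND",
    "RUNNABLE": "PENDING_BACKEND",
    "STARTING": "STARTING",
    "RUNNING": "RUNNING",
    "DEACTIVATING": "DEPROVISIONING",
    "STOPPING": "DEPROVISIONING",
    "DEPROVISIONING": "DEPROVISIONING",
    "STOPPED": "COMPLETING",
}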

Acceptance Criteria:

  • The above-mentioned statuses need to be updated inside the local executor
  • Tests need to be added to verify that those definitions are as expected

Prescriptive guidance document for QA

Spend no more than 30 minutes on this task

Write an explicit list of instructions for how this executor should be validated for QA. Assume the QA engineer has not used the executor before (but is otherwise familiar with Covalent as an Agnostiq engineer).

The list of instructions should be posted as a comment on this issue. Mention @wjcunningham7 in the comment.

If such instructions already exist in a README or RTD, you can instead link to those instructions.

Acceptance Criteria

  • An explicit list of instructions, or a link to instructions, for how to validate the executor for QA

Non async network and disk I/O blocking main thread

Description

As the executors are intended to be async-aware, they are expected to use asynchronous calls for I/O-bound operations; however, the official AWS SDK client, boto3, only performs synchronous/blocking calls when interacting with Amazon services.

Because this executor uses boto3 to make requests to AWS services (such as uploading function files, etc.), it currently blocks the main thread, hurting parallelization and increasing latency to the Covalent server (including viewing the UI). Hence we would need to leverage other client libraries that support asyncio, such as aioboto3, and extend them if necessary. Since these do not yet seem mature, we should take the more conservative approach of offloading all boto3 calls, such as this one, to a thread pool.

Disk I/O (including serializing inputs) can also block and therefore should also be performed in a separate thread.
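A minimal sketch of the conservative approach, offloading a blocking boto3 upload from an executor method to the default thread pool:

import asyncio
from functools import partial

import boto3


async def _upload_file_to_s3(self, local_path: str, bucket: str, key: str) -> None:
    """Run the blocking boto3 upload in the default thread pool executor."""
    s3 = boto3.client("s3")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, partial(s3.upload_file, local_path, bucket, key))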

Acceptance criteria

  • All boto3 calls are delegated to the default thread pool executor using asyncio.get_running_loop().run_in_executor(None, ...).
  • All disk I/O is delegated to the default threadpool executor

Create a base Docker image for cloud execution

Consider as an example the Docker script in the covalent-awsbatch-plugin repo.

DOCKER_SCRIPT = """
FROM amd64/python:3.8-slim-buster
RUN apt-get update && apt-get install -y \\
gcc \\
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir --use-feature=in-tree-build boto3 cloudpickle
RUN pip install covalent --pre
WORKDIR {docker_working_dir}
COPY {func_basename} {docker_working_dir}
ENTRYPOINT [ "python" ]
CMD ["{docker_working_dir}/{func_basename}"]
"""

There is a similar (or identical) script in the Braket, Lambda, and ECS plugins.

This script is used to build a Docker image locally and upload it to ECR to execute the task on the cloud.

dockerfile = self._format_dockerfile(exec_script_file.name, docker_working_dir)
dockerfile_file.write(dockerfile)
dockerfile_file.flush()
app_log.debug("AWS BATCH EXECUTOR: WRITE DOCKERFILE SUCCESS")

local_dockerfile = os.path.join(task_results_dir, f"Dockerfile_{image_tag}")
shutil.copyfile(dockerfile_file.name, local_dockerfile)

# Build the Docker image
docker_client = docker.from_env()
image, build_log = docker_client.images.build(
    path=self.cache_dir,
    dockerfile=dockerfile_file.name,
    tag=image_tag,
    platform="linux/amd64",
)
app_log.debug("AWS BATCH EXECUTOR: DOCKER BUILD SUCCESS")

ecr_username = "AWS"
ecr_password, ecr_registry, ecr_repo_uri = self._get_ecr_info(image_tag)
app_log.debug("AWS BATCH EXECUTOR: ECR INFO RETRIEVAL SUCCESS")

docker_client.login(username=ecr_username, password=ecr_password, registry=ecr_registry)
app_log.debug("AWS BATCH EXECUTOR: DOCKER CLIENT LOGIN SUCCESS")

# Tag the image
image.tag(ecr_repo_uri, tag=image_tag)
app_log.debug("AWS BATCH EXECUTOR: IMAGE TAG SUCCESS")

try:
    response = docker_client.images.push(ecr_repo_uri, tag=image_tag)
    app_log.debug(f"AWS BATCH EXECUTOR: DOCKER IMAGE PUSH SUCCESS {response}")
except Exception as e:
    app_log.debug(f"{e}")

We want to standardize and optimize this procedure. Instead of building the Docker image locally for each plugin, we will host a common base image on ECR that the plugins can pull.

Update readme

Acceptance criteria:

  • Ensure that the instructions in the README are correct.
  • Double-check that the permissions instructions are correct.
  • Check that the links go to the correct place.
