
covalent-ecs-plugin's People

Contributors

alejandroesquivel, emmanuel289, fyzhsn, jkanem, mpvgithub, scottwn, venkatbala, wjcunningham7

covalent-ecs-plugin's Issues

Update ECS executor to return stdout, stderr, and runtime exceptions

The purpose of this issue is to ensure that the ECS executor

  • returns any stdout and stderr printed by tasks using the mechanism defined in #1380. To do this, the run() implementation needs to
    • retrieve the stdout and stderr from the executor backend -- in the case of AWS executors, by parsing CloudWatch logs (see how Braket does this)
    • print those strings to self.task_stdout and self.task_stderr.
  • distinguishes task runtime exceptions -- those raised in the executor backend -- from other exceptions originating from the interaction between Covalent and the executor backend. This is explained in more detail in #1390. When a runtime exception occurs, the run() implementation should:
    1. Retrieve whatever stdout and stderr messages have already been printed by the task
    2. Ensure that the exception traceback is appended to the task's stderr.
    3. Print stdout and stderr to self.task_stdout and self.task_stderr, respectively.
    4. Raise a TaskRuntimeError.
      For an example, see how the Dask executor now handles task runtime exceptions.

Note: These changes should be implemented in a backward-compatible manner, so that the new AWSExecutors work with Covalent 0.202.0post1 (the current stable AWSExecutors work with the latest develop).

Acceptance criteria:

  • Any stdout and stderr printed by a task before raising an unhandled exception is retrieved and printed to self.task_stdout and self.task_stderr respectively, where self is the executor plugin instance.
  • If a task raises an exception:
    • The traceback is included in the task’s stderr.
    • The run() method raises a TaskRuntimeError.
  • The executor plugin remains compatible with Covalent Core `0.202.0post1`.
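
A rough sketch of the required run() behavior follows. The helper methods here are hypothetical, and TaskRuntimeError is assumed to live alongside TaskCancelledError in covalent._shared_files.exceptions:

import traceback

from covalent._shared_files.exceptions import TaskRuntimeError  # assumed location

async def run(self, function, args, kwargs, task_metadata):
    try:
        return await self._submit_and_wait(function, args, kwargs, task_metadata)  # hypothetical helper
    except Exception:
        # A real implementation would distinguish task runtime errors from
        # Covalent-side errors before reaching this branch.
        # Retrieve whatever the task printed before failing -- for AWS
        # executors, by parsing CloudWatch logs (hypothetical helper).
        stdout, stderr = await self._get_log_output(task_metadata)
        print(stdout, end="", file=self.task_stdout)
        # Append the exception traceback to the task's stderr.
        print(stderr + traceback.format_exc(), end="", file=self.task_stderr)
        raise TaskRuntimeError("Task raised an exception in the executor backend")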

Update electron statuses for `AWSECSExecutor`

Once the separation of workflow and electron statuses is complete, the electron-level statuses need to be updated to accommodate executor-dependent statuses. In this case, the following status definitions will be updated:

  • REGISTERING - Uploading task and registering the ECS task definition

  • PROVISIONING - Corresponds to the ECS task state Provisioning

  • PENDING_BACKEND - Corresponds to ECS task states Pending and Activating

  • RUNNING - Corresponds to ECS task state Running

  • DEPROVISIONING - Corresponds to ECS task states Deactivating, Stopping, and Deprovisioning

  • COMPLETING - The ECS task is in the Stopped state; result files are being retrieved and temporary files deleted

For the end states, there is a separate class called EndState, which contains all possible end-state statuses (COMPLETED, FAILED, etc.); these therefore do not need to be added to the executor-dependent statuses.
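
A rough sketch of how the ECS task lifecycle states could map onto the proposed statuses (the actual Covalent status API may differ; REGISTERING precedes the ECS task's existence, so it has no corresponding ECS state):

# Sketch only: mapping from ECS task lifecycle states to the proposed
# executor-dependent statuses.
ECS_STATE_TO_STATUS = {
    "PROVISIONING": "PROVISIONING",
    "PENDING": "PENDING_BACKEND",
    "ACTIVATING": "PENDING_BACKEND",
    "RUNNING": "RUNNING",
    "DEACTIVATING": "DEPROVISIONING",
    "STOPPING": "DEPROVISIONING",
    "DEPROVISIONING": "DEPROVISIONING",
    "STOPPED": "COMPLETING",
}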

Acceptance Criteria:

  • The above-mentioned statuses need to be updated inside the executor plugin
  • Tests need to be added to verify that these definitions are as expected

Issue downloading result pickle file after execution

After dispatching a basic (iris dataset) classical SVM example, I only sometimes get a successful dispatch; the remaining attempts fail because the ECS plugin is unable to download the result object.

[2022-08-18 16:34:53,772] [ERROR] execution.py: Line 371 in _run_task: Exception occurred when running task 0: An error occurred (400) when calling the HeadObject operation: Bad Request
[2022-08-18 16:34:53,773] [ERROR] execution.py: Line 379 in _run_task: Run task exception
Traceback (most recent call last):
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_dispatcher/_core/execution.py", line 355, in _run_task
    output, stdout, stderr = await loop.run_in_executor(tasks_pool, execute_callable)
  File "/Users/alejandro/.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_ecs_plugin/ecs.py", line 265, in execute
    return self._query_result(result_filename, task_results_dir, task_arn, image_tag)
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_ecs_plugin/ecs.py", line 497, in _query_result
    s3.download_file(self.s3_bucket_name, result_filename, local_result_filename)
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/boto3/s3/inject.py", line 190, in download_file
    return transfer.download_file(
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/boto3/s3/transfer.py", line 320, in download_file
    future.result()
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/futures.py", line 266, in result
    raise self._exception
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/tasks.py", line 269, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/download.py", line 354, in _submit
    response = client.head_object(
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (400) when calling the HeadObject operation: Bad Request

However, when manually downloading the result.pkl file produced in the S3 bucket, it appears the result was written successfully and its contents contain the expected output of the task.

So the issue seems to be limited to the executor's attempt to download the result file.
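
A small diagnostic sketch that surfaces the full error response from a failing download, which may help narrow down the cause of the 400 (variable names follow the traceback above):

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
try:
    s3.download_file(s3_bucket_name, result_filename, local_result_filename)
except ClientError as err:
    # The structured error response often carries more detail than the
    # one-line message logged above.
    print(err.response)
    raise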

Prepare the ECS executor for release

  • The ECS executor should assume that infrastructure already exists. Infrastructure provisioning does not need to happen in this executor.
  • Test the existing executor and determine what enhancements, fixes, tests, and docs are needed for release.

Move IaC to plugin repo & Add Pydantic validation models

Overview

The changes required to the plugin repo are minimal. We need an assets/infra folder where the Terraform files are stored. For example, the repo structure would look like this:

- .github
- <PLUGIN_FOLDER>
    - assets
        - **infra**
            - main.tf
            - ...

We would also need to add Pydantic models to validate infra and executor arguments, for example:

from typing import Optional

from pydantic import BaseModel


class ExecutorPluginDefaults(BaseModel):
    """
    Default configuration values for the executor
    """

    credentials: str = ""
    profile: str = ""
    region: str = "us-east-1"
    ...
    retry_attempts: int = 3
    time_limit: int = 300
    poll_freq: int = 10


class ExecutorInfraDefaults(BaseModel):
    """
    Configuration values for provisioning AWS ECS cloud infrastructure
    """

    prefix: str
    aws_region: str = "us-east-1"
    ...
    credentials: Optional[str] = ""
    profile: Optional[str] = ""
    retry_attempts: Optional[int] = 3
    time_limit: Optional[int] = 300
    poll_freq: Optional[int] = 10


_EXECUTOR_PLUGIN_DEFAULTS = ExecutorPluginDefaults().dict()

EXECUTOR_PLUGIN_NAME = "<PLUGIN_NAME>"

Acceptance Criteria

  • Move infra to repo
  • Create pydantic validation models

Prescriptive guidance document for QA

Spend no more than 30 minutes on this task

Write an explicit list of instructions for how this executor should be validated for QA. Assume the QA engineer has not used the executor before (but otherwise is familiar with Covalent as an Agnostiq engineer).

The list of instructions should be posted as a comment on this issue. Mention @wjcunningham7 in the comment.

If such instructions already exist in a README or RTD, you can instead link to those instructions.

Acceptance Criteria

  • An explicit list of instructions, or a link to instructions, for how to validate the executor for QA

Executor not using config file values as defaults

Description

When instantiating the executor class, the executor does not use defaults from the config file if certain keys are not specified.

For example:

import covalent as ct

executor = ct.executor.ECSExecutor(
    vcpu=1,
    memory=2,
    ecs_task_subnet_id="subnet-871545e1",
    ecs_task_security_group_id="sg-0043541a"
)

This snippet, as specified in the README, would not work because credentials_file does not read from the default defined in the config file.
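
A rough sketch of the expected fallback behavior (the config key path here is hypothetical):

from covalent._shared_files.config import get_config

def resolve_credentials_file(credentials_file: str = "") -> str:
    # Fall back to the value in Covalent's config file when the argument
    # is not supplied (the key path is hypothetical).
    return credentials_file or get_config("executors.ecs.credentials")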

Non async network and file i/o blocking main thread

Description

Because the executors are intended to be async-aware, they are expected to use asynchronous calls for I/O-bound operations. Currently, the official AWS SDK client, boto3, only performs synchronous (blocking) calls when interacting with AWS services.

Since this executor uses boto3 to make requests to AWS services (such as uploading function files, etc.), it currently blocks the main thread, hurting parallelization and adding latency to the Covalent server (including when viewing the UI). Hence we will need to leverage client libraries that support asyncio, such as aioboto3, and extend them if necessary.

Disk I/O (including serializing inputs) can also block and should therefore be performed in a separate thread.

Acceptance criteria

  • All boto3 calls are delegated to the default threadpool executor using asyncio.get_running_loop().run_in_executor(None, ...).
  • All disk I/O is delegated to the default threadpool executor
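
A minimal sketch of delegating a blocking boto3 call to the default threadpool executor (the function and its arguments are illustrative):

import asyncio
import functools

import boto3

async def upload_file_async(file_path: str, bucket: str, key: str) -> None:
    # Run the blocking boto3 upload in the default threadpool so the
    # event loop stays responsive.
    s3 = boto3.client("s3")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, functools.partial(s3.upload_file, file_path, bucket, key)
    )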

Support task cancellation

Description

Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case the job id or ARN) and implementing a cancel method. This method will be invoked by Covalent when the user requests that the task be cancelled.

Methods provided by Covalent

All executor plugins automatically inherit the following methods, which are to be invoked in the plugin's run() method.

async def set_job_handle(self, job_handle)

saves a job identifier (job_handle) in Covalent's database. The job_handle can be any JSONable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. In this case, the ECS stop_task method expects the task id assigned by ECS.

async def get_cancel_requested(self) -> bool

queries Covalent to check whether task cancellation has been requested. This can be called at various junctures in the run() method if desired.

The run() method should raise a TaskCancelledError exception upon ascertaining that the backend job has been cancelled.

Methods to be implemented by each plugin:

Each plugin overrides the following abstract method with backend-specific logic:

async def cancel(self, task_metadata: Dict, job_handle: str) -> bool

Upon receiving a cancellation request, the Covalent server will invoke this method with the following inputs:

  • task_metadata: currently contains the keys dispatch_id and node_id as for the run() method.
  • job_handle: the task's job identifier previously saved using set_job_handle().

In addition to querying Covalent for cancellation requests, the run() method may need to query the compute backend to determine whether the job is in fact cancelled and raise TaskCancelledError if that is the case.

The code below provides a rough guide to using the above methods:

from covalent._shared_files.exceptions import TaskCancelledError

...

async def proceed_if_task_not_cancelled(self):
    # Raise if the user has requested cancellation of this task.
    if await self.get_cancel_requested():
        self._debug_log("Task cancellation requested")
        raise TaskCancelledError("Task was cancelled")

async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
    ...
    await self.proceed_if_task_not_cancelled()
    # upload pickled assets
    ...
    await self.proceed_if_task_not_cancelled()
    # invoke job/task
    await self.set_job_handle(job_handle)
    ...
    # await self.poll_job_status(task_metadata, job_handle)

async def poll_job_status(self, task_metadata, job_id):
    # Boto3 client invocations to check the job status
    # while timeout_not_exceeded:
    #     job_state = client.describe_job(job_id)
    #     if job_state == "CANCELLED":
    #         raise TaskCancelledError
    #     ...
    #     await asyncio.sleep(poll_freq)
    ...

async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
    """
    Cancel the task.

    Args:
        task_metadata: Dictionary with the task's dispatch_id and node_id
        job_handle: Unique job identifier assigned to the task by the backend

    Returns:
        True/False indicating whether the job was cancelled
    """
    # boto3 client invocations to cancel the task
    ...
    if job_cancelled:
        return True
    # Job could not be cancelled for one reason or another
    return False

Acceptance Criteria

  • In the run() method:
    • Save the job handle for the task once that is determined.
    • Check whether the job has been cancelled at various junctures
  • Implement the cancel() method
  • Test a workflow with cancellation to ensure the cancel functionality is correctly integrated

Update AWS ECS Executor to use AWSExecutor

Acceptance Criteria

  • Pass any required attributes (credentials_file, region, s3_bucket_name, profile, execution_role, and log_group_name) to the AWSExecutor superclass; see the sketch after this list
  • Restructure the executor to conform to the abstract methods defined in RemoteExecutor, namely _upload_task, submit_task, get_status, _poll_task, _query_result, and cancel, where appropriate
  • Update unit tests for executor
  • Update doc strings for executor
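
A minimal sketch of the superclass wiring (the import path and the constructor signature of AWSExecutor are assumptions):

from covalent_aws_plugins import AWSExecutor  # assumed import path

class ECSExecutor(AWSExecutor):
    def __init__(
        self,
        credentials_file: str = "",
        region: str = "",
        s3_bucket_name: str = "",
        profile: str = "",
        execution_role: str = "",
        log_group_name: str = "",
        **kwargs,
    ):
        # Delegate the shared AWS attributes to the AWSExecutor superclass.
        super().__init__(
            credentials_file=credentials_file,
            region=region,
            s3_bucket_name=s3_bucket_name,
            profile=profile,
            execution_role=execution_role,
            log_group_name=log_group_name,
            **kwargs,
        )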

Support for empty region values

Description

As part of a recent change for self-hosted support, the base AWSExecutor class is now solely responsible for resolving default values for profile, credentials file, and region, instead of having individual AWS executors determine their own defaults. As a result, the region argument can be left unspecified, since the AWSExecutor will resolve it via boto3's default behavior (checking the AWS_DEFAULT_REGION environment variable).

However, there is a need to explicitly obtain the region for log configuration, even if the region attribute is empty in the Executor & AWSExecutor. We therefore need a robust way to obtain the region without duplicating the logic boto3 uses to resolve it.

Proposed solution: use the region directly supplied by a boto3 session, as sketched below.
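
A minimal sketch, assuming the executor's profile and region attributes may be empty strings:

import boto3

def resolve_region(profile: str = "", region: str = "") -> str:
    # Let boto3 resolve the region through its own lookup chain
    # (explicit argument, environment variables, shared config, ...).
    session = boto3.Session(
        profile_name=profile or None,
        region_name=region or None,
    )
    return session.region_name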
