agnostiqhq / covalent-ecs-plugin
Executor plugin interfacing Covalent with Amazon ECS Fargate
Home Page: https://agnostiq.ai/covalent
License: Apache License 2.0
The purpose of this issue is to ensure that the ECS executor's run() implementation captures stdout and stderr from the executor backend -- in the case of AWS executors, by parsing CloudWatch logs (see how Braket does this) -- and records them in self.task_stdout and self.task_stderr.

The run() implementation should:
- Retrieve the stdout and stderr messages that have already been printed by the task.
- Append stdout and stderr to self.task_stdout and self.task_stderr, respectively.
- Raise a TaskRuntimeException if the task failed.

Note: These changes should be implemented in a backward-compatible manner -- so that the new AWSExecutors work with Covalent 0.202.0post1 (the current stable AWSExecutors work with the latest develop).
The task's output streams are written to self.task_stdout and self.task_stderr respectively, where self is the executor plugin instance, and the run() method raises a TaskRuntimeError if the task fails.

Once separation of workflow and electron statuses is done, the electron-level statuses need to be updated to accommodate executor-dependent statuses. In this case the following status definitions will be updated:
- REGISTERING: Uploading task and registering the ECS task definition
- PROVISIONING: Corresponds to the ECS task state Provisioning
- PENDING_BACKEND: Corresponds to the ECS task states Pending and Activating
- RUNNING: Corresponds to the ECS task state Running
- DEPROVISIONING: Corresponds to the ECS task states Deactivating, Stopping, and Deprovisioning
- COMPLETING: The ECS task is in the Stopped state; result files are being retrieved and temporary files deleted
For the end states, there is a separate class called EndState which contains all possible end-state statuses such as COMPLETED, FAILED, etc., so these do not need to be added to the executor-dependent statuses.
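As a rough sketch of the mapping described above, the raw ECS task states could translate to the proposed executor-dependent statuses via a lookup table (the dict name and string values are illustrative, not from the plugin):

```python
# Hypothetical mapping from raw ECS task states to the proposed
# executor-dependent Covalent statuses; names are illustrative.
ECS_STATE_TO_STATUS = {
    "PROVISIONING": "PROVISIONING",
    "PENDING": "PENDING_BACKEND",
    "ACTIVATING": "PENDING_BACKEND",
    "RUNNING": "RUNNING",
    "DEACTIVATING": "DEPROVISIONING",
    "STOPPING": "DEPROVISIONING",
    "DEPROVISIONING": "DEPROVISIONING",
    "STOPPED": "COMPLETING",
}

print(ECS_STATE_TO_STATUS["ACTIVATING"])  # PENDING_BACKEND
```

End states such as COMPLETED or FAILED are intentionally absent, since those live in the separate EndState class.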
Acceptance Criteria:
After dispatching a basic (iris dataset) classical SVM example, I only get a successful dispatch sometimes; the remaining attempts fail because the ECS plugin is unable to download the result object.
[2022-08-18 16:34:53,772] [ERROR] execution.py: Line 371 in _run_task: Exception occurred when running task 0: An error occurred (400) when calling the HeadObject operation: Bad Request
[2022-08-18 16:34:53,773] [ERROR] execution.py: Line 379 in _run_task: Run task exception
Traceback (most recent call last):
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_dispatcher/_core/execution.py", line 355, in _run_task
output, stdout, stderr = await loop.run_in_executor(tasks_pool, execute_callable)
File "/Users/alejandro/.pyenv/versions/3.8.13/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_ecs_plugin/ecs.py", line 265, in execute
return self._query_result(result_filename, task_results_dir, task_arn, image_tag)
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/covalent_ecs_plugin/ecs.py", line 497, in _query_result
s3.download_file(self.s3_bucket_name, result_filename, local_result_filename)
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/boto3/s3/inject.py", line 190, in download_file
return transfer.download_file(
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/boto3/s3/transfer.py", line 320, in download_file
future.result()
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/futures.py", line 103, in result
return self._coordinator.result()
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/futures.py", line 266, in result
raise self._exception
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/tasks.py", line 269, in _main
self._submit(transfer_future=transfer_future, **kwargs)
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/s3transfer/download.py", line 354, in _submit
response = client.head_object(
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/Users/alejandro/.pyenv/versions/qa/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (400) when calling the HeadObject operation: Bad Request
However, when manually downloading the produced result.pkl file from the S3 bucket, it appears that the result was successfully written and its contents contained the expected output of the task.
So the issue seems to be limited to the executor's attempt to download the result file.
When QA testing the ECS executor in https://app.zenhub.com/workspaces/executors-62e0403830817d00118788a4/issues/agnostiqhq/covalent-staging/594, errors in electron execution were properly logged in CloudWatch events, but these logs were not redirected to covalent_ui.log. Adding those logs would be super helpful for debugging on the client side.
The changes required to the plugin repo are minimal. We would add an assets/infra folder where the Terraform files are stored. For example, the repo structure would look like this:
- .github
- <PLUGIN_FOLDER>
- assets
- **infra**
- main.tf
- ...
Also we would need to add pydantic classes for validating the infra and executor arguments, for example:

from typing import Optional

from pydantic import BaseModel


class ExecutorPluginDefaults(BaseModel):
"""
Default configuration values for the executor
"""
credentials: str = ""
profile: str = ""
region: str = "us-east-1"
...
retry_attempts: int = 3
time_limit: int = 300
poll_freq: int = 10
class ExecutorInfraDefaults(BaseModel):
"""
Configuration values for provisioning AWS Batch cloud infrastructure
"""
prefix: str
aws_region: str = "us-east-1"
...
credentials: Optional[str] = ""
profile: Optional[str] = ""
retry_attempts: Optional[int] = 3
time_limit: Optional[int] = 300
poll_freq: Optional[int] = 10
_EXECUTOR_PLUGIN_DEFAULTS = ExecutorPluginDefaults().dict()
EXECUTOR_PLUGIN_NAME = "<PLUGIN_NAME>"
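To illustrate what this validation buys us, here is a minimal, runnable sketch using a trimmed copy of the infra-defaults model above (only a few fields are reproduced; the behavior shown is standard pydantic):

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class ExecutorInfraDefaults(BaseModel):
    """Trimmed copy of the infra-defaults model above, for illustration."""

    prefix: str
    aws_region: str = "us-east-1"
    retry_attempts: Optional[int] = 3


# Valid: only the required field is supplied; defaults fill in the rest.
ok = ExecutorInfraDefaults(prefix="qa")
print(ok.aws_region)  # us-east-1

# Invalid: the required `prefix` field is missing, so validation fails.
try:
    ExecutorInfraDefaults()
except ValidationError:
    print("missing required field: prefix")
```

Invalid or missing infra arguments would then fail fast at construction time rather than surfacing later as a Terraform error.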
Spend no more than 30 minutes on this task
Write an explicit list of instructions for how this executor should be validated for QA. Assume the QA engineer has not used the executor before (but otherwise is familiar with Covalent as an Agnostiq engineer).
The list of instructions should be posted as a comment on this issue. Mention @wjcunningham7 in the comment.
If such instructions already exist in a README or RTD, you can instead link to those instructions.
Acceptance Criteria
When instantiating the executor class the executor does not use defaults from the config file if certain keys are not specified.
For example:
executor = ct.executor.ECSExecutor(
vcpu=1,
memory=2,
ecs_task_subnet_id="subnet-871545e1",
ecs_task_security_group_id="sg-0043541a"
)
as specified in the README does not work, because credentials_file does not read from the default defined in the config file.
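The expected behavior can be sketched with a plain-Python fallback: an explicitly passed argument wins, and an omitted one falls back to the config file. The `config` dict and `resolve` helper below are illustrative stand-ins, not Covalent's actual config API:

```python
# Sketch of the expected fallback behavior: constructor arguments that are
# omitted should fall back to values from the Covalent config file. `config`
# stands in for the parsed config; keys and values are illustrative.
config = {"credentials_file": "~/.aws/credentials", "region": "us-east-1"}


def resolve(key: str, value=None):
    # An explicitly passed argument wins; otherwise use the config default.
    return value if value is not None else config.get(key)


print(resolve("credentials_file"))        # ~/.aws/credentials
print(resolve("region", "ca-central-1"))  # ca-central-1
```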
As the executors are intended to be async-aware, they are expected to use asynchronous calls for I/O-bound operations. Currently the official AWS SDK client, boto3, only performs synchronous/blocking calls when interacting with Amazon services.
Because this executor uses boto3 to make requests to AWS services (such as uploading function files, etc.), it currently blocks the main thread, hurting parallelization and increasing latency to the Covalent server (including when viewing the UI). Hence we will need to leverage other client libraries that support asyncio, such as aioboto3, and extend them if necessary.
Disk I/O (including serializing inputs) can also block and therefore should also be performed in a separate thread.
boto3 calls are delegated to the default threadpool executor using asyncio.get_running_loop().run_in_executor(None, ...).
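This delegation pattern can be sketched as follows; the blocking function here is a stand-in for an actual boto3 call, and all names are illustrative:

```python
import asyncio
import functools


def blocking_download(bucket: str, key: str) -> str:
    # Stand-in for a blocking boto3 call such as s3.download_file(...);
    # bucket and key names are illustrative.
    return f"{bucket}/{key}"


async def download_async(bucket: str, key: str) -> str:
    # Delegate the blocking call to the default threadpool executor so the
    # event loop stays free to service other tasks.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(blocking_download, bucket, key)
    )


result = asyncio.run(download_async("my-bucket", "result.pkl"))
print(result)  # my-bucket/result.pkl
```

functools.partial is needed because run_in_executor only passes positional arguments to the callable.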
Covalent now supports dispatch cancellation. Executor plugins can opt in to this functionality by registering a job identifier for each task (in this case the job id or ARN) and implementing a cancel method. This method will be invoked by Covalent when the user requests that the task be cancelled.
All executor plugins automatically inherit the following methods, which can be invoked in the plugin's run() method.
- async def set_job_handle(self, job_handle): saves a job identifier (job_handle) in Covalent's database. The job_handle can be any JSON-serializable type, typically a string or integer, and should contain whatever information is needed to cancel the job later. In this case, the ECS stop_task method expects the task id assigned by ECS.
- async def get_cancel_requested(self) -> bool: queries Covalent to check whether task cancellation has been requested. This can be called at various junctures in the run() method if desired.
The run() method should raise a TaskCancelledError exception upon ascertaining that the backend job has been cancelled.
Each plugin defines the following abstract method to be overridden with backend-specific logic:
async def cancel(self, task_metadata: Dict, job_handle: str) -> bool
Upon receiving a cancellation request, the Covalent server will invoke this with the following inputs:
- task_metadata: currently contains the keys dispatch_id and node_id, as for the run() method.
- job_handle: the task's job identifier previously saved using set_job_handle().

In addition to querying Covalent for cancellation requests, the run() method may need to query the compute backend to determine whether the job is in fact cancelled and raise TaskCancelledError if that is the case.
The code below provides a rough guide to using the above methods
from covalent._shared_files.exceptions import TaskCancelledError
...
async def proceed_if_task_not_cancelled(self):
    if await self.get_cancel_requested():
        self._debug_log("Task cancelled")
        raise TaskCancelledError("Task cancelled by user")
async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
    ...
    await self.proceed_if_task_not_cancelled()
    # upload pickled assets
    ...
    await self.proceed_if_task_not_cancelled()
    # invoke job/task
    await self.set_job_handle(handle=job_handle)
    ...
    # await self.poll_job_status(task_metadata, job_handle)

async def poll_job_status(self, task_metadata, job_id):
    # Boto3 client invocations to check the job status
    # while timeout_not_exceeded:
    #     job_state = client.describe_job(job_id)
    #     if job_state == "CANCELLED":
    #         raise TaskCancelledError
    #     ...
    #     await asyncio.sleep(poll_freq)
async def cancel(self, task_metadata: Dict, job_handle: str) -> bool:
    """
    Cancel the batch job

    Arg(s)
        task_metadata: Dictionary with the task's dispatch_id and node id
        job_handle: Unique job handle assigned to the task by Batch

    Return(s)
        True/False indicating if the job was cancelled
    """
    # boto client invocations to cancel the task
    ...
    if job_cancelled:
        return True
    else:
        # Job cannot be cancelled for one reason or another
        return False
In the run() method, save the job handle for the task once that is determined. Implement the cancel method and verify that the cancel functionality is correctly integrated.

There should be a suite of unit tests and a functional test for the executor. See the tests in the custom executor template for guidance (https://github.com/AgnostiqHQ/covalent-executor-template)
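A minimal unit-test sketch for the cancel path could look like the following. The FakeECSExecutor class, the cluster name, and the stop_task call shape are illustrative stand-ins for the real plugin, mocked with the standard library:

```python
import asyncio
from unittest.mock import MagicMock


class FakeECSExecutor:
    """Hypothetical stand-in for the ECS plugin, showing the shape of a
    cancel() unit test; class and argument names are illustrative."""

    def __init__(self, client):
        self._client = client

    async def cancel(self, task_metadata, job_handle):
        # The real plugin would call boto3's ECS stop_task with the task ARN.
        self._client.stop_task(cluster="covalent-cluster", task=job_handle)
        return True


def test_cancel():
    client = MagicMock()
    executor = FakeECSExecutor(client)
    cancelled = asyncio.run(
        executor.cancel({"dispatch_id": "dispatch-1", "node_id": 0}, "task-arn")
    )
    assert cancelled
    client.stop_task.assert_called_once_with(cluster="covalent-cluster", task="task-arn")


test_cancel()
```

Mocking the boto3 client keeps the unit test fast and independent of AWS; the functional test would exercise the real backend.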
Partially fixes https://github.com/AgnostiqHQ/covalent-staging/issues/413
- Move credentials_file, region, s3_bucket_name, profile, execution_role, and log_group_name to the AWSExecutor super class.
- Use the methods inherited from RemoteExecutor, namely _upload_task, submit_task, get_status, _poll_task, _query_result, and cancel, when appropriate.

As part of a recent change for self-hosted support, the base AWSExecutor class is solely responsible for resolving default values for profile, credentials file, and region, instead of having individual AWS executors determine defaults. As a result the region argument can be left unspecified, since the AWSExecutor will resolve it via boto3's default behavior (checking the AWS_DEFAULT_REGION environment variable).
However, there is a need to explicitly obtain the region for log configuration, even though the region attribute may be empty in the Executor and AWSExecutor. We therefore need a robust way to obtain the region without duplicating the logic boto3 uses to resolve it.
Proposed solution: use the region directly supplied by a boto3 Session.