
conductor-python's People

Contributors

amjith, anjkl, c4lm, coderabhigupta, dengguojie, dougsillars, gardusig, hebrd, jchacking, marcocrasso, nhandt2021, opoupeney, phmurias-invitae, rizafarheen, srividhya-s-subramanian, tullytim, v1r3n, v891


conductor-python's Issues

Invalid value for `name`

Failed to poll task for: xxx,
conductor/client/http/models/task_def.py", line 265, in name
raise ValueError("Invalid value for name, must not be None") # noqa: E501
ValueError: Invalid value for name, must not be None

Set up GitHub Action to publish PyPI releases

  1. Set up a GitHub Action that listens to release events (e.g. https://github.com/Netflix/conductor/blob/main/.github/workflows/publish.yml).
  2. Update setup.cfg to use an env variable for the version.
  3. Extract the tag that is checked out (the tag is created as part of the GitHub release process), derive the version from it (e.g. tag v1.0.1 gives version 1.0.1), and set it as the env variable.
  4. Use env variables to pass the user/pass for PyPI.
  5. Configure the GitHub project with the PyPI user/pass as secrets that you can use in step 4 above.
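
The tag-to-version step can be sketched in Python as below. This is only an illustration under stated assumptions: the helper names and the CONDUCTOR_PYTHON_VERSION variable name are hypothetical and not taken from the project. The sketch also accepts a full ref such as refs/tags/v1.0.1, which is the form GitHub Actions exposes for tag builds.

```python
import os
import re


def version_from_tag(tag: str) -> str:
    """Turn a release tag such as 'v1.0.1' into the version '1.0.1'."""
    # Keep only the last path component so 'refs/tags/v1.0.1' also works.
    name = tag.strip().rsplit("/", 1)[-1]
    match = re.fullmatch(r"v?(\d+\.\d+\.\d+)", name)
    if match is None:
        raise ValueError(f"Unexpected tag format: {tag!r}")
    return match.group(1)


def version_from_env(var_name: str = "CONDUCTOR_PYTHON_VERSION",
                     default: str = "0.0.0") -> str:
    """Read the tag from an env variable (hypothetical name) and normalise it."""
    return version_from_tag(os.environ.get(var_name, default))
```

A setup script could call version_from_env() at build time, so that the release workflow only needs to export the checked-out tag.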

Evaluate client performance

Create a benchmark that compares client performance as the number of workers changes. Increasing the number of workers should improve performance until it reaches a turning point, beyond which the graph should at least plateau. Like this example:

[image: example performance graph]
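
A minimal sketch of such a benchmark is shown below. It makes several assumptions: a thread pool stands in for the client's workers, simulated_task is a hypothetical placeholder for polling and executing a real task, and throughput (tasks per second) is the metric. It is not the project's benchmark.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable


def simulated_task() -> None:
    """Hypothetical stand-in for polling and executing one task."""
    time.sleep(0.005)


def benchmark(worker_counts: Iterable[int],
              task: Callable[[], None] = simulated_task,
              n_tasks: int = 20) -> Dict[int, float]:
    """Return the measured throughput (tasks/second) for each worker count."""
    results: Dict[int, float] = {}
    for workers in worker_counts:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(task) for _ in range(n_tasks)]
            for future in futures:
                future.result()  # wait for completion and surface errors
        elapsed = time.perf_counter() - start
        results[workers] = n_tasks / elapsed
    return results
```

Plotting the returned mapping (worker count on the x-axis, throughput on the y-axis) would give the kind of graph described above.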

Create integration tests for general use case

Aim to test all possible ways of application startup, with and without some parameters.
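
One way to enumerate "with and without some parameters" is to generate every combination of optional startup arguments. The sketch below is hedged: the parameter names mirror the keyword arguments used in the hand-test tool, but the values are placeholders and startup_combinations is a hypothetical helper, not part of the client.

```python
import itertools
from typing import Any, Dict, Iterator, List

# Each optional parameter maps to the values to try; None means "omit it".
# The non-None values are placeholders, not real settings objects.
OPTIONAL_PARAMS: Dict[str, List[Any]] = {
    "debug": [None, True],
    "authentication_settings": [None, "auth-placeholder"],
    "metrics_settings": [None, "metrics-placeholder"],
}


def startup_combinations(params: Dict[str, List[Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one kwargs dict per combination, leaving out omitted parameters."""
    names = list(params)
    for values in itertools.product(*(params[name] for name in names)):
        yield {name: value for name, value in zip(names, values)
               if value is not None}
```

An integration test could loop over startup_combinations(OPTIONAL_PARAMS) and start the application once per kwargs dict.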

Tool used to test by hand:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging

logger = logging.getLogger(
    Configuration.get_logging_formatted_name(
        __name__
    )
)


class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key1', 'value')
        task_result.add_output_data('key2', 42)
        task_result.add_output_data('key3', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


def get_python_task_definition_example() -> List[dict]:
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            "ownerEmail": "[email protected]",
            "backoffScaleFactor": 1
        },
    ]


def get_python_workflow_definition_example() -> dict:
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {},
                "type": "SIMPLE",
                "decisionCases": {},
                "defaultCase": [],
                "forkTasks": [],
                "startDelay": 0,
                "joinOn": [],
                "optional": False,
                "defaultExclusiveJoinTask": [],
                "asyncComplete": False,
                "loopOver": []
            }
        ],
        "inputParameters": [],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}"
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        "ownerEmail": "[email protected]",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {},
        "inputTemplate": {}
    }


def define_task_and_workflow(api_client: ApiClient) -> None:
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task/workflow, reason: {e}')


def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    # Start a single workflow execution and return its id.
    workflow_client = WorkflowResourceApi(api_client)
    workflow_id = workflow_client.start_workflow(
        body={},
        name=workflow_name
    )
    return workflow_id


def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    # Start `qty` executions; failures are logged and skipped.
    workflow_id_list = []
    for _ in range(qty):
        try:
            workflow_id = start_workflow(api_client, workflow_name)
            workflow_id_list.append(workflow_id)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {workflow_id}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return workflow_id_list


def main():
    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()

    api_client = ApiClient(configuration)

    workflow_id = start_workflow(
        api_client,
        'workflow_with_python_task_example_from_code'
    )
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker('python_task_example_from_code'),
    ]
    workflow_ids = start_workflows(
        api_client,
        'workflow_with_python_task_example_from_code',
        10
    )
    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()

Refactor unit tests, to be compliant with recent client changes

Aim to test all possible ways of application startup, with and without some parameters.

Tool used to test by hand: the same script as listed under "Create integration tests for general use case" above.

Improve TaskHandler polling strategy

  • Add new parameter for batchSize
  • Each worker will have a standalone subprocess to poll indefinitely
    • For each polled task, start a new subprocess to execute and update it
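
The proposed strategy can be sketched roughly as follows. Everything here is an assumption made for illustration: an in-memory queue stands in for the Conductor server, poll_batch and poll_until_empty are hypothetical names, and the loop stops once the queue is empty so the sketch terminates (the proposal polls indefinitely). threading.Thread is the default so the example stays portable; multiprocessing.Process accepts the same target/args arguments and has the same start()/join() methods, so it can be passed as spawn to get real subprocesses.

```python
import queue
import threading
from typing import Callable, List


def poll_batch(task_queue: "queue.Queue[str]", batch_size: int) -> List[str]:
    """Take up to batch_size tasks from the queue without blocking."""
    batch: List[str] = []
    for _ in range(batch_size):
        try:
            batch.append(task_queue.get_nowait())
        except queue.Empty:
            break
    return batch


def poll_until_empty(task_queue: "queue.Queue[str]",
                     execute: Callable[[str], None],
                     batch_size: int,
                     spawn: Callable = threading.Thread) -> int:
    """Poll in batches and run each polled task in its own spawned unit.

    Returns the number of tasks handled.
    """
    handled = 0
    while True:
        batch = poll_batch(task_queue, batch_size)
        if not batch:
            return handled
        # One execution unit per polled task, as in the proposal.
        units = [spawn(target=execute, args=(task,)) for task in batch]
        for unit in units:
            unit.start()
        for unit in units:
            unit.join()
        handled += len(batch)
```

Waiting for each batch to finish before polling again keeps the sketch simple; a real handler would more likely keep polling while earlier tasks are still running.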
