conductor-sdk / conductor-python Goto Github PK
View Code? Open in Web Editor NEWConductor OSS SDK for Python programming language
License: Apache License 2.0
Conductor OSS SDK for Python programming language
License: Apache License 2.0
Failed to poll task for: xxx,
conductor/client/http/models/task_def.py", line 265, in name
raise ValueError("Invalid value for name
, must not be None
") # noqa: E501
ValueError: Invalid value for name
, must not be None
Resources:
Start by removing six
package dependency and its usage.
When using logging.level = logging.DEBUG
, all requests are logged raw, which is useful but quickly becomes noisy. It would be nice to have an additional, intermediate logging level that emits debug output for the package code only.
Use Orkes Playground Swagger docs as reference: https://play-app.orkes.io/api-docs
Steps:
Accept-Encoding: gzip
Currently, TaskHandler
spawns a number of processes capable of running tasks in parallel.
Each of these TaskRunner
s instantiates an ApiClient
for every request, due to pickle
issues.
Research required to understand alternatives.
Requires usage suggestion
Examples should be added here: https://github.com/conductor-sdk/conductor-examples
Aim to test all possible ways of application startup, with and without some parameters.
Tool used to test by hand:
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging
# Module-level logger. Its name is whatever the SDK helper
# Configuration.get_logging_formatted_name derives from this module's name.
logger = logging.getLogger(
    Configuration.get_logging_formatted_name(__name__),
)
class SimplePythonWorker(WorkerInterface):
    """Example worker that completes every task with fixed sample output."""

    def execute(self, task: Task) -> TaskResult:
        """Return a COMPLETED result for ``task`` carrying three sample keys.

        The result object comes from ``get_task_result_from_task`` (not
        defined in this class; presumably provided by ``WorkerInterface``).
        """
        result = self.get_task_result_from_task(task)
        sample_outputs = (
            ('key1', 'value'),
            ('key2', 42),
            ('key3', False),
        )
        for key, value in sample_outputs:
            result.add_output_data(key, value)
        result.status = TaskResultStatus.COMPLETED
        return result
def get_python_task_definition_example() -> List[dict]:
    """Return the example task definition payload.

    Returns:
        A freshly built one-element list holding the definition of the
        ``python_task_example_from_code`` task, in the shape passed as
        ``body`` to ``MetadataResourceApi.register_task_def1`` elsewhere
        in this script.
    """
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            # NOTE(review): the address looks redacted in this copy of the
            # script -- replace with a real owner e-mail before use.
            "ownerEmail": "[email protected]",
            "backoffScaleFactor": 1,
        },
    ]
def get_python_workflow_definition_example() -> dict:
    """Return the example workflow definition payload.

    Returns:
        A freshly built dict describing version 1 of
        ``workflow_with_python_task_example_from_code``: a workflow with a
        single ``SIMPLE`` task whose output is exposed as ``workerOutput``.
    """
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {},
                "type": "SIMPLE",
                "decisionCases": {},
                "defaultCase": [],
                "forkTasks": [],
                "startDelay": 0,
                "joinOn": [],
                "optional": False,
                "defaultExclusiveJoinTask": [],
                "asyncComplete": False,
                "loopOver": [],
            },
        ],
        "inputParameters": [],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}",
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        # NOTE(review): the address looks redacted in this copy of the
        # script -- replace with a real owner e-mail before use.
        "ownerEmail": "[email protected]",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {},
        "inputTemplate": {},
    }
def define_task_and_workflow(api_client: ApiClient) -> None:
    """Register the example task definition and workflow definition.

    Both registrations are best-effort: any exception is logged at debug
    level and not propagated. Each call is guarded on its own so that a
    failure while registering the task definition does not prevent the
    workflow definition from being attempted.

    Args:
        api_client: Client passed to ``MetadataResourceApi``.
    """
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task, reason: {e}')
    try:
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define workflow, reason: {e}')
def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    """Start one execution of ``workflow_name`` with an empty input body.

    Args:
        api_client: Client passed to ``WorkflowResourceApi``.
        workflow_name: Name of the workflow to start.

    Returns:
        The value returned by ``WorkflowResourceApi.start_workflow``
        (annotated here as the workflow id string).
    """
    return WorkflowResourceApi(api_client).start_workflow(
        body={},
        name=workflow_name,
    )
def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    """Try to start ``qty`` executions of ``workflow_name``.

    Failures are logged at debug level and skipped, so the returned list
    may hold fewer than ``qty`` ids.

    Args:
        api_client: Client forwarded to ``start_workflow``.
        workflow_name: Name of the workflow to start.
        qty: Number of start attempts.

    Returns:
        Ids of the executions that were started successfully, in order.
    """
    started_ids = []
    for _ in range(qty):
        try:
            new_id = start_workflow(api_client, workflow_name)
            started_ids.append(new_id)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {new_id}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return started_ids
def main():
    """Run the by-hand smoke test against https://play.orkes.io.

    Steps, in order:
      1. Build a debug ``Configuration`` (``key_id``/``key_secret`` are
         empty placeholders here) and apply its logging config.
      2. Start one workflow and complete its task directly through
         ``TaskResourceApi.update_task_by_ref_name``.
      3. Start ten more workflows.
      4. Run a ``TaskHandler`` with one ``SimplePythonWorker``, calling
         ``start_processes()`` and then ``join_processes()``.
    """
    workflow_name = 'workflow_with_python_task_example_from_code'
    task_name = 'python_task_example_from_code'

    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()
    api_client = ApiClient(configuration)

    workflow_id = start_workflow(api_client, workflow_name)
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker(task_name),
    ]
    # The returned ids are not used afterwards, so they are not bound to a
    # name; start_workflows already logs each started id.
    start_workflows(api_client, workflow_name, 10)

    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()
Each worker will log as ${hostname}-${pid}
Create a technical article for Python SDK. Topics to cover:
Related issue: https://github.com/orkes-io/orkes-conductor/issues/111
Evaluate behavior on invalid credentials:
Similar to code implemented for Conductor Java SDK
I'm trying to pip install conductor-python
and then import conductor
, but I'm getting a No module named 'conductor'
error. Reverting to version 1.0.29
fixes it.
Similar to this Package inside the java client: https://github.com/gardusig/conductor/tree/main/client/src/main/java/com/netflix/conductor/client/telemetry
WorkflowExecutor
could have more methods providing support for single requests with multiple elements (e.g. workflow-bulk-resource APIs)
All APIs support a parameter async
, may be relevant when considering API client with a larger shared thread pool
New package: https://pypi.org/project/requests/
HTTP client was implemented with swagger code generator
tool, which was a quick handy solution, but may be upgraded for more detailed usage.
Updates should probably lie within api_client.py
and rest.py
at http folder: src/conductor/client/http
As the title suggests,
the simple_worker.py example in the README.md does not work.
Orkes Playground:
Use name: conductor-python
Official documentation: https://packaging.python.org/en/latest/tutorials/packaging-projects/
Aim to test all possible ways of application startup, with and without some parameters.
Tool used to test by hand:
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging
# Module-level logger. Its name is whatever the SDK helper
# Configuration.get_logging_formatted_name derives from this module's name.
logger = logging.getLogger(
    Configuration.get_logging_formatted_name(__name__),
)
class SimplePythonWorker(WorkerInterface):
    """Example worker that completes every task with fixed sample output."""

    def execute(self, task: Task) -> TaskResult:
        """Return a COMPLETED result for ``task`` carrying three sample keys.

        The result object comes from ``get_task_result_from_task`` (not
        defined in this class; presumably provided by ``WorkerInterface``).
        """
        result = self.get_task_result_from_task(task)
        sample_outputs = (
            ('key1', 'value'),
            ('key2', 42),
            ('key3', False),
        )
        for key, value in sample_outputs:
            result.add_output_data(key, value)
        result.status = TaskResultStatus.COMPLETED
        return result
def get_python_task_definition_example() -> List[dict]:
    """Return the example task definition payload.

    Returns:
        A freshly built one-element list holding the definition of the
        ``python_task_example_from_code`` task, in the shape passed as
        ``body`` to ``MetadataResourceApi.register_task_def1`` elsewhere
        in this script.
    """
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            # NOTE(review): the address looks redacted in this copy of the
            # script -- replace with a real owner e-mail before use.
            "ownerEmail": "[email protected]",
            "backoffScaleFactor": 1,
        },
    ]
def get_python_workflow_definition_example() -> dict:
    """Return the example workflow definition payload.

    Returns:
        A freshly built dict describing version 1 of
        ``workflow_with_python_task_example_from_code``: a workflow with a
        single ``SIMPLE`` task whose output is exposed as ``workerOutput``.
    """
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {},
                "type": "SIMPLE",
                "decisionCases": {},
                "defaultCase": [],
                "forkTasks": [],
                "startDelay": 0,
                "joinOn": [],
                "optional": False,
                "defaultExclusiveJoinTask": [],
                "asyncComplete": False,
                "loopOver": [],
            },
        ],
        "inputParameters": [],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}",
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        # NOTE(review): the address looks redacted in this copy of the
        # script -- replace with a real owner e-mail before use.
        "ownerEmail": "[email protected]",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {},
        "inputTemplate": {},
    }
def define_task_and_workflow(api_client: ApiClient) -> None:
    """Register the example task definition and workflow definition.

    Both registrations are best-effort: any exception is logged at debug
    level and not propagated. Each call is guarded on its own so that a
    failure while registering the task definition does not prevent the
    workflow definition from being attempted.

    Args:
        api_client: Client passed to ``MetadataResourceApi``.
    """
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task, reason: {e}')
    try:
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define workflow, reason: {e}')
def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    """Start one execution of ``workflow_name`` with an empty input body.

    Args:
        api_client: Client passed to ``WorkflowResourceApi``.
        workflow_name: Name of the workflow to start.

    Returns:
        The value returned by ``WorkflowResourceApi.start_workflow``
        (annotated here as the workflow id string).
    """
    return WorkflowResourceApi(api_client).start_workflow(
        body={},
        name=workflow_name,
    )
def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    """Try to start ``qty`` executions of ``workflow_name``.

    Failures are logged at debug level and skipped, so the returned list
    may hold fewer than ``qty`` ids.

    Args:
        api_client: Client forwarded to ``start_workflow``.
        workflow_name: Name of the workflow to start.
        qty: Number of start attempts.

    Returns:
        Ids of the executions that were started successfully, in order.
    """
    started_ids = []
    for _ in range(qty):
        try:
            new_id = start_workflow(api_client, workflow_name)
            started_ids.append(new_id)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {new_id}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return started_ids
def main():
    """Run the by-hand smoke test against https://play.orkes.io.

    Steps, in order:
      1. Build a debug ``Configuration`` (``key_id``/``key_secret`` are
         empty placeholders here) and apply its logging config.
      2. Start one workflow and complete its task directly through
         ``TaskResourceApi.update_task_by_ref_name``.
      3. Start ten more workflows.
      4. Run a ``TaskHandler`` with one ``SimplePythonWorker``, calling
         ``start_processes()`` and then ``join_processes()``.
    """
    workflow_name = 'workflow_with_python_task_example_from_code'
    task_name = 'python_task_example_from_code'

    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()
    api_client = ApiClient(configuration)

    workflow_id = start_workflow(api_client, workflow_name)
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker(task_name),
    ]
    # The returned ids are not used afterwards, so they are not bound to a
    # name; start_workflows already logs each started id.
    start_workflows(api_client, workflow_name, 10)

    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()
Requirements:
Example:
register_task_def1
could be register_task_def
create
could be register_workflow_def
batchSize
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Something interesting about the web. A new door to the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Something interesting about visualization, using data art.
Something interesting about games, making everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.