australiansynchrotron / lightflow
A lightweight, distributed workflow system
Home Page: https://australiansynchrotron.github.io/lightflow/
License: BSD 3-Clause "New" or "Revised" License
Currently Lightflow doesn't support the latest version of NetworkX.
I'm running a fairly complicated pipeline but it works surprisingly well.
(One DAG calls another DAG, which in turn calls a third DAG.)
However, I sometimes encounter the error below on the DAG, even though all the tasks complete just fine.
Any idea what this may come from? Have you seen this before? Any tips on debugging? I will also keep searching myself but I thought I'd flag this.
Traceback (most recent call last):
File "/home/xf11bm/miniconda3/envs/lightflow-pipeline/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/home/xf11bm/miniconda3/envs/lightflow-pipeline/lib/python3.6/site-packages/kombu/serialization.py", line 263, in loads
return decode(data)
File "/home/xf11bm/projects/lightflow/lightflow/queue/pickle.py", line 15, in cloudpickle_loads
return load(BytesIO(s))
AttributeError: type object 'int64' has no attribute '__index__'
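One possible cause, offered as an assumption rather than a diagnosis, is that NumPy scalars such as int64 end up in the data passed between tasks and fail to unpickle on the receiving side. A minimal sketch of converting such values to plain Python types before they are handed on follows; the helper name to_native is hypothetical and not part of Lightflow.

```python
def to_native(value):
    """Recursively replace NumPy-style values with plain Python objects.

    Assumption: any object exposing a callable ``tolist()`` method (as NumPy
    scalars and arrays do) can be converted by calling it.
    """
    if type(value) is dict:
        return {to_native(k): to_native(v) for k, v in value.items()}
    if type(value) in (list, tuple, set):
        # Rebuild the same container type with converted elements.
        return type(value)(to_native(v) for v in value)
    tolist = getattr(value, "tolist", None)
    if callable(tolist):
        return tolist()
    return value
```

Calling to_native on a task's output before storing it would keep only built-in types in the pickled payload, which makes it easy to test whether NumPy scalars are involved at all.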
Currently, a workflow is started from the command line:
lightflow workflow start <simple>
Is there a way to start a workflow from Python, like this?
import lightflow
import simple
lightflow.start_workflow(simple)
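Until a direct call like the one above is confirmed, one workaround is to wrap the command line quoted in the question. This is a minimal sketch using only the standard library; build_start_command and start_workflow_cli are hypothetical helper names, and the sketch assumes the lightflow executable is on the PATH.

```python
import subprocess


def build_start_command(workflow_name):
    """Return the CLI invocation from the question as an argument list."""
    return ["lightflow", "workflow", "start", workflow_name]


def start_workflow_cli(workflow_name):
    """Run the CLI and return its exit code (0 conventionally means success)."""
    completed = subprocess.run(build_start_command(workflow_name))
    return completed.returncode
```

With this in place, start_workflow_cli("simple") would behave like typing the command by hand, including picking up whatever configuration file the CLI normally finds.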
I followed the tutorial to execute the simple workflow, but it doesn't work properly: it never enters the callback method of the PythonTask and doesn't print anything.
OS: Windows
Hey,
I am trying to use this project to define some workflows for my daily needs as a developer.
I can see that a function can support multiple inputs using aliases. How about multiple outputs?
Is there any way to do that, or any lead on how to make it possible?
The docs are blank for the majority of the documentation links.
The event monitor is using a thread inside a generator. Stopping the generator does not work because the thread gets stuck inside a thread lock.
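A generic pattern that avoids this kind of hang is sketched below. It is not Lightflow's implementation: monitored_events and the produce callable are hypothetical, and the sketch assumes the producer checks the stop flag regularly instead of blocking indefinitely.

```python
import queue
import threading


def monitored_events(produce, poll_interval=0.1):
    """Yield events from a background thread; stop the thread on close.

    ``produce(put, stop)`` is a hypothetical callable that calls ``put(event)``
    repeatedly and returns once ``stop.is_set()`` becomes true.
    """
    events = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(target=produce, args=(events.put, stop), daemon=True)
    worker.start()
    try:
        while worker.is_alive() or not events.empty():
            try:
                # A bounded wait keeps the generator responsive.
                event = events.get(timeout=poll_interval)
            except queue.Empty:
                continue
            yield event
    finally:
        # Runs when the generator is closed or exhausted.
        stop.set()
        worker.join()
```

Because every wait has a timeout and the cleanup lives in a finally clause, calling close() on the generator signals the thread and waits for it to finish rather than leaving it parked on a lock.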
The current timeout for MongoDB after starting a workflow is ~30 sec. Catch a missing MongoDB installation earlier.
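One way to fail fast, sketched here with the standard library only, is to probe the server's TCP port before starting the workflow. The function name is_port_open is hypothetical, and 27017 is assumed as the default MongoDB port; the check only shows that something is listening, not that it is MongoDB.

```python
import socket


def is_port_open(host="localhost", port=27017, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

A caller could run this check first and report a clear error within a couple of seconds instead of waiting for the driver's own timeout.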
Is there any plan to support Elasticsearch as a store engine?
Hi, I have installed Lightflow as described here: https://lightflow.readthedocs.io/en/latest/installation.html (using the Ubuntu package manager to install MongoDB and Redis). When I start a workflow, I get the following errors:
[26/04/2023 15:54:14][ERROR] ForkPoolWorker-1 | Task lightflow.queue.jobs.execute_dag[12e80c66-4754-41a6-83f7-1ce4bed9b70b] raised unexpected: TypeError('database must be an instance of Database')
Traceback (most recent call last):
File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 650, in __protected_call__
return self.run(*args, **kwargs)
File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/queue/jobs.py", line 117, in execute_dag
store_doc = DataStore(**self.app.user_options['config'].data_store,
File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/datastore.py", line 223, in get
fs = GridFSProxy(GridFS(db.unproxied_object))
File "/exports/scratch/ana3/lib/python3.9/site-packages/gridfs/__init__.py", line 90, in __init__
raise TypeError("database must be an instance of Database")
TypeError: database must be an instance of Database
[26/04/2023 15:54:14][ERROR] ForkPoolWorker-8 | Task lightflow.queue.jobs.execute_workflow[feb66360-ae4e-42a9-a7b2-51a04b45f06c] raised unexpected: TypeError('database must be an instance of Database')
Traceback (most recent call last):
File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 650, in __protected_call__
return self.run(*args, **kwargs)
File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/queue/jobs.py", line 69, in execute_workflow
workflow.run(config=self.app.user_options['config'],
File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/workflow.py", line 228, in run
data_store.remove(self._workflow_id)
File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/datastore.py", line 197, in remove
fs = GridFSProxy(GridFS(db.unproxied_object))
File "/exports/scratch/ana3/lib/python3.9/site-packages/gridfs/__init__.py", line 90, in __init__
raise TypeError("database must be an instance of Database")
TypeError: database must be an instance of Database
Is there an initialization needed for MongoDB? When I go to http://localhost:27017/ I get:
It looks like you are trying to access MongoDB over HTTP on the native driver port.
When I import a custom module inside the workflow callback function, execution fails:
[12/01/2018 13:21:32][ERROR] ForkPoolWorker-7 | Task lightflow.queue.jobs.execute_task[6cd45833-e7dd-4753-abe2-3b5a1b72886f] raised unexpected: ImportError("No module named 'lib'",)
Traceback (most recent call last):
File "/usr/local/python3/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/python3/lib/python3.5/site-packages/celery/app/trace.py", line 629, in protected_call
return self.run(*args, **kwargs)
File "/usr/local/python3/lib/python3.5/site-packages/lightflow/queue/jobs.py", line 214, in execute_task
event_type=JobEventName.Aborted))
File "/usr/local/python3/lib/python3.5/site-packages/lightflow/models/task.py", line 245, in _run
result = self.run(data, store, signal, context)
File "/usr/local/python3/lib/python3.5/site-packages/lightflow/tasks/python_task.py", line 64, in run
result = self._callback(data, store, signal, context, **kwargs)
File "./tasks/filter_task.py", line 14, in inc_number
from lib.filter import HtmlFilter
ImportError: No module named 'lib'
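A common reason for this kind of ImportError is that the worker process runs with a module search path that does not include the project directory. Assuming that is the case here (the traceback alone does not confirm it), a minimal sketch of making the directory importable is shown below; ensure_on_path is a hypothetical helper.

```python
import sys
from pathlib import Path


def ensure_on_path(directory):
    """Prepend directory to sys.path once and return its resolved form."""
    resolved = str(Path(directory).resolve())
    if resolved not in sys.path:
        sys.path.insert(0, resolved)
    return resolved
```

If the lib package sits next to the tasks directory (an assumption about the layout), calling ensure_on_path(Path(__file__).resolve().parent.parent) at the top of tasks/filter_task.py, before the import of lib.filter, would let the worker find it regardless of where it was started.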
Hi,
I have a DAG that generates many other DAGs. I would like it to keep at most N DAGs running at a time.
Is there a way to check, within a PythonTask, whether a DAG is complete or not?
I think something like signal.is_complete(dag_name) would be sufficient.
Note that I don't want to call signal.join_dags(dag_names), since I don't know which DAGs will complete first.
Thanks!
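The throttling logic being asked for can be sketched independently of Lightflow. In the sketch below, start and is_complete are placeholder callables supplied by the caller; in particular, is_complete stands in for the signal.is_complete call proposed above, which is a request and not an existing API.

```python
def run_throttled(dag_names, start, is_complete, max_running, wait=lambda: None):
    """Start every DAG while keeping at most max_running active at once.

    Returns the highest number of DAGs that were running concurrently.
    """
    pending = list(dag_names)
    running = []
    peak = 0
    while pending or running:
        # Drop the DAGs that have finished since the last check.
        running = [name for name in running if not is_complete(name)]
        # Fill the free slots from the pending list.
        while pending and len(running) < max_running:
            name = pending.pop(0)
            start(name)
            running.append(name)
        peak = max(peak, len(running))
        if running:
            wait()  # e.g. a short sleep between polls
    return peak
```

Because finished DAGs are detected by polling each name individually, slots free up in whatever order the DAGs happen to complete, which is the behaviour a join on a fixed list cannot give.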
E.g.: PythonTask(name, callback, timeout=600)
Following the installation instructions, when I attempt to start a worker with lightflow worker start, I get the following error:
File "/home/anderson/.local/lib/python3.10/site-packages/lightflow/queue/jobs.py", line 16, in <module>
@celery.task(bind=True)
File "/home/anderson/.local/lib/python3.10/site-packages/celery/local.py", line 478, in __getattr__
return ModuleType.__getattribute__(self, name)
AttributeError: module 'celery' has no attribute 'task'. Did you mean: 'Task'?
I suspect it is because my system (Ubuntu 22.04) is using celery 5.2.7 rather than celery 4 as defined in lightflow's requirements. But I'm not a celery expert so cannot confirm.
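To confirm which version is actually installed, the standard library can query the package metadata. The sketch below assumes Python 3.8 or newer and takes the "celery 4" limit from the report above; major_version and check_celery are hypothetical helper names.

```python
from importlib import metadata


def major_version(version_string):
    """Return the leading integer of a dotted version string."""
    return int(version_string.split(".")[0])


def check_celery(max_major=4):
    """Describe whether the installed celery is within the expected series."""
    try:
        installed = metadata.version("celery")
    except metadata.PackageNotFoundError:
        return "celery is not installed"
    if major_version(installed) > max_major:
        return f"celery {installed} is newer than the {max_major}.x series"
    return f"celery {installed} is within the {max_major}.x series or older"
```

If the check reports a 5.x release, installing Lightflow into a fresh virtual environment with a pinned 4.x celery would be one way to test the suspicion.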