prefecthq / prefect
Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines
Home Page: https://prefect.io
License: Apache License 2.0
The client.py code is stale (referencing states that don't exist, etc.). We should address this.
Calling str() on an exception strips off the error type, which is useful information to include.
Task runners and flow runners trap all errors, which can make debugging difficult. We could create a context manager that would raise errors rather than trapping them. Given a Flow flow that contains a task that raises some error:

flow.run()  # returns State(FAILED)

with raise_on_fail():
    flow.run()  # actually raises an error at the appropriate place
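A minimal sketch of how such a context manager could work, assuming the runner consults a module-level flag before trapping errors (the flag, the `raise_on_fail` name's mechanics, and `run_task` are all hypothetical stand-ins, not Prefect's actual internals):

```python
import contextlib

# Hypothetical flag a runner would consult before trapping task errors;
# in Prefect this would more plausibly live in prefect.context.
RAISE_ON_FAIL = {"active": False}

@contextlib.contextmanager
def raise_on_fail():
    """While active, runners re-raise task errors instead of trapping them."""
    RAISE_ON_FAIL["active"] = True
    try:
        yield
    finally:
        RAISE_ON_FAIL["active"] = False

def run_task(fn):
    """Toy stand-in for a task runner that normally traps all errors."""
    try:
        return ("SUCCESS", fn())
    except Exception as exc:
        if RAISE_ON_FAIL["active"]:
            raise  # surface the error at the point of failure
        return ("FAILED", exc)
```

Outside the context manager, `run_task(lambda: 1 / 0)` returns a FAILED result; inside it, the ZeroDivisionError propagates to the caller.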
Tests need to be updated for 0.2 refactor
.py files (inspired by work in #18)
Currently, when a flow adds itself to a context, it does so under the flow key (in other words, it becomes available at prefect.context.flow). This has the potential to override a possibly common user-supplied context value. Instead, any automatically-set context value (including flows, as described here, and all settings used by FlowRunner and TaskRunners, like parameters) should be put under a single key, perhaps __prefect__. This way they won't shadow user variables.
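A minimal sketch of the namespacing idea, using a plain dict as a stand-in for prefect.context (the helper names `set_system_value` / `get_system_value` are hypothetical, not Prefect's API):

```python
# Toy stand-in for prefect.context
context = {}

def set_system_value(key, value):
    """Write an automatically-set value under a reserved __prefect__ key
    so it can never shadow a user-supplied context variable."""
    context.setdefault("__prefect__", {})[key] = value

def get_system_value(key, default=None):
    """Read an automatically-set value back out of the reserved namespace."""
    return context.get("__prefect__", {}).get(key, default)
```

With this scheme a user-supplied `context["flow"]` and the automatically-set flow coexist without conflict.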
Currently, to retrieve the return value from a TaskResult, you must:

from prefect import task

@task
def fun(x, y):
    return x + y

tt = fun(1, 2)
tt.flow.run()[tt.task]  # returns 3

which feels like it violates the Law of Demeter and should be simpler. In its place, we should have something like

tt.run()  # returns 3

which should just execute the minimal path necessary to retrieve the result.
cc: @jlowin
Make sure there are no regressions (we currently test Python 3.4).
Example:
When building a flow with five tasks and one task addition fails (e.g. the task has already been assigned in the flow), the flow should be rolled back rather than left partially constructed up to the point of failure.
node_1 = a()
node_2 = b(node_1)
node_3 = c(node_2)
node_4 = node_1(node_3)
node_5 = d(node_4)
The above code currently produces the flow a -> b -> c; instead, it should not produce a valid flow at all.
Missed these attributes when changing context to use a private attribute -- need to fix + add tests.
flow.edges_to(task) and flow.edges_from(task) (and methods that depend on them, like upstream_tasks()) raise a KeyError if the task in question is not in the Flow. The error says that the task can't be found in an edge dictionary -- perhaps we should raise a more informative error saying that the task is not in the current Flow.
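One way this could look, sketched with a toy Flow class (the `TaskNotInFlowError` name and the internal `_edges_to` dict are illustrative assumptions, not Prefect's actual implementation):

```python
class TaskNotInFlowError(KeyError):
    """Hypothetical error type that names the real problem."""

class Flow:
    """Toy flow holding tasks and incoming-edge lookups."""

    def __init__(self):
        self.tasks = set()
        self._edges_to = {}

    def add_task(self, task):
        self.tasks.add(task)
        self._edges_to.setdefault(task, set())

    def edges_to(self, task):
        # Check membership before touching the edge dictionary, so the
        # error message mentions the Flow rather than an internal dict.
        if task not in self.tasks:
            raise TaskNotInFlowError(f"Task {task!r} is not in this Flow")
        return self._edges_to[task]
```

Since `TaskNotInFlowError` subclasses `KeyError`, any existing callers catching `KeyError` keep working.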
Current version 0.2 -> 0.2.0
Prevents us from having to maintain dict-like features that we don't need.
Based on recent issues related to our config.toml file, we should:
- consider moving the package into a src/ directory
- review the MANIFEST.in file
Tests were passing on the core side but failing on the server side, because config.toml was present in the git-cloned repo but not present in the freshly installed package. This might be a good argument for switching to tox.
Relevant literature:
NOTE: any decisions here should be reflected on the https://github.com/PrefectHQ/server side, too (cc: @joshmeek)
Currently, flow_runner.run() accepts a return_tasks argument. If provided, those tasks' States are returned as the data of the flow run's State. If the value is None (the default), then the terminal tasks' States are returned.
For consistency and to reduce noise, let's change the default behavior to return exactly what is provided as return_tasks. If it's None, then the flow State's data should just be an empty dict.
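The proposed default could be sketched with a small helper (hypothetical, not Prefect's actual API; `task_states` stands in for the runner's map of task to final State):

```python
def resolve_return_data(task_states, return_tasks=None):
    """Return exactly the requested tasks' states as the flow State's data;
    when return_tasks is None, return an empty dict (the proposed default)."""
    if return_tasks is None:
        return {}
    return {task: task_states[task] for task in return_tasks}
```

This makes the flow run's data a pure function of what the caller asked for, with no terminal-task special case.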
Via @cicdw, the following code fails non-deterministically:
from prefect import *

@task()
def add(x, y):
    return x + y

with Flow('test') as f:
    y = Parameter('y')
    res = add(1, y)

f.run()
f.run().data
s = f.run()
s.all_states
s.all_states()
s.state
s.data

s = f.run(parameters=dict(y=56))
s
s.data
s.data[res.task]
s.data[res.task].data
Context updates for parameters appear to be failing in flow_runner.py, such that the Parameter task is unable to retrieve parameters from context and consequently fails.
This test utility has some magic in it that can prevent us from properly testing certain features (e.g., return tasks) and should be avoided in our own unit tests. However, this is a useful utility that will likely be a user-facing feature and thus we should actually keep it and unit test it.
Flows can be used as context managers:
with prefect.Flow('my flow') as f:
    ...
When this happens, the flow adds itself to prefect.context under the flow key. It stores a reference to the flow key's previous value so that it can restore that value when the context manager exits.
However, if the previous value was None (or if there was no flow key at all), then the flow restores flow=None to the previous context. Later, this creates weird behavior when other Prefect objects check to see if a flow is in context.
This issue was previously masked by the bug described in #16
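One standard fix is a sentinel object that distinguishes "key was absent" from "value was None", sketched here against a plain dict standing in for prefect.context (the toy Flow class is illustrative, not Prefect's implementation):

```python
context = {}  # toy stand-in for prefect.context

_MISSING = object()  # sentinel: "key absent" is not the same as "value None"

class Flow:
    """Sketch of a context-manager flow that restores context correctly."""

    def __init__(self, name):
        self.name = name
        self._prev_flow = _MISSING

    def __enter__(self):
        self._prev_flow = context.get("flow", _MISSING)
        context["flow"] = self
        return self

    def __exit__(self, *exc_info):
        if self._prev_flow is _MISSING:
            # There was no prior flow: remove the key entirely
            # rather than restoring flow=None.
            context.pop("flow", None)
        else:
            context["flow"] = self._prev_flow
        self._prev_flow = _MISSING
        return False
```

After the outermost with-block exits, `"flow" in context` is False, so downstream checks for a flow in context behave sensibly; nested flows still restore the outer flow correctly.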
Flow().run() should always return a State object with a complete data dictionary attribute, even when the Flow fails early on. In this case, every Task should be represented with a FAILED State.
Example of empty dictionary:
from prefect import Parameter, Flow, task

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    y = Parameter('y')
    res = add(1, y)

f.run().data  # returns None, should return {<Task: add>: State(FAILED)}
Example of incomplete dictionary:
from prefect import Flow, task

@task
def err(x, y):
    raise ValueError("no sir")

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    e = err(1, 2)
    a = add(e, 5)

f.run().data  # returns {<Task: add>: State(FAILED)}, should return {<Task: err>: State(FAILED), <Task: add>: State(FAILED)}
Once #44 / #59 are merged, trigger functions should raise TriggerFailed instead of Failed.
Moreover, triggers should probably never just return False -- originally that was done to allow triggers to indicate that they didn't have enough information to determine whether they would run (for example, an all_success trigger that was passed 2 Success states and 1 Pending), but now that all states must be finished before being passed to a trigger, triggers should always be able to either pass or raise a signal.
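A minimal sketch of a pass-or-raise trigger under this model, with states represented as plain strings for illustration (the signal class and trigger signature here are assumptions, not Prefect's actual definitions):

```python
class TriggerFailed(Exception):
    """Signal raised when a task's trigger is not satisfied (sketch)."""

def all_success(upstream_states):
    """Sketch of an all_success trigger: every upstream state is assumed
    finished, so the trigger either passes or raises -- it never
    returns False."""
    if not all(state == "SUCCESS" for state in upstream_states):
        raise TriggerFailed("not all upstream tasks succeeded")
    return True
```

Because all upstream states are guaranteed finished before the trigger runs, there is no remaining "can't decide yet" case that would have justified returning False.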
TaskResults are pointers to (Task, Flow) tuples; in other words, they refer to the result of a specific task in a specific flow.
If we make the current Flow explicit, then we can remove this class entirely because there will never be ambiguity about the Flow in question. For example:
with Flow() as flow:
    x = Task('x')
    y = Task('y')
    z = AddTask()(x=x, y=y)

assert x in flow.tasks
assert y in flow.tasks
assert z in flow.tasks
We can also create a global default Flow which would allow users to continue to use the current syntax. This might be helpful for quick tests. In order to submit to Prefect server, however, users would have to create a new flow object (with a proper name).
x = Task('x')
y = Task('y')
z = AddTask()(x=x, y=y)
assert x in get_global_flow().tasks
assert y in get_global_flow().tasks
assert z in get_global_flow().tasks
new_flow = get_global_flow().copy_as(name='my flow')
Currently, the version is tracked in both setup.py and __init__.py. I would recommend moving to a single location to avoid possible headaches in the future.
Possible solutions from Python docs: https://packaging.python.org/guides/single-sourcing-package-version/
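One of the options from that guide is to keep __version__ only in __init__.py and have setup.py read it back out with a regex. A sketch of that extraction (the `get_version` helper name is illustrative):

```python
import re

def get_version(init_py_text):
    """Extract __version__ from the text of an __init__.py, as in the
    regex-based approach described in the packaging guide linked above."""
    match = re.search(
        r'^__version__\s*=\s*[\'"]([^\'"]+)[\'"]', init_py_text, re.MULTILINE
    )
    if match is None:
        raise RuntimeError("Unable to find a __version__ string")
    return match.group(1)
```

setup.py would call this on the contents of `src/prefect/__init__.py` rather than importing the package, which avoids import-time side effects during installation.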
It's difficult to figure out which tasks failed when a flow fails, so we should add a flag to automatically return any failed tasks when the flow_runner runs. It could default to True.
Rather than pass context variables explicitly as kwargs, supply them as variables that are set via a context manager:

from prefect.context import as_of_date

class MyTask:
    def run(self):
        """Return True if this task's as-of-date is today."""
        if as_of_date.date() == datetime.now().date():
            return True
        else:
            return False

# context manager sets the values of variables like as_of_date
with prefect.context.set_context(context):
    my_task.run()
dask.bag.read_text can load files from local storage, HDFS, S3, and GCS. Since it is already a required dependency (because of distributed), it's a good choice for remote Flow storage and configuration.
Whenever required parameters are not provided, an error is raised that then disappears into the void.
We should capture the error and pass it to the FAILED state for the whole flow. Depending on how #15 is implemented, instead of putting the error message into the data attribute, we might need to propagate it as the error message for every task's failed state.
This lambda is prematurely optimizing for the case where large data results are returned and we don't want to resolve them unnecessarily.
https://gitlab.com/prefect/prefect/blob/master/prefect/engine/flow_runner.py#L120
User configs are no longer eagerly generated (see #65), so we should expose this functionality as a CLI command.
Per discussion here.
Currently tasks have a dummy "checkpoint" attribute which needs to be handled
Currently, the State of a flow run is determined exclusively by that Flow's terminal tasks, using a waterfall of checks (note that SKIPs, though a valid "finished" state, should be handled differently). However, we can imagine situations where this logic breaks down:
I propose allowing users to specify the tasks that should determine flow state. By default, it's the terminal tasks, but in the above example a user might have said that the X task should be used to determine flow state. No matter which tasks are chosen, the same waterfall described at the top of this issue would apply to those tasks. This would be a property of the Flow object, not a runtime configuration, and would be provided when the flow itself is created:

my_flow = prefect.Flow(name='demo flow', run_state_tasks=[X_task])

I think run_state_tasks is a bad name for this option -- open to suggestions.
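The idea can be sketched as a function that applies a waterfall only to the chosen reference tasks. The exact waterfall isn't spelled out here, so the ordering below (any FAILED wins, then all SUCCESS/SKIPPED, else PENDING) is an assumption for illustration, as are the function name and string states:

```python
def determine_flow_state(task_states, reference_tasks):
    """Apply a (simplified, assumed) waterfall to only the chosen
    reference tasks' states, instead of to all terminal tasks."""
    states = [task_states[task] for task in reference_tasks]
    if any(state == "FAILED" for state in states):
        return "FAILED"
    if all(state in ("SUCCESS", "SKIPPED") for state in states):
        return "SUCCESS"
    return "PENDING"
```

With `run_state_tasks=[X_task]`, a failed cleanup task downstream of X would no longer drag the flow state to FAILED.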
Currently, *args is explicitly not supported because edges that pass data are identified by string keys that become the **kwargs for the task's run() method. One way to support *args would be to automatically create edges with integer keys. When executing a flow, we would put edges with string keys into **kwargs and edges with integer keys into *args.
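A sketch of the runtime half of that proposal, assuming the runner has a mapping from edge keys to upstream results (the `split_edges` and `run_task` helpers are hypothetical, not Prefect's API):

```python
def split_edges(edge_results):
    """Split a task's incoming edge results into (*args, **kwargs):
    integer keys become positional args, ordered by index;
    string keys become keyword args."""
    args = [
        edge_results[key]
        for key in sorted(k for k in edge_results if isinstance(k, int))
    ]
    kwargs = {k: v for k, v in edge_results.items() if isinstance(k, str)}
    return args, kwargs

def run_task(fn, edge_results):
    """Call a task's run function with the reconstructed call signature."""
    args, kwargs = split_edges(edge_results)
    return fn(*args, **kwargs)
```

Sorting by the integer index preserves the original positional order regardless of how the edge dictionary is stored.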
When a task fails, and the following task also fails because it has an all_successful trigger, it would be helpful to know that the second task didn't fail because of an error or execution problem, but because the upstream task failed.
We've discussed creating an UPSTREAM_FAILED state to represent this behavior, but it could create odd situations: suppose a task has an all_failed trigger and the upstream task succeeds. Should the resulting task really say UPSTREAM_FAILED?
I think we need a TRIGGER_FAILED state, which might act functionally like a SKIP but not be treated as a SUCCESS.
The start_tasks kwarg appears to not be working:

from prefect import *

GLOBAL_DICT = {'key': 0}

class PrintTask(Task):
    def run(self):
        GLOBAL_DICT['key'] += 1
        return 1

class ReturnTask(Task):
    was_called = False

    def run(self, x):
        if self.was_called is False:
            self.was_called = True
            raise ValueError("error")
        return x

with Flow() as f:
    t1 = PrintTask()
    t2 = ReturnTask(max_retries=1)
    result = t2(t1())

state = f.run(return_tasks=[t2])
state = f.run(return_tasks=[t2], start_tasks=[t2])
## GLOBAL_DICT['key'] == 2
Integrate https://github.com/agronholm/apscheduler as a Dask extension
For local/offline/simple testing, a PickleEnvironment would make flow serialization much simpler. Functionally, it would simply serialize a flow using cloudpickle and deserialize it on demand.
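A minimal sketch of what that environment might look like. The class shape is an assumption, and stdlib pickle is used here as a stand-in so the example is self-contained; the real implementation would use cloudpickle, which also handles lambdas and interactively defined functions:

```python
import pickle

class PickleEnvironment:
    """Sketch: serialize a flow to bytes and restore it on demand.
    (stdlib pickle stands in for cloudpickle in this example)"""

    def serialize(self, flow) -> bytes:
        # Turn the flow into an opaque byte payload for storage/transport.
        return pickle.dumps(flow)

    def deserialize(self, payload: bytes):
        # Reconstruct the flow from the stored payload.
        return pickle.loads(payload)
```

A round trip (serialize then deserialize) should yield an equivalent flow, which is all local testing needs.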
We need a process for generating markdown files from the docstrings in order to build documentation for the prefect library. The markdown files will then be loaded into the static site with vuepress.
Some options:
https://github.com/NiklasRosenstein/pydoc-markdown
https://github.com/coldfix/doc2md
Instead of the current State class, which has a string state attribute and an Any data attribute, as well as a bunch of is_finished() methods that check the state string against a list of valid strings, consider building a hierarchy of State classes:
# Base class
class State:
    # class attribute for example; set in __init__
    data = None

# -------------------------------------------
# Finished classes - Success, Failed, Skipped
class Finished(State):
    pass

class Success(Finished):
    pass

class Failed(Finished):
    # class attribute for example; set in __init__
    message = None

class Skipped(Finished):
    pass

# -------------------------------------------
# Pending classes - Retry and Scheduled (and Pending itself)
class Pending(State):
    pass

class Retry(Pending):
    # class attribute for example; set in __init__
    retry_time = None

class Scheduled(Pending):
    # class attribute for example; set in __init__
    scheduled_time = None
Then checking and working with states is easier than the current system of checking the string attribute and hoping the data attribute matches an expected schema:
s = Success(100)
f = Failed('Division by zero error')
assert isinstance(s, Finished)
assert isinstance(f, Finished)

r = Retry(datetime(2018, 12, 31))
assert isinstance(r, Pending)
assert isinstance(r.retry_time, datetime)
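A runnable version of the proposed hierarchy, filling in the `__init__` methods that the class-attribute comments gesture at (the constructor signatures below are one plausible choice, not a settled design):

```python
from datetime import datetime

class State:
    def __init__(self, data=None):
        self.data = data

class Finished(State):
    pass

class Success(Finished):
    pass

class Failed(Finished):
    def __init__(self, message=None, data=None):
        # first positional arg is the failure message, per the example usage
        super().__init__(data)
        self.message = message

class Skipped(Finished):
    pass

class Pending(State):
    pass

class Retry(Pending):
    def __init__(self, retry_time=None, data=None):
        super().__init__(data)
        self.retry_time = retry_time

class Scheduled(Pending):
    def __init__(self, scheduled_time=None, data=None):
        super().__init__(data)
        self.scheduled_time = scheduled_time
```

With these constructors, the assertions in the example above pass as written, and `isinstance` replaces every string comparison.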
Per discussion w/ @jlowin, we should consider implementing a new signal for manual_only triggers which prevents a task from running while also creating a Pending() state with cached_inputs. The current implementation within task_runner feels brittle.