prefecthq / prefect

14.6K 160.0 1.5K 143.77 MB

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines

Home Page: https://prefect.io

License: Apache License 2.0

Python 98.75% Dockerfile 0.04% Shell 0.08% JavaScript 0.01% HTML 0.01% Vue 0.80% TypeScript 0.25% CSS 0.01% Mako 0.01% Brainfuck 0.01% Jinja 0.06%
python workflow data-engineering data-science workflow-engine prefect infrastructure ml-ops data-ops automation

prefect's People

Contributors

abrookins, ahuang11, anna-geller, anticorrelator, billpalombi, bunchesofdonald, chrisguidry, cicdw, dependabot[bot], desertaxle, discdiver, github-actions[bot], jakekaplan, jawnsy, jlowin, marichka-offen, peytonrunyan, pleek91, rpeden, serinamarie, stackoverfloweth, thatgalnatalie, tpdorsey, urimandujano, willraphaelson, zangell44, zanieb, zhen0, znicholasbrown, zzstoatzz

prefect's Issues

Add "raise_on_fail" context manager

Task runners and flow runners trap all errors, which can make debugging difficult.

We could create a context manager that raises errors rather than trapping them. Given a Flow object, flow, that contains a task that raises an error:

flow.run() # returns State(FAILED)

with raise_on_fail():
    flow.run() # actually raises an error at the appropriate place

Add system flows

  • Scheduler - schedules Flows
  • FlowLoader - Loads Flows from .py files
  • Zombie killer - Kills runs that have no heartbeat
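The zombie killer boils down to comparing each run's most recent heartbeat with a timeout. A sketch, assuming heartbeats are available as a mapping from a hypothetical run ID to a datetime:

```python
from datetime import datetime, timedelta


def find_zombie_runs(last_heartbeats, now, timeout):
    """Return the IDs of runs whose most recent heartbeat is older than `timeout`.

    `last_heartbeats` maps a run ID to the datetime of its latest heartbeat.
    """
    return sorted(
        run_id
        for run_id, heartbeat in last_heartbeats.items()
        if now - heartbeat > timeout
    )


now = datetime(2018, 6, 1, 12, 0)
heartbeats = {
    "run-a": now - timedelta(seconds=30),  # recently alive
    "run-b": now - timedelta(minutes=10),  # silent for too long
}
zombies = find_zombie_runs(heartbeats, now, timeout=timedelta(minutes=2))
```

Here zombies is ["run-b"]; a real system flow would then mark those runs as failed.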

Use special context keys for automatic context values

(inspired by work in #18)

Currently, when a flow adds itself to a context, it does so under the flow key (in other words, it becomes available at prefect.context.flow). This can overwrite a user-supplied context value that happens to use the same, fairly common, key. Instead, any automatically-set context value (including flows, as described here, and all settings used by FlowRunner and TaskRunners, like parameters) should be put under a reserved key, perhaps __prefect__, so that it won't shadow user variables.
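A sketch of the proposed namespacing on a plain dict, with hypothetical helper names set_automatic_value and get_automatic_value:

```python
PREFECT_KEY = "__prefect__"  # reserved namespace proposed in the issue


def set_automatic_value(context, key, value):
    """Store a Prefect-managed value under the reserved namespace."""
    context.setdefault(PREFECT_KEY, {})[key] = value


def get_automatic_value(context, key, default=None):
    """Read a Prefect-managed value without touching user-supplied keys."""
    return context.get(PREFECT_KEY, {}).get(key, default)


context = {"flow": "a user-supplied value"}
set_automatic_value(context, "flow", "<Flow: demo>")
# The user's "flow" key is untouched; Prefect's flow lives under "__prefect__".
```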

Implement `TaskResult.run()` method (efficiently)

Currently, to retrieve the return value from a TaskResult, you must:

from prefect import task

@task
def fun(x, y):
    return x + y

tt = fun(1, 2)
tt.flow.run()[tt.task] # returns 3

which feels like it violates the Law of Demeter and should be simpler.

In its place, we should have something like

tt.run() # returns 3

which should just execute the minimal path necessary to retrieve the result.

cc: @jlowin
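One way to execute only the minimal path is to walk the graph upstream from the requested task and run just that subgraph in topological order. This sketch models a flow as two hypothetical dicts (task name to callable, task name to upstream task names) rather than Prefect's Flow object, and uses the standard library's graphlib (Python 3.9+):

```python
from graphlib import TopologicalSorter


def upstream_closure(edges, target):
    """Return `target` plus every task it transitively depends on."""
    needed, stack = set(), [target]
    while stack:
        task = stack.pop()
        if task not in needed:
            needed.add(task)
            stack.extend(edges.get(task, ()))
    return needed


def run_minimal(tasks, edges, target):
    """Run only the tasks needed to compute `target` and return its result."""
    needed = upstream_closure(edges, target)
    graph = {task: set(edges.get(task, ())) for task in needed}
    results = {}
    # static_order() yields every task after all of its upstream tasks.
    for task in TopologicalSorter(graph).static_order():
        args = [results[upstream] for upstream in edges.get(task, ())]
        results[task] = tasks[task](*args)
    return results[target]


called = []


def tracked(name, fn):
    """Wrap `fn` so each call is recorded in `called`."""
    def wrapper(*args):
        called.append(name)
        return fn(*args)
    return wrapper


tasks = {
    "x": tracked("x", lambda: 1),
    "y": tracked("y", lambda: 2),
    "fun": tracked("fun", lambda x, y: x + y),
    "unrelated": tracked("unrelated", lambda: "not needed"),
}
edges = {"fun": ["x", "y"]}
result = run_minimal(tasks, edges, "fun")
```

Here result is 3 and the "unrelated" task is never called.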

Whenever a flow context is opened, the operations should be atomic

Example:
When a flow context that defines five tasks is opened and one of those tasks fails to be added (e.g. because the task has already been assigned in the flow), the flow should be rolled back rather than left built up to the point of failure.

node_1 = a()
node_2 = b(node_1)
node_3 = c(node_2)
node_4 = node_1(node_3)
node_5 = d(node_4)

The above code currently produces the flow a -> b -> c; instead, it should not produce a valid flow at all.
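A sketch of the rollback behavior, assuming the flow snapshots its tasks and edges in __enter__ and restores them in __exit__ if the block raises. This toy Flow class and its add_task/add_edge methods are hypothetical; re-adding "a" plays the role of node_1(node_3) in the example above:

```python
class Flow:
    """Toy flow whose `with` block is all-or-nothing (a sketch, not Prefect's class)."""

    def __init__(self, name):
        self.name = name
        self.tasks = []
        self.edges = []

    def add_task(self, task):
        if task in self.tasks:
            raise ValueError(f"Task {task!r} is already in flow {self.name!r}")
        self.tasks.append(task)

    def add_edge(self, upstream, downstream):
        self.add_task(downstream)
        self.edges.append((upstream, downstream))

    def __enter__(self):
        # Snapshot the graph so a failure inside the block can be undone.
        self._snapshot = (list(self.tasks), list(self.edges))
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Roll back instead of keeping a partially built flow.
            self.tasks, self.edges = self._snapshot
        return False  # never swallow the exception


flow = Flow("atomic")
try:
    with flow:
        flow.add_task("a")
        flow.add_edge("a", "b")
        flow.add_edge("b", "c")
        flow.add_edge("c", "a")  # "a" is already in the flow, so this raises
except ValueError:
    pass
```

After the failed block, flow.tasks and flow.edges are both empty again.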

Flow methods should raise a more informative error if a task is not in the flow

flow.edges_to(task) and flow.edges_from(task) (and methods that depend on them, like upstream_tasks()) raise a KeyError if the task in question is not in the Flow. The error only says that the task can't be found in an edge dictionary; a more informative error would say that the task is not in the current Flow.
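A sketch of the more informative error, using a hypothetical TaskNotInFlowError on a toy Flow class:

```python
class TaskNotInFlowError(KeyError):
    """Raised when a Flow method receives a task that is not part of the flow."""


class Flow:
    def __init__(self, name):
        self.name = name
        self.tasks = set()
        self._upstream = {}  # downstream task -> set of upstream tasks

    def add_edge(self, upstream, downstream):
        self.tasks.update((upstream, downstream))
        self._upstream.setdefault(downstream, set()).add(upstream)

    def _require_task(self, task):
        # Check membership first so the error names the Flow, not an edge dict.
        if task not in self.tasks:
            raise TaskNotInFlowError(f"Task {task!r} is not in Flow {self.name!r}")

    def edges_to(self, task):
        self._require_task(task)
        return {(up, task) for up in self._upstream.get(task, set())}

    def upstream_tasks(self, task):
        return {up for up, _ in self.edges_to(task)}
```

Subclassing KeyError is a design choice: existing except KeyError handlers keep working while the message improves.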

Prefect packaging clean-up

Based on recent issues related to our config.toml file, we should:

  • review our package directory structure (possibly moving everything into a src/ directory)
  • review our MANIFEST.in file
  • review how reliable our current testing setup is; specifically, we had tests passing on the core side that were failing on the server side, because config.toml was present in the git-cloned repo but not in the freshly installed package. This might be a good argument for switching to tox.

Relevant literature:

NOTE: any decisions here should be reflected on the https://github.com/PrefectHQ/server side, too (cc: @joshmeek)
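One way to catch the config.toml problem is a test that inspects the installed package rather than the git checkout, for example when run inside a tox environment. A sketch using importlib.resources (Python 3.9+); the helper name is hypothetical, and it assumes config.toml sits at the top level of the prefect package:

```python
from importlib.resources import files


def package_has_data_file(package, filename):
    """Return True if `filename` ships inside the importable `package`."""
    # files() resolves the package that is actually imported, so in an
    # isolated install this reflects what setup.py / MANIFEST.in packaged.
    return files(package).joinpath(filename).is_file()
```

In the test suite this would read assert package_has_data_file("prefect", "config.toml"). It always passes from a git checkout, so it is only meaningful against a freshly installed package.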

More strictly enforce what information is returned from a flow run

Currently, flow_runner.run() accepts a return_tasks argument. If provided, those tasks' States are returned as the data of the flow run's State. If the value is None (the default), then the terminal tasks' States are returned.

For consistency and to reduce noise, let's change the default behavior to return exactly what is provided as return_tasks. If it is None, the flow State's data should just be an empty dict.
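The proposed default can be sketched as a small function over a hypothetical mapping of tasks to their final States (plain strings here):

```python
def flow_state_data(task_states, return_tasks=None):
    """Proposed default: return exactly the requested task States.

    `task_states` maps each task to its final State. With no `return_tasks`,
    the data is an empty dict rather than the terminal tasks' States.
    """
    if return_tasks is None:
        return {}
    return {task: task_states[task] for task in return_tasks}
```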

Parameter setting fails non-deterministically

Via @cicdw, the following code fails non-deterministically:

from prefect import *

@task()
def add(x, y):
    return x + y

with Flow('test') as f:
    y = Parameter('y')
    res = add(1, y)

# interactive session; the failure does not occur on every run
f.run()
f.run().data
s = f.run()
s.all_states
s.all_states()
s.state
s.data
s = f.run(parameters=dict(y=56))
s
s.data
s.data[res.task]
s.data[res.task].data

Context updates for parameters appear to be failing in flow_runner.py, such that the Parameter task is unable to retrieve parameters from context and consequently fails.

Remove flow runner test utility from our unit tests

This test utility has some magic in it that can prevent us from properly testing certain features (e.g., return tasks) and should be avoided in our own unit tests. However, this is a useful utility that will likely be a user-facing feature and thus we should actually keep it and unit test it.

Flows don't properly restore previous context

Flows can be used as context managers:

with prefect.Flow('my flow') as f:
    ...

When this happens, the flow adds itself to prefect.context under the flow key. It stores a reference to the flow key's previous value so that it can restore that value when the context manager exits.

However, if there was no flow key at all (or its previous value was None), the flow restores flow=None to the context rather than leaving the key absent. Later, this causes unexpected behavior when other Prefect objects check whether a flow is in context.

This issue was previously masked by the bug described in #16.
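A sketch of the fix on a plain dict: a private sentinel distinguishes "no flow key" from "flow key set to None", so the key is deleted on exit if it was not there before. set_context_key is a hypothetical helper, not Prefect's context API:

```python
from contextlib import contextmanager

_MISSING = object()  # private sentinel meaning "the key was absent"


@contextmanager
def set_context_key(context, key, value):
    """Temporarily set context[key], then restore the exact previous situation."""
    previous = context.get(key, _MISSING)
    context[key] = value
    try:
        yield context
    finally:
        if previous is _MISSING:
            context.pop(key, None)  # the key did not exist before, so remove it
        else:
            context[key] = previous  # restore the old value, even if it was None
```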

Flow should always return data dictionary, even when everything fails

Flow().run() should always return a State object with a complete data dictionary attribute, even when the Flow fails early on. In this case, every Task should be represented with a FAILED State.

Example of empty dictionary:

from prefect import Parameter, Flow, task

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    y = Parameter('y')
    res = add(1, y)

f.run().data # returns None, should return {<Task: add>: State(FAILED)}

Example of incomplete dictionary:

from prefect import Flow, task

@task
def err(x, y):
    raise ValueError("no sir")

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    e = err(1, 2)
    a = add(e, 5)

f.run().data # returns {<Task: add>: State(FAILED)}, should return {<Task: err>: State(FAILED), <Task: add>: State(FAILED)}
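A sketch of the proposed guarantee, assuming the runner knows every task in the flow and fills in any task without a recorded state. The helper complete_flow_data is hypothetical, and state strings stand in for State objects:

```python
def complete_flow_data(all_tasks, task_states, missing_state="FAILED"):
    """Return a state for every task in the flow.

    Tasks that never produced a state (for example because the run failed
    early) are reported with `missing_state` instead of being dropped.
    """
    return {task: task_states.get(task, missing_state) for task in all_tasks}
```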

Update trigger functions to use `TriggerFailed`

Once #44 / #59 are merged, trigger functions should raise TriggerFailed instead of Failed.

Moreover, triggers should probably never simply return False. Originally, returning False let a trigger indicate that it didn't have enough information to decide whether the task should run (for example, an all_success trigger that was passed 2 Success states and 1 Pending). Now that all states must be finished before being passed to a trigger, a trigger should always be able to either pass or raise a signal.
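A sketch of a trigger under the proposed rules, with a hypothetical TriggerFailed exception and plain strings standing in for States. It either passes or raises, and never returns False:

```python
class TriggerFailed(Exception):
    """Signal raised when a task's trigger condition is not met (illustrative)."""


FINISHED = {"SUCCESS", "FAILED", "SKIPPED"}


def all_successful(upstream_states):
    """Pass if every upstream state is SUCCESS; otherwise raise TriggerFailed."""
    unfinished = [s for s in upstream_states if s not in FINISHED]
    if unfinished:
        # By contract, triggers only ever see finished states.
        raise ValueError(f"Triggers expect finished states, got {unfinished}")
    if all(s == "SUCCESS" for s in upstream_states):
        return True
    raise TriggerFailed("Trigger was 'all_successful' but some upstream tasks did not succeed")
```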

Remove TaskResult class

TaskResults are pointers to (Task, Flow) tuples; in other words, they refer to the result of a specific task in a specific flow.

If we make the current Flow explicit, then we can remove this class entirely because there will never be ambiguity about the Flow in question. For example:

with Flow() as flow:
    x = Task('x')
    y = Task('y')
    z = AddTask()(x=x, y=y)

assert x in flow.tasks
assert y in flow.tasks
assert z in flow.tasks

We can also create a global default Flow which would allow users to continue to use the current syntax. This might be helpful for quick tests. In order to submit to Prefect server, however, users would have to create a new flow object (with a proper name).

x = Task('x')
y = Task('y')
z = AddTask()(x=x, y=y)

assert x in get_global_flow().tasks
assert y in get_global_flow().tasks
assert z in get_global_flow().tasks

new_flow = get_global_flow().copy_as(name='my flow')
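A sketch of how tasks could register with an explicit flow or fall back to a global default. Flow, Task, current_flow, and the copy_as body here are simplified stand-ins based on the names in this issue, not Prefect's classes:

```python
class Flow:
    """Toy flow that collects every Task created while it is the active flow."""

    _active = []  # stack of flows entered with `with`

    def __init__(self, name=None):
        self.name = name
        self.tasks = set()

    def __enter__(self):
        Flow._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        Flow._active.pop()
        return False

    def copy_as(self, name):
        """Return a named copy of this flow, e.g. to submit the global flow."""
        new_flow = Flow(name=name)
        new_flow.tasks = set(self.tasks)
        return new_flow


_GLOBAL_FLOW = Flow(name=None)  # unnamed default flow for quick tests


def get_global_flow():
    return _GLOBAL_FLOW


def current_flow():
    """The innermost active flow, falling back to the global default."""
    return Flow._active[-1] if Flow._active else _GLOBAL_FLOW


class Task:
    def __init__(self, name):
        self.name = name
        current_flow().tasks.add(self)  # register with the explicit or global flow


with Flow("explicit") as flow:
    x = Task("x")

y = Task("y")  # no active flow, so it lands in the global flow
new_flow = get_global_flow().copy_as(name="my flow")
```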

Supply context via local variables / context manager

Rather than passing context variables explicitly as kwargs, supply them as variables that are set via a context manager.

Task Definition (user code)

from datetime import datetime

from prefect.context import as_of_date

class MyTask:
    def run(self):
        """ return True if this task's as-of-date is today """
        return as_of_date.date() == datetime.now().date()

TaskRunner (Prefect code)

# context manager sets the values of variables like as_of_date
with prefect.context.set_context(context):
    my_task.run()
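Note that from prefect.context import as_of_date binds whatever value exists at import time, so the variable must be something read at call time for the runner's context manager to have any effect. A sketch using the standard library's contextvars; set_context and the as_of_date variable here are assumptions, not Prefect's context module:

```python
import contextvars
from contextlib import contextmanager
from datetime import datetime

# Hypothetical context variable; Prefect's real context API may differ.
as_of_date = contextvars.ContextVar("as_of_date")

_VARIABLES = {"as_of_date": as_of_date}


@contextmanager
def set_context(**values):
    """Set the named context variables for the duration of the block."""
    tokens = []
    try:
        for name, value in values.items():
            var = _VARIABLES[name]
            tokens.append((var, var.set(value)))
        yield
    finally:
        # Undo in reverse order so nested values unwind correctly.
        for var, token in reversed(tokens):
            var.reset(token)


class MyTask:
    def run(self):
        """Return True if this task's as-of-date is today."""
        # .get() reads the value at call time, so the runner's setting is visible.
        return as_of_date.get().date() == datetime.now().date()


with set_context(as_of_date=datetime.now()):
    today = MyTask().run()
with set_context(as_of_date=datetime(2018, 1, 1)):
    past = MyTask().run()
```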

Use dask.bag to easily access remote/local storage

dask.bag.read_text can load files from local storage, HDFS, S3, and GCS. Since dask is already a required dependency (because of distributed), it's a good choice for remote Flow storage and configuration.

Allow Flow States to be determined by tasks other than the terminal tasks

Currently, the State of a flow run is determined exclusively by that Flow's terminal tasks using this waterfall:

  • If any terminal task fails, the flow fails
  • If all terminal tasks succeed, the flow succeeds
  • If all terminal tasks finish, the flow succeeds (this is a separate waterfall step in case we decide that SKIPs, a valid "finished" state, should be handled differently)
  • Otherwise the flow remains pending

However, we can imagine situations where this logic breaks down:

  • A flow consists of a task that does something (X) and a terminal clean up task that only runs if X fails.
  • If X succeeds, the clean up task will be skipped and the flow will be marked as successful (this is probably the correct behavior)
  • If X fails, the clean up task will run and be successful -- the flow will be marked as successful. This might be the wrong behavior; here are two arguments why:
    • The flow failed its intended purpose (which was to run X), and should be considered an overall failure.
    • The workflow ran properly -- catching an error and cleaning up -- and should be considered an overall success.

I propose allowing users to specify the tasks that should determine flow state. By default, it's the terminal tasks, but in the above example a user might have said that the X task should be used to determine flow state. No matter what tasks are chosen, the same waterfall described at the top of this issue would apply to those tasks. This would be a property of the Flow object, not a runtime configuration, and would be provided when the flow itself was created:

my_flow = prefect.Flow(name='demo flow', run_state_tasks=[X_task])

I think run_state_tasks is a bad name for this option -- open to suggestions.
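The waterfall itself does not depend on which tasks feed it, so making the reference tasks configurable is a small change. A sketch with plain state strings and a hypothetical flow_state function; the X/cleanup example above gives different results depending on the chosen tasks:

```python
def flow_state(task_states, reference_tasks):
    """Apply the waterfall to the states of the chosen reference tasks.

    `task_states` maps task name -> state string; `reference_tasks` would
    default to the terminal tasks.
    """
    states = [task_states[t] for t in reference_tasks]
    if any(s == "FAILED" for s in states):
        return "FAILED"
    if all(s == "SUCCESS" for s in states):
        return "SUCCESS"
    if all(s in {"SUCCESS", "FAILED", "SKIPPED"} for s in states):
        return "SUCCESS"  # e.g. SKIPPED counts as finished
    return "PENDING"


states = {"X": "FAILED", "cleanup": "SUCCESS"}
by_terminal = flow_state(states, ["cleanup"])  # current behavior
by_x = flow_state(states, ["X"])  # user chose X as the reference task
```

Here by_terminal is "SUCCESS" and by_x is "FAILED".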

Support *args for task run() methods

Currently, *args is explicitly not supported because edges that pass data are identified by string keys that become the **kwargs for the task's run() method. One way to support *args would be to automatically create edges with integer keys. When executing a flow, we would put edges with string keys into **kwargs and edges with integer keys into *args.
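A sketch of the proposed key convention, with a hypothetical split_edge_values helper: integer keys become positional arguments in index order, and string keys become keyword arguments:

```python
def split_edge_values(edge_values):
    """Split upstream results keyed by edge key into (args, kwargs).

    Integer keys are positional indexes; string keys are keyword names.
    """
    positions = sorted(k for k in edge_values if isinstance(k, int))
    if positions != list(range(len(positions))):
        raise ValueError(f"Positional edge keys must be 0..n-1, got {positions}")
    args = [edge_values[i] for i in positions]
    kwargs = {k: v for k, v in edge_values.items() if isinstance(k, str)}
    return args, kwargs


def add_all(*numbers, scale=1):
    return scale * sum(numbers)


args, kwargs = split_edge_values({1: 20, 0: 10, "scale": 2})
result = add_all(*args, **kwargs)  # (10 + 20) * 2
```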

New State type: TRIGGER_FAILED

When a task fails, and the following task also fails because it has an all_successful trigger, it would be helpful to know that the second task didn't fail because of an error or execution problem, but that it failed because the upstream task failed.

We've discussed creating an UPSTREAM_FAILED state to represent this behavior, but it could create odd situations: suppose a task has an all_failed trigger and the upstream task succeeds. Should the resulting task really say UPSTREAM_FAILED?

I think we need a TRIGGER_FAILED which might act functionally like a SKIP but not be treated as a SUCCESS.

`start_tasks` kwarg doesn't work

The start_tasks kwarg does not appear to work:

from prefect import *

GLOBAL_DICT = {'key': 0}

class PrintTask(Task):
    def run(self):
        GLOBAL_DICT['key'] += 1
        return 1

class ReturnTask(Task):
    was_called = False
    def run(self, x):
        if self.was_called is False:
            self.was_called = True
            raise ValueError("error")
        return x

with Flow() as f:
    t1 = PrintTask()
    t2 = ReturnTask(max_retries=1)
    result = t2(t1())

state = f.run(return_tasks=[t2])
state = f.run(return_tasks=[t2], start_tasks=[t2])

# GLOBAL_DICT['key'] == 2 here: PrintTask ran again even though the second run was given start_tasks=[t2]

Add PickleEnvironment

For local/offline/simple testing, a PickleEnvironment would make flow serialization much simpler.

Functionally, it would simply serialize a flow using cloudpickle, and deserialize it on demand.
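A sketch of such an environment with hypothetical build/load method names. It uses cloudpickle when installed (cloudpickle.dumps and cloudpickle.loads mirror the pickle API) and falls back to the standard pickle module so the example still runs; plain pickle cannot serialize lambdas or nested functions, which is why cloudpickle is the better fit for flows:

```python
try:
    import cloudpickle as serializer  # handles lambdas and interactively defined code
except ImportError:  # fallback so the sketch still runs; stdlib pickle is more limited
    import pickle as serializer


class PickleEnvironment:
    """Hypothetical environment that stores a flow as pickled bytes."""

    def build(self, flow) -> bytes:
        # Serialize the whole flow object in one step.
        return serializer.dumps(flow)

    def load(self, payload: bytes):
        # Deserialize on demand, e.g. right before running the flow.
        return serializer.loads(payload)
```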

Explore transforming State into a class instead of a String + Data attribute

Instead of the current State class that has a string state and Any data attribute, as well as a bunch of is_finished() methods that check the state string against a list of valid strings, consider building a hierarchy of State classes:

# Base class

class State:
    # class attribute for example; set in __init__
    data = None

# -------------------------------------------
# Finished classes - Success, Failed, Skipped
class Finished(State):
    pass

class Success(Finished):
    pass

class Failed(Finished):
    # class attribute for example; set in __init__
    message = None

class Skipped(Finished):
    pass

# -------------------------------------------
# Pending classes - Retry and Scheduled (and Pending itself)
class Pending(State):
    pass

class Retry(Pending):
    # class attribute for example; set in __init__
    retry_time = None

class Scheduled(Pending):
    # class attribute for example; set in __init__
    scheduled_time = None

Then checking and working with states is easier than the current system of checking the string attribute and hoping the data attribute matches an expected schema:

from datetime import datetime

s = Success(100)
f = Failed('Division by zero error')

assert isinstance(s, Finished)
assert isinstance(f, Finished)

r = Retry(datetime(2018, 12, 31))
assert isinstance(r, Pending)
assert isinstance(r.retry_time, datetime)
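The class sketch above leaves the constructors out, so Success(100) and Retry(datetime(...)) would fail as written. A runnable version, assuming each class takes its extra attribute as its first constructor argument (is_finished is a hypothetical helper):

```python
from datetime import datetime


class State:
    def __init__(self, data=None):
        self.data = data


# Finished classes - Success, Failed, Skipped
class Finished(State):
    pass


class Success(Finished):
    pass


class Failed(Finished):
    def __init__(self, message=None, data=None):
        super().__init__(data)
        self.message = message


class Skipped(Finished):
    pass


# Pending classes - Retry and Scheduled (and Pending itself)
class Pending(State):
    pass


class Retry(Pending):
    def __init__(self, retry_time, data=None):
        super().__init__(data)
        self.retry_time = retry_time


class Scheduled(Pending):
    def __init__(self, scheduled_time, data=None):
        super().__init__(data)
        self.scheduled_time = scheduled_time


def is_finished(state):
    # One isinstance check replaces comparing a string against a list.
    return isinstance(state, Finished)
```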
