dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.

Home Page: https://dagster.io

License: Apache License 2.0

Languages: Python 80.62%, TypeScript 18.24%, Jupyter Notebook 0.62%, JavaScript 0.22%, Smarty 0.10%, Dockerfile 0.07%, Mustache 0.06%, Makefile 0.02%, Shell 0.02%, Jinja 0.02%, HTML 0.01%, Mako 0.01%, CSS 0.01%
Topics: data-pipelines, dagster, workflow, data-science, workflow-automation, python, scheduler, data-orchestrator, etl, analytics

Dagster's Introduction

Dagster is a cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.

It is designed for developing and maintaining data assets, such as tables, data sets, machine learning models, and reports.

With Dagster, you declare—as Python functions—the data assets that you want to build. Dagster then helps you run your functions at the right time and keep your assets up-to-date.

Here is an example of a graph of three assets defined in Python:

from dagster import asset
from pandas import DataFrame, read_html, get_dummies
from sklearn.linear_model import LinearRegression

@asset
def country_populations() -> DataFrame:
    df = read_html("https://tinyurl.com/mry64ebh")[0]
    df.columns = ["country", "pop2022", "pop2023", "change", "continent", "region"]
    df["change"] = df["change"].str.rstrip("%").str.replace("−", "-").astype("float")
    return df

@asset
def continent_change_model(country_populations: DataFrame) -> LinearRegression:
    data = country_populations.dropna(subset=["change"])
    return LinearRegression().fit(get_dummies(data[["continent"]]), data["change"])

@asset
def continent_stats(country_populations: DataFrame, continent_change_model: LinearRegression) -> DataFrame:
    result = country_populations.groupby("continent").sum()
    result["pop_change_factor"] = continent_change_model.coef_
    return result

The graph loaded into Dagster's web UI:

An example asset graph as rendered in the Dagster UI

Dagster is built to be used at every stage of the data development lifecycle - local development, unit tests, integration tests, staging environments, all the way up to production.

Special Event: Introducing Dagster+

Join us on April 17 (12PM ET) for a special event introducing Dagster+. Register for our Dagster+ launch event here.

In addition to the core open-source project, we are excited to announce Dagster+, the next generation of Dagster's cloud offering.

  • Find out how to weave data reliability and quality checks into the execution of your data pipelines.
  • See how diff-based branch deployments will accelerate your development cycle and cut extraneous compute costs.
  • Get a deep understanding of what is driving the cost of your data pipelines, then optimize to get the best cost/performance outcomes.
  • Enjoy the benefits of built-in data cataloging and rich asset-level metadata.

Quick Start

If you're new to Dagster, we recommend reading about its core concepts or learning with the hands-on tutorial.

Dagster is available on PyPI and officially supports Python 3.8 through Python 3.12.

pip install dagster dagster-webserver

This installs two packages:

  • dagster: The core programming model.
  • dagster-webserver: The server that hosts Dagster's web UI for developing and operating Dagster jobs and assets.

Running on a Mac with an Apple silicon chip? Check the install details here.
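After installing, a minimal project can live in a single Python file. Here is a sketch, assuming the current Definitions API and the dagster dev command; the file name hello_dagster.py is arbitrary:

# hello_dagster.py -- minimal sketch; the file name is arbitrary
from dagster import Definitions, asset

@asset
def hello_asset() -> str:
    # A trivial asset whose materialized value is just a string
    return "hello, dagster"

# Definitions bundles the assets so the webserver and CLI can discover them
defs = Definitions(assets=[hello_asset])

Then launch the local web UI against that file:

dagster dev -f hello_dagster.py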

Documentation

You can find the full Dagster documentation here, including the 'getting started' guide.


Key Features


Dagster as a productivity platform

Identify the key assets you need to create using a declarative approach, or focus on running basic tasks. Embrace CI/CD best practices from the get-go: build reusable components, spot data quality issues, and flag bugs early.

Dagster as a robust orchestration engine

Put your pipelines into production with a robust multi-tenant, multi-tool engine that scales technically and organizationally.

Dagster as a unified control plane

Maintain control over your data as the complexity scales. Centralize your metadata in one tool with built-in observability, diagnostics, cataloging, and lineage. Spot any issues and identify performance improvement opportunities.


Master the Modern Data Stack with integrations

Dagster provides a growing library of integrations for today’s most popular data tools. Integrate with the tools you already use, and deploy to your infrastructure.



Community

Connect with thousands of other data practitioners building with Dagster. Share knowledge, get help, and contribute to the open-source project. To see featured material and upcoming events, check out our Dagster Community page.


Contributing

For details on contributing or running the project for development, check out our contributing guide.

License

Dagster is Apache 2.0 licensed.

Dagster's People

Contributors

alangenfeld, asingh16, bengotow, benpankow, clairelin135, cmpadden, dependabot[bot], dpeng817, elementl-devtools, erinkcochran87, fishmanl, freiksenet, gibsondan, hellendag, helloworld, jamiedemaria, jmsanders, johannkm, mgasner, owenkephart, prha, ramshackle-jamathon, rexledesma, salazarm, schrockn, sidkmenon, smackesey, sryza, tacastillo, yuhan

Dagster's Issues

Materialization issues

  • As I understand it, materialization definitions are implicit: execution params specify materializations for solids. Currently drivers don't expose their supported materializations, so we should have a way to extract that metadata from a driver; otherwise it is hard to discover that CSV is a materialization type.
  • I guess materialization should also be part of the config/env yaml file for the pipeline. Another option is to define it separately and then selectively enable it (e.g. with --materialization NAME or --materialization TAG); see the sketch below.
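A rough sketch of what a materialization section in the environment yaml could look like (the key names here are purely illustrative, not an existing schema):

environment:
  materializations:
    - solid: sum_table        # solid whose output to materialize (illustrative)
      type: CSV               # materialization type exposed by the driver (illustrative)
      args:
        path: /tmp/sum_table.csv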

Bug in sql_hello_world example

  File "../../bin/dagster_driver.py", line 9, in <module>
    dagster.cli.dagster_cli(obj={})
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/decorators.py", line 64, in new_func
    return ctx.invoke(f, obj, *args[1:], **kwargs)
  File "/home/freiksenet/Work/elemental/venv/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 108, in list_command
    pipeline_configs = config.create_pipelines()
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 52, in create_pipelines
    pipeline_config.create_pipeline()
  File "/home/freiksenet/Work/elemental/dagster/dagster/cli/__init__.py", line 79, in create_pipeline
    self.pipeline = self.fn()
  File "/home/freiksenet/Work/elemental/dagster/dagster/dagster_examples/sql_hello_world/pipeline.py", line 32, in define_pipeline
    sum_table_solid = dagster_sa.create_sql_solid(
AttributeError: module 'dagster.sqlalchemy_kernel' has no attribute 'create_sql_solid'

Create UX standards for the CLI tool

I propose

dagster <noun> <verb> <target>

e.g.

dagster pipeline graphviz pipeline_name
dagster pipeline execute pipeline_name
dagster pipeline list-all

etc

Rename pipeline function in cli/__init__.py

This causes a whole rash of lint errors because it collides with the pipeline variable name used throughout the file. Python is annoying and doesn't have a namespace for variables versus functions, so we have to deal with this crap all the time. This is compounded by the fact that libraries like click love to automatically name things based on function/arg names, etc. So often you have to work around this with odd code.

Release the repo publicly

It'd be great to be able to

  1. point people to a public-facing repo and
  2. include dagster in requirements.txt files.

I don't know what needs to be done vis a vis organizations, ownership, etc. to make that possible.

Make naming consistent for definitions

We need to do a pass on naming definitions. At minimum, inputs and outputs should consistently be input_defs and output_defs. This is mostly because Python is dumb and has input as a builtin which no one ever uses, so you are stuck with input_, input_def, or similar. Probably all of these things (solids, arguments, contexts) should be suffixed with def by default, but I do still react negatively to that.

Enable optional arguments

This will likely involve introducing ArgumentDefinitions and also a more formal type system (with non-null just like GraphQL)

Driven by the need to make log_level optional for the default context args.
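A rough sketch of what an ArgumentDefinition with optional support and a GraphQL-style non-null wrapper could look like; all names here are hypothetical, not the current API:

from collections import namedtuple

# Hypothetical wrapper: arguments are nullable unless wrapped in NonNull
NonNull = namedtuple('NonNull', 'inner_type')

class ArgumentDefinition(object):
    def __init__(self, dagster_type, is_optional=False, default_value=None):
        # is_optional=True lets callers omit the argument entirely
        self.dagster_type = dagster_type
        self.is_optional = is_optional
        self.default_value = default_value

# e.g. making log_level optional for the default context args:
log_level_arg = ArgumentDefinition(str, is_optional=True, default_value='INFO')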

Implement dagster execute

I commented out dagster execute as it currently stands.

We should drive it from environment files as proposed at https://github.com/superconductive/clarify-data-warehouse-201803/tree/hello_dagster_introduce_command_line/clarify_kickoff/clarify_kickoff

Copying and pasting here:

Pipeline Execution and Configuration

The next obvious thing to do will be to be able to execute the pipeline. We'll introduce a yaml config file in order to do this. Each yaml file will represent an environment. An environment is effectively the set of all parameters into the pipeline. This way you can have an environment for testing, staging, production, and so forth. Each environment lives in its own file. (We could also support multiple environments per file. Open to feedback, but there are tradeoffs there.)

Say there is a file: lds_backfill_prod.yml

environment:
  inputs:
    - name: s3
      args:
        bucket: some_bucket 
        access_key: some_key
        secret_key: some_secret_key 

There will be an execution command of the form:

dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml

You'll also be able to do fun stuff like execute subsets of the DAG.

dagster pipeline execute lds_backfill --environment=lds_backfill_prod.yml --from upload_files_to_s3

The above command would execute all solids from upload_files_to_s3 and onward. So upload_files_to_s3 --> create_lds_table_solid --> copy_data_to_lds_table --> populate_abstract_from_lds_table in this example (as of 06/04/2018)

Eliminate global state from logging

Logging is still a bit of a mess. The "colored logs" module installs a logger that is global, I believe, so that it persists between different execution context creations. This can cause a lot of confusion (for me at least).

It would be fantastic to have a new general model for dealing with Python's BS, global-state-polluting, Java-naming-conventioned logging API.
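One possible direction, sketched with only the standard library: give each execution context its own logger with non-propagating handlers, instead of installing a global/root logger. The function name and structure below are illustrative only:

import logging
import uuid

def make_context_logger(run_id=None, level=logging.INFO):
    # One logger per execution context, keyed by run id, so handlers
    # installed for one run do not leak into the next.
    name = 'dagster.run.{}'.format(run_id or uuid.uuid4().hex)
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # do not bubble up to the globally configured root logger
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
        )
        logger.addHandler(handler)
    return logger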

Consolidate and build high quality command line tooling (master task)

The command line tooling in dagster is a mess. This should be consolidated into a single, sensible command that provides a lot of value.

Initially I built this so that a file containing the definition of a pipeline was treated as a command line tool. This was done by conditionally invoking the pipeline if the module was __main__. This was a mistake. We want to be able to invoke a pipeline from a traditional-feeling command line tool.

embedded_cli.py is the code that drives this so-called "embedded" cli.

To see an example of this, run

python3 pipelines.py

in dagster/dagster_examples. That is the multi-pipeline case.

Or run

python3 pipeline.py

in dagster/dagster_examples/qhp. That is the single-pipeline case.

This should be sub-divided into a number of tasks:

  • File layout for making the dagster pipeline tool aware of pipelines. This is essentially already implemented: dagster/cli/__init__.py, lines 12-25.
  • Listing pipelines and their descriptions (also those lines, but this should be improved).
  • A good way of printing out pipeline metadata. Currently print_pipeline in embedded_cli.py. It is not good. Invoked using the "meta" command in the single pipeline case.
  • Consume a yaml file that constructs the objects defined in dagster/config.py. We want to be able to drive a pipeline execution or materialization from a yaml file.
  • Be able to codegen a yaml file based on the declared definition of a pipeline.
  • Be able to verify the validity of an incoming yaml file based on a definition of a pipeline (not hi-pri)

CI/CD

Set up CI/CD pipeline. Decide on Travis/Circle/etc and then implement

Make missing solid error better

STR

  1. make a solid have a dependency on a solid not listed in the pipeline instantiation.
  2. Run and watch for this:
dagster.check.CheckError: Invariant failed. Description: dep must exist got: <<your_missing_solid>> and set {'<<solids>>'}

Eliminate pass_pipeline in the cli

This was always just a concession to the odd embedded_cli construct. Since we are getting rid of the embedded cli, let's get rid of this. Although @freiksenet, I'm impressed you found make_pass_decorator in click. Very clever.

New result API

The existing result API that is returned from execute_pipeline is no longer usable at all. We need a new one which is aware of the fact that each solid can emit multiple results.

Fix dagit panning and zooming

Panning and zooming around dagit is a pretty awful user experience at the moment. It is too sensitive to movement, and the diagram moves around as you zoom in. Too frequently it zooms off the screen entirely. This does not need to be perfect, but it needs some tweaking in the short term. People are using this to debug real things now and it is a real usability issue.

Generate multiple dagster Expectations from a great expectations json file

Right now in the clarify repo we have a single expectation that evaluates the entire set of expectations in a ge json file. What we really want to do is expose each individual expectation in that file as its own ExpectationDefinition, so that it is inspectable by the metadata systems and configurable by policy, etc.
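A sketch of the idea, assuming the standard Great Expectations suite layout (a JSON file with an "expectations" list of entries carrying expectation_type and kwargs) and a caller-supplied constructor standing in for the real ExpectationDefinition API:

import json

def expectation_defs_from_ge_file(path, make_expectation_def):
    # make_expectation_def is whatever constructor turns a name plus kwargs
    # into a dagster ExpectationDefinition (hypothetical indirection).
    with open(path) as f:
        suite = json.load(f)
    defs = []
    for exp in suite.get('expectations', []):
        name = exp['expectation_type']  # e.g. expect_column_values_to_not_be_null
        kwargs = exp.get('kwargs', {})
        defs.append(make_expectation_def(name=name, kwargs=kwargs))
    return defs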

Solid input requirements

To test --from parameter, I've added an intermediate input to one of the solids in pandas_hello_world as follows:

    def sum_transform_fn(num_csv):
        sum_df = num_csv.copy()
        sum_df['sum'] = sum_df['num1'] + sum_df['num2']
        return sum_df

    sum_solid = dagster_pd.dataframe_solid(
        name='sum', inputs=[dagster_pd.csv_input('num_csv')], transform_fn=sum_transform_fn
    )

    def sum_sq_transform_fn(sum):
        sum_sq = sum.copy()
        sum_sq['sum_sq'] = sum['sum']**2
        return sum_sq

    sum_sq_solid = dagster_pd.dataframe_solid(
        name='sum_sq',
        inputs=[
            dagster_pd.csv_input('sum_csv'),
            dagster_pd.depends_on(sum_solid),
        ],
        transform_fn=sum_sq_transform_fn
    )

When I try to run it, it complains about one of the inputs not being defined. I'm not sure that's expected behavior, because I thought only one input is needed per solid. I'm not sure how to run intermediate solids if one can't specify the results of intermediate computations as an alternative input.

Lazy loading pipelines

While the way I implemented it is broken, I think we do need lazy loading for pipelines in the CLI.

If you have several pipelines and one of them is broken, it should still be possible to list pipelines and to run the non-broken ones. Currently that's not possible, because we construct all pipelines just to get their names. Should metadata like the name be available in a way that you can get it without actually creating the pipeline? See the sketch below.

In addition, I am prematurely optimizing, expecting some pipelines to load slowly.
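A minimal sketch of the lazy approach: register pipelines as name-to-factory callables so that listing names never constructs a pipeline, and one broken definition does not take down the rest. The class and method names are illustrative, not a proposed API:

class LazyPipelineRegistry(object):
    def __init__(self):
        self._factories = {}  # name -> zero-arg callable returning a pipeline

    def register(self, name, factory):
        self._factories[name] = factory

    def list_names(self):
        # Metadata like the name is available without building anything
        return sorted(self._factories)

    def load(self, name):
        # Construction (and any breakage inside it) is deferred until here
        return self._factories[name]()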

Enabling multiple instances of an output (ends up being map reduce)

We need to enable multiple instances of an output. This will allow a "streaming batch" semantic in the pipeline that spawns multiple copies of a downstream DAG. There are a lot of challenges to solve here as you would also want a way to coalesce multiple outstanding subdags. This is essentially map-reduce.

Detect mismatch between input definitions and transform function signature

Right now the system mandates that the signature of the transform function matches the input definitions. However the error messages suck and only kick in at runtime. We could do a much nicer job and detect the problem upfront and raise the error at definition construction time using reflection.
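A sketch of the definition-time check using the standard inspect module; input_defs is assumed to be a list of objects with a name attribute:

import inspect

def check_transform_signature(transform_fn, input_defs):
    # Compare the transform function's parameter names against the declared
    # input definitions and fail at definition construction time, not at runtime.
    params = list(inspect.signature(transform_fn).parameters)
    expected = [input_def.name for input_def in input_defs]
    missing = [name for name in expected if name not in params]
    extra = [name for name in params if name not in expected]
    if missing or extra:
        raise Exception(
            'Transform {} does not match its input definitions: '
            'missing args {}, unexpected args {}'.format(transform_fn.__name__, missing, extra)
        )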

Separate Pandas Dataframe Solid into Two Sources

Right now because of the first architecture the pandas dataframe_solid has a single source which is called "CSVORPARQUET" and takes a format argument. This should be separated into two distinct sources, one Csv, one Parquet

Solid Registry and config-driven solid definition

Right now we can "inject" solids into a sub_pipeline using PipelineDefinition.create_single_solid_pipeline and PipelineDefinition.create_sub_pipeline.

However, we want to be able to do this via config. In order to do that we will need to be able to construct solids with only a string as a key. This will necessitate the ability to register solids, look them up by string, and parameterize them. You can think of these as "higher order solids" in a way. The libraries need to declare a way to create solids, and then config can be set up to invoke those higher order solids to produce solids. This sounds complicated, but when seen in practice it will be quite intuitive.

We want to be able to specify something like

injected_solids:
    solid_name:
        input_name:
           solid: load_pandas_df_from_csv
           args:
              delimiter: "|"

This config would dynamically create a solid which is injected into the pipeline.
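A sketch of the registry half of this, with hypothetical names: factories are registered under a string key, and the injected_solids config above is resolved by looking up each factory and calling it with the configured args:

SOLID_REGISTRY = {}  # string key -> factory that produces a solid definition

def register_solid_factory(name, factory):
    SOLID_REGISTRY[name] = factory

def solids_from_injected_config(injected_solids_config):
    # injected_solids_config mirrors the yaml above:
    # {solid_name: {input_name: {'solid': factory_key, 'args': {...}}}}
    created = {}
    for solid_name, inputs in injected_solids_config.items():
        for input_name, spec in inputs.items():
            factory = SOLID_REGISTRY[spec['solid']]
            created[(solid_name, input_name)] = factory(**spec.get('args', {}))
    return created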

Encourage the use of explicit input names when specifying dependencies

For inputs that are dependencies, right now the APIs strongly encourage/enforce that the input name and the solid name be the same. That's a mistake, I believe. It can lead to very awkward names for transform function arguments. More importantly, someone changing the name of the solid would break any solid that depends on that name.

Best example is dep_only_input. This should take a name.

We also might want to consider an API that instead passes in a dictionary of input specifications where the name is the key. This would enforce uniqueness, etc.
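A sketch of the dict-keyed shape, reusing the helpers from the pandas example above but with hypothetical signatures; the input name is the key, so it no longer has to match the upstream solid name and uniqueness is enforced for free:

# Hypothetical API: inputs keyed by their own names
sum_sq_solid = dagster_pd.dataframe_solid(
    name='sum_sq',
    inputs={
        'sum_df': dagster_pd.depends_on(sum_solid),  # arg name decoupled from 'sum'
        'sum_csv': dagster_pd.csv_input(),
    },
    transform_fn=sum_sq_transform_fn,
)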

Write JSON File Handler well and write tests

Right now in dagster there is the json-file-handler branch. This has been developed in tandem against the add-logging-to-json-file branch in the superconductive repo. That branch works as is, but I would like the implementation in json-file-handler to be higher quality before merging back into develop.

Make Dagster Python 2.7 & 3.5 compatible

We have avoided using "big" Python 3-only features -- such as type hints and async/await. However, it is not tested against 2.7, and there are more trivial incompatibilities such as f-strings and required keyword arguments.

We should set up tox to test against 2.7 and make this happen.
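A minimal tox.ini sketch for running the suite across interpreters; the env list and test command are assumptions, not the repo's actual configuration:

[tox]
envlist = py27, py35, py36

[testenv]
deps =
    pytest
commands =
    pytest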

Prefer more verbose syntax for building click command line groups

I'd like to switch back to the old form of building the command line interface. How the main cli is getting constructed is a little obtuse at this point. I strongly prefer code such as:

    dagster_command_group = click.Group(name='dagster')
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_graphviz_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_pipelines_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_output_command)
    dagster_command_group.add_command(embedded_dagster_multi_pipeline_execute_command)

Although inelegant, it is super obvious what is going on and I can immediately jump to the command in question by finding the symbol within vscode by clicking on it. Let's just be dead obvious here.
