
pydags

pydags is a Python package for creating and running lightweight DAG-based workloads locally. Whereas technologies like Airflow, Kubeflow, and Luigi are heavyweight, enterprise-level workflow managers, pydags is a simple, extensible, lightweight alternative tailored for local development and execution. It has no dependencies on Docker, Kubernetes, or similar technologies, and it is use-case agnostic (unlike Kubeflow and Luigi).

Terminology

Stage

A Stage is synonymous with a node in a DAG.

Pipeline

A Pipeline is synonymous with a DAG, and comprises one or more Stages arranged in a DAG structure.

Get Started

External Dependencies

pydags requires a couple of non-Python dependencies in order to function properly: Redis (for internal operation) and GraphViz (for visualization). On Debian-based systems, install them by running the following command in a terminal:

sudo apt-get install redis graphviz

Installing Redis with this command on most *nix systems starts the Redis server automatically. You can verify this by running the redis-server command, which should fail with an Address already in use message, or similar, if a server is already running.
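
Alternatively, since the examples below access Redis through the redis Python client, you can run a quick connectivity check from Python (a minimal sanity check, assuming the default host and port):

import redis

# PING the local Redis server; this raises a ConnectionError if no server is running.
redis.Redis(host='localhost', port=6379).ping()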

Install pydags

To install pydags, run the following command:

pip install pydags
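
To confirm that the package is importable (a quick sanity check, nothing more):

python -c "import pydags"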

Example Usage

Expressing a DAG

Below is a minimal example demonstrating how to specify a DAG in pydags. Each stage of the pipeline is a Python function decorated with the @stage decorator.

from pydags.pipeline import Pipeline
from pydags.stage import stage


@stage
def stage_1():
    print('Running stage 1')

@stage
def stage_2():
    print('Running stage 2')

@stage
def stage_3():
    print('Running stage 3')

@stage
def stage_4():
    print('Running stage 4')

@stage
def stage_5():
    print('Running stage 5')

@stage
def stage_6():
    print('Running stage 6')

def build_pipeline():
    # Stages 1 and 2 have no dependencies, so they are the roots of the DAG.
    stage1 = stage_1()
    stage2 = stage_2()

    # .after() declares a dependency on another stage.
    stage3 = stage_3().after(stage2)
    stage4 = stage_4().after(stage2)
    stage5 = stage_5().after(stage1)

    # Chain .after() calls to declare multiple dependencies.
    stage6 = stage_6().after(stage3).after(stage4).after(stage5)

    pipeline = Pipeline()

    pipeline.add_stages([
        stage1, stage2, stage3,
        stage4, stage5, stage6
    ])
    
    return pipeline


pipeline = build_pipeline()

pipeline.visualize()  # render the DAG with GraphViz

pipeline.start()  # execute the stages in topological order

Stages of the pipeline that can be run in parallel (in the above case, stages 1 and 2, and stages 3, 4, and 5) will only run in parallel if you set the num_cores argument of the .start() method to a positive integer, representing the number of cores to distribute computation across. For example, to parallelize the execution of such stages, simply replace pipeline.start() with pipeline.start(num_cores=8) (to use 8 cores).
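
To summarize the two modes of execution:

pipeline.start()             # run stages sequentially (the default)
pipeline.start(num_cores=8)  # run independent stages across up to 8 cores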

A Simple ML Pipeline

Below is an example of a simple ML pipeline consisting of three stages: 1) data download, 2) preprocessing, and 3) model training. All three stages are subclasses of RedisStage, and thus inherit the functionality to read from and write to a Redis server; this is how data is passed between the stages. If Redis is not suited to your use case, you may subclass DiskCacheStage, or implement your own cache/storage backend.
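
For illustration, a stage backed by the on-disk cache might look like the following. This is a minimal sketch only: it assumes DiskCacheStage exposes the same read/write interface as RedisStage and that its constructor accepts a diskcache.Cache instance via a cache keyword argument; check the pydags source for the exact signature.

import diskcache

from pydags.stage import DiskCacheStage


class DiskDataIngestor(DiskCacheStage):
    def run(self, *args, **kwargs):
        # Persist intermediate results to the local disk cache instead of Redis.
        self.write('features', [[13.2, 2.8], [12.4, 1.1]])


# NOTE: the 'cache' keyword argument is an assumption; consult the pydags source.
ingestor = DiskDataIngestor(cache=diskcache.Cache('/tmp/pydags_cache'))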

Please note that you will need to install the additional scikit-learn dependency (pip install scikit-learn) to run this example.

import redis
from sklearn.datasets import load_wine
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

from pydags.pipeline import Pipeline
from pydags.stage import RedisStage


class DataIngestor(RedisStage):
    @staticmethod
    def download_data():
        data = load_wine()
        features = data['data']
        targets = data['target']
        return features, targets

    def run(self, *args, **kwargs):
        features, targets = self.download_data()

        # Write the raw data to Redis so that downstream stages can read it.
        self.write('features', features)
        self.write('targets', targets)


class DataPreprocessor(RedisStage):
    @staticmethod
    def normalize(features):
        return MinMaxScaler().fit_transform(features)

    @staticmethod
    def split(features, targets):
        return train_test_split(features, targets, test_size=0.2)

    def run(self, *args, **kwargs):
        # Read the raw data written by the DataIngestor stage.
        features = self.read('features')
        targets = self.read('targets')

        xtr, xte, ytr, yte = self.split(features, targets)

        xtr = self.normalize(xtr)
        xte = self.normalize(xte)

        data = {
            'xtr': xtr, 'xte': xte,
            'ytr': ytr, 'yte': yte
        }

        self.write('preprocessed_data', data)


class ModelTrainer(RedisStage):
    def __init__(self, *args, **kwargs):
        super(ModelTrainer, self).__init__(*args, **kwargs)

        self.model = None

    def train_model(self, xtr, ytr):
        self.model = RandomForestClassifier().fit(xtr, ytr)

    def test_model(self, xte, yte):
        acc = self.model.score(xte, yte)
        return acc

    def run(self, *args, **kwargs):
        # Read the train/test split written by the DataPreprocessor stage.
        preprocessed_data = self.read('preprocessed_data')

        xtr = preprocessed_data['xtr']
        xte = preprocessed_data['xte']
        ytr = preprocessed_data['ytr']
        yte = preprocessed_data['yte']

        self.train_model(xtr, ytr)

        acc = self.test_model(xte, yte)

        print('Accuracy:', acc)


def build_pipeline():
    # All three stages share the same Redis instance, which they use to pass data.
    redis_instance = redis.Redis(host='localhost', port=6379, db=0)

    data_ingestor = DataIngestor(redis_instance=redis_instance)

    # .after() declares the linear dependency chain: ingest -> preprocess -> train.
    data_preprocessor = DataPreprocessor(redis_instance=redis_instance).after(data_ingestor)

    model_trainer = ModelTrainer(redis_instance=redis_instance).after(data_preprocessor)

    pipeline = Pipeline()
    pipeline.add_stages([data_ingestor, data_preprocessor, model_trainer])

    return pipeline


p = build_pipeline()

p.visualize()  # render the DAG with GraphViz

p.start()  # execute the three stages in order

Contributors

davidtorpey, orrymr
