Luigi-Warehouse

A boilerplate implementation of Luigi at Groupon

  • Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command line integration, and much more

  • Luigi-Warehouse adds

    • example workflows (e.g. replicating PostgreSQL tables to Redshift)

    • more data sources

    • variable data sources that do not rely on default luigi behavior/configs (e.g. VariableS3Client; a short sketch follows this list)
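As a sketch of the idea, a task can hand its S3 target an explicitly constructed client instead of relying on the [s3] defaults in luigi.cfg. The import path and constructor arguments below are assumptions, not the class's confirmed signature:

import luigi
from luigi.contrib.s3 import S3Target  # luigi.s3 in older luigi releases
from luigi_warehouse.variable_s3_client import VariableS3Client  # hypothetical import path

class UploadReport(luigi.Task):
  def output(self):
    # assumed kwargs: credentials passed per instance rather than read from luigi.cfg
    client = VariableS3Client(aws_access_key_id='XXXX',
                              aws_secret_access_key='XXXX')
    return S3Target('s3://my-bucket/reports/report.csv', client=client)

  def run(self):
    with self.output().open('w') as out:
      out.write('id,value\n1,2\n')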

Install / Setup

  • Install python3 - This repo has been tested against python 3.4+
Simple

python setup.py install

Developers - if you want to modify the workflows or use them with your custom logic
  • Clone this repo
  • pip3 install -r requirements.txt if you want full functionality of all data sources
Post-Install
  • mkdir your-path-to/data
  • Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options (a minimal sketch follows this list). You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python...
  • You're ready to replicate or move data around...
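For illustration only, a minimal luigi.cfg might look like the following; the section and key names here are assumptions, and luigi.cfg-example in this repo is the authoritative reference:

[postgres]
host=localhost
port=5432
user=etl_user
password=XXXX

[redshift]
host=example.cluster.redshift.amazonaws.com
port=5439
database=analytics
user=etl_user
password=XXXX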

Getting Started

  • Some example workflows are included. Assumptions, arguments, and comments are documented in each file.
| File | Description | Main Class(es) |
| --- | --- | --- |
| gsheet_to_redshift.py | replicates all data from a Google sheet to a Redshift table (full copy/replace) | Run |
| gsheet_to_hadoop.py | replicates all data from a Google sheet to a Hadoop Hive table via Spark (full copy/replace) | main |
| postgres_to_redshift.py | replicates Postgres tables to Redshift (incrementally or full copy/replace) | Run - PerformIncrementalImport, PerformFullImport |
| postgres_to_hadoop.py | Spark app that replicates Postgres tables to Hadoop (Hive) (incrementally or copy/replace) | Run - RunIncremental, RunFromScratch |
| salesforce_to_redshift.py | replicates a Salesforce report or SOQL query to a Redshift table (full copy/replace) | SOQLtoRedshift, ReporttoRedshift |
| teradata_to_redshift.py | replicates given Teradata SQL to a Redshift table (incrementally or full copy/replace) | Run |
| typeform_to_redshift.py | replicates all data from Typeform responses to a Redshift table (full copy/replace) | Run |
| zendesk_to_redshift.py | extracts users, orgs, tickets, ticket_events from Zendesk to Redshift (partially incremental) | Run |
| zendesk_to_hadoop.py | generic class to extract from the Zendesk API and load to Hadoop Hive via Spark (incrementally or full copy/replace) | ZendeskSpark |
  • Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
  • Example to run a workflow with multiple workers in parallel
$ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg python3 luigi_warehouse/postgres_to_redshift.py Run --params here --workers 50

Data Sources

Dependent python packages required & API reference

Postgres / Redshift - psycopg2
MySQL - pymysql
Googlesheets - gspread : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
  • requires some configuring to install. We typically have to do
$ mv ~/.odbc.ini ~/.odbc.ini.orig 
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini 
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce
  • Props to cghall for the capability to query Salesforce reports directly using the analytics API

  • Also available are SalesforceBulk and SalesforceBulkJob classes, which use the Salesforce bulk API

Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
AWS - boto : boto3

Notifications

  • We currently use Slack or email for job status notifications; either can easily be added to a workflow

  • luigi-slack

import luigi
from luigi_slack import SlackBot, notify
slack_channel = 'luigi-status-messages'
...
...
...

if __name__ == '__main__':
  slack_channel = 'luigi-status-messages'
  slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                   channels=[slack_channel])
  with notify(slacker):
    luigi.run() 
  • or email via AWS SES using boto3

import boto3

class Email:
  def __init__(self, region, aws_access_key_id, aws_secret_access_key):
    self.client = boto3.client('ses',
                               region_name=region,
                               aws_access_key_id=aws_access_key_id,
                               aws_secret_access_key=aws_secret_access_key)

  def send(self, from_, to_list, subject, body):
    return self.client.send_email(Source=from_,
                                  Destination={'ToAddresses': to_list},
                                  Message={'Subject': {'Data': subject},
                                           'Body': {'Text': {'Data': body},
                                                    'Html': {'Data': ' '}}})
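
A hypothetical usage, e.g. from a task's failure handling (the region, credentials, and addresses are placeholders):

emailer = Email(region='us-east-1',
                aws_access_key_id='XXXX',
                aws_secret_access_key='XXXX')
emailer.send(from_='etl@example.com',
             to_list=['oncall@example.com'],
             subject='postgres_to_redshift failed',
             body='See the scheduler logs for details.')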

Data Validation

  • Targeted towards ensuring successful replication of data to Redshift (see modules/validation.py)
Structure
  • checks that the CSV has the same number of columns as the target table
  • checks that the columns have the same datatypes in the same order (VARCHAR is acceptable for any Python datatype)
    • uses python_redshift_dtypes to convert
LoadError
  • checks for load errors for the given target:schema:table since the provided load_start timestamp
RunAnywayTarget
  • Use the wrapper class RunAnywayTarget if you want an easier interface as we improve each validation scheme

  • pass in the task object with the following attributes

    • type = 'LoadError' or 'Structure'
    • target = Redshift
    • table =
    • schema =
    • local_file = local csv file path
    • load_start = when you started to copy the records from S3
  • calling RunAnywayTarget(self).done() will skip validation

  • calling RunAnywayTarget(self).validation() will run the validation and, if successful, also mark the task as done (a sketch follows)
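
A minimal sketch of a task using it; the task itself and its attribute values are hypothetical:

import luigi
from validation import RunAnywayTarget  # lives in modules/validation.py

class CopyCSVToRedshift(luigi.Task):  # hypothetical task
  type = 'Structure'
  target = 'Redshift'
  schema = 'public'
  table = 'test'
  local_file = '/path/to/data/test.csv'
  load_start = '2016-10-06 10:15:00'  # when the S3 copy started

  def run(self):
    ...  # COPY the records from S3, then:
    RunAnywayTarget(self).validation()  # validate; marks the task done on success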

OrderedDF
  • Takes the following args
  1. target_cols : a list of columns in the order you want your dataframe structured
  2. df : the dataframe you want restructured
  • example: I want my dataframe to have columns in this order ['one','two','three','four','five','six']
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None,'',1,7,8],[None,'',2,5,6]]
>>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one','two','three','four','five','six'],test)
>>> result.df
    one two  three  four  five   six
0  None          8     1     7  None
1  None          6     2     5  None
StructureDynamic
  • This class will fix tables for you:
  1. Check for copy errors
  2. Handle the copy errors
     • add column(s) if needed
     • change dtype(s) if needed
  3. Get the original table's schema
  4. Craft the new table's schema with the changes from the errors
  5. Make the change, retry the copy, and remove duplicate records
  6. While there are still copy errors: handle the errors, attempt a fix, retry the copy, and remove duplicate records

  • To run, use:

StructureDynamic(target_schema=  , # redshift schema your table is in
                 target_table=     # your table
                 ).run(
                       add_cols=       , # True or False: add columns in attempting to fix
                       change_dtypes=  , # True or False: change column data types in attempting to fix
                       copy=           , # the COPY command you attempted
                       load_start=       # when you started the COPY command, '%Y-%m-%d %H:%M:%S'
                       )
  • Example usage:
    • sql prep: create the table
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
  • test.csv: create the csv you want to attempt to copy
1,2
two,2
3,4
5,6
ab,test
  • we attempt to copy normally but we get load errors because one of the columns isn't right
COPY public.test FROM 's3://luigi-godata/test.csv' 
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;
  • we run StructureDynamic
from validation import StructureDynamic
copy = '''COPY public.test FROM 's3://luigi-godata/test.csv' 
          CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
          CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;'''
StructureDynamic(target_schema='public',target_table='test').run(add_cols=True,change_dtypes=True,copy=copy,load_start='2016-10-6 10:15:00')
  • our table is fixed and called public.test
  • our original table is kept as public.test_orig_backup
  • stdout lists the stl_load_errors
  • the changes made to the table's DDL are printed to stdout
