mssql_airflow's Introduction

Alternative Microsoft SQL Server Hook for Airflow

A minimal Airflow Hook for interacting with Microsoft SQL Server

It enables DbApiHook methods that Airflow's built-in SQL Server hook does not support, such as .get_sqlalchemy_engine and .get_pandas_df.

Install

python -m pip install git+https://github.com/Harduim/mssql_airflow.git

Features

  • Use SQLAlchemy Connections with MsSQLHook.get_sqlalchemy_engine
  • Get a pandas dataframe from a query using MsSQLHook.get_pandas_df
  • Multiline inserts with MsSQLHook.batch_insert_rows
  • All other methods already implemented by DbApiHook
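
To make the "multiline inserts" item concrete, here is a minimal sketch of the general technique: packing several rows into one parameterized INSERT statement per batch. It is not the hook's implementation, and the signature of MsSQLHook.batch_insert_rows is not documented here. The helper name `multirow_inserts`, the table and column names, the batch size, and the `%s` placeholder style are all assumptions made for illustration.

```python
from typing import Any, Iterable, Iterator, List, Sequence, Tuple


def multirow_inserts(
    table: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
    batch_size: int = 500,
    placeholder: str = "%s",
) -> Iterator[Tuple[str, List[Any]]]:
    """Yield (sql, params) pairs, each inserting up to batch_size rows.

    Hypothetical helper, not part of mssql_airflow.
    """
    rows = list(rows)
    # One "(%s, %s, ...)" group per row, one placeholder per column.
    row_sql = "(" + ", ".join([placeholder] * len(columns)) + ")"
    prefix = "INSERT INTO {} ({}) VALUES ".format(table, ", ".join(columns))
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        sql = prefix + ", ".join([row_sql] * len(batch))
        # Flatten the batch so the values line up with the placeholders.
        params = [value for row in batch for value in row]
        yield sql, params


# Three rows with a batch size of two produce two statements.
statements = list(
    multirow_inserts("staging", ["col1", "col2"], [(1, "a"), (2, "b"), (3, "c")], batch_size=2)
)
for sql, params in statements:
    print(sql, params)
```

SQL Server accepts at most 1000 rows in a single INSERT ... VALUES list and at most 2100 parameters per request, so a batch size should keep rows times columns under both limits.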

Instructions and sample usage

  • Create a connection under Admin => Connections
    • Conn Id: Name of the connection, passed as the mssql_conn_id parameter
    • Conn Type: Microsoft SQL Server
    • Host: The IP address or hostname of the server
    • Schema: The database name, not an actual SQL Server schema. Airflow's connection form has no "database" field, so this follows Airflow's convention
    • Password: The password
    • Login: The user name. Use 'domain\username' for Windows auth.
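
The same fields can also be supplied without the UI, since Airflow reads connections from environment variables named AIRFLOW_CONN_<CONN_ID> that hold a connection URI. The sketch below shows how the fields above would map onto such a URI, assuming the connection type corresponds to the `mssql` scheme. The helper name `build_conn_uri`, the host name, and the credentials are made up for illustration. Note that the "Schema" field ends up in the path segment and that the backslash in a Windows login must be percent-encoded.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(login: str, password: str, host: str, database: str, port: int = 1433) -> str:
    """Build an Airflow-style connection URI (hypothetical helper).

    The path segment is what Airflow shows as the "Schema" field,
    which for SQL Server holds the database name.
    """
    return "mssql://{}:{}@{}:{}/{}".format(
        quote(login, safe=""),     # encodes the backslash in 'domain\username'
        quote(password, safe=""),  # encodes characters such as '@' and '/'
        host,
        port,
        quote(database, safe=""),
    )


uri = build_conn_uri("domain\\username", "p@ss/word", "sqlserver.example.com", "some_database")
print(uri)

# Round trip: decoding the URI gives back the original field values.
parts = urlsplit(uri)
login_back = unquote(parts.username)
database_back = unquote(parts.path.lstrip("/"))
```

Exporting the resulting string as AIRFLOW_CONN_MY_CONN would make it available under the connection id my_conn used in the samples.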

Airflow 1.10

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.alternative_mssql_hook import MsSQLHook
from airflow.operators.python_operator import PythonOperator


def sample_usage():
    # Schema is the database, not the actual schema.
    mssql = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # This method (get_pandas_df) does not work with the regular mssql plugin
    df = mssql.get_pandas_df("SELECT * FROM THE_TABLE")

    # All the regular DbApiHook methods work
    my_records = mssql.get_records("SELECT col1, col2 FROM THE_TABLE")
    mssql.run("DELETE FROM other_staging_table_name")

    # Saving data to a staging table using pandas to_sql
    conn = mssql.get_sqlalchemy_engine()
    df.to_sql("staging_table_name", con=conn, if_exists="replace")


with DAG(
    "Sample_DAG",
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
    default_args={
        "owner": "Arthur Harduim",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 1),
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=6),
    },
    catchup=False,
) as dag:
    some_db_tsk = PythonOperator(task_id="some_db_tsk", python_callable=sample_usage)

Airflow 2.0

from datetime import datetime, timedelta

from airflow import DAG

from alternative_mssql_hook import MsSQLHook
from airflow.operators.python import PythonOperator


def sample_usage():
    # Schema is the database, not the actual schema.
    mssql = MsSQLHook(mssql_conn_id="my_conn", schema="some_database")

    # This method (get_pandas_df) does not work with the regular mssql plugin
    df = mssql.get_pandas_df("SELECT * FROM THE_TABLE")

    # All the default DbApiHook methods work
    my_records = mssql.get_records("SELECT col1, col2 FROM THE_TABLE")
    mssql.run("DELETE FROM other_staging_table_name")

    # Saving data to a staging table using pandas to_sql
    conn = mssql.get_sqlalchemy_engine()
    df.to_sql("staging_table_name", con=conn, if_exists="replace")


with DAG(
    "Sample_DAG",
    description="""Sample usage of the MSSQL plugin""",
    schedule_interval="00 00 * * *",
    default_args={
        "owner": "Arthur Harduim",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 1),
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=6),
    },
    catchup=False,
) as dag:
    some_db_tsk = PythonOperator(task_id="some_db_tsk", python_callable=sample_usage)
