

Data Pipelines with Apache Airflow

Code accompanying the Manning book Data Pipelines with Apache Airflow.

Structure

Overall, this repository is structured as follows:

├── chapter01                # Code examples for Chapter 1.
├── chapter02                # Code examples for Chapter 2.
├── ...
├── .pre-commit-config.yaml  # Pre-commit config for the CI.
├── CHANGELOG.md             # Changelog detailing updates to the code.
├── LICENSE                  # Code license.
├── README.md                # This readme.
└── requirements.txt         # CI dependencies.

The chapterXX directories contain the code examples for each specific chapter.

Code for each chapter is generally structured as follows:

├── dags                  # Airflow DAG examples (+ other code).
├── docker-compose.yml    # Docker-compose file used for running the Chapter's containers.
└── readme.md             # Readme with Chapter-specific details, if any.

Usage

Details for running specific chapter examples are available in the corresponding chapter's readme. In general, most code examples are run using docker-compose, together with the docker-compose.yml file provided in each chapter. This docker-compose file takes care of spinning up the required resources and starting an Airflow instance for you. Once everything is running, you can run the examples in Airflow from your local browser.

Some later chapters (such as Chapters 11 and 13) may require a bit more setup. The details are described in the corresponding readmes and in the chapters themselves.

data-pipelines-with-apache-airflow's People

Contributors

BasPH

data-pipelines-with-apache-airflow's Issues

Chapter 14 citibike_api and nyc_transportation

The citibike_api_1 container fails to run, showing the following error:

  File "app.py", line 6, in <module>
    from flask import Flask, jsonify, Response
  File "/usr/local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape

The same thing happens for the nyc_transportation API.
Also, after starting everything and opening the web servers, the DAGs do not show up, even though they are visible and can be opened inside the file structure.

Error when cloning repository on macos

git clone https://github.com/BasPH/data-pipelines-with-apache-airflow.git
Cloning into 'data-pipelines-with-apache-airflow'...
remote: Enumerating objects: 1492, done.
remote: Counting objects: 100% (2/2), done.
remote: Compressing objects: 100% (2/2), done.
remote: Total 1492 (delta 1), reused 0 (delta 0), pack-reused 1490
Receiving objects: 100% (1492/1492), 267.00 KiB | 223.00 KiB/s, done.
Resolving deltas: 100% (751/751), done.
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:

  'chapter07/README.md'
  'chapter07/readme.md'

[Error] Chapter 14 Dockerfile does not work properly on M1 MacBook

On a MacBook Pro M1 Max, after running docker-compose up -d, the following error occurred on ports 8083 and 8082.

Traceback (most recent call last):
  File "app.py", line 4, in <module>
    from flask import Flask, render_template
  File "/usr/local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.7/site-packages/jinja2/__init__.py)

The same traceback appears on the other port.

Also, 'minio_init-1' and 'initdb-adduser-1' remain in state 'EXITED(0)'.

Chapter 2 script listing_2_10.sh error missing \

There is a mistake in the script chapter02/scripts/listing_2_10.sh: the '\' line-continuation character is missing after --name airflow on line 15.
Execution of the script ends with the error: "docker run" requires at least 1 argument.
Line 15 should be --name airflow \
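The effect of the missing continuation can be reproduced with any multi-line command (the flags below are placeholders, not the book's exact listing): a trailing `\` joins the next line into the same invocation, so without it the shell ends the `docker run` command early and the image name on the following line is never passed to it.

```shell
# A trailing '\' tells the shell the command continues on the next line.
# Without it after "--name airflow", the command would end there and
# "docker run" would see no image argument.
cmd=$(echo docker run \
  -p 8080:8080 \
  --name airflow \
  apache/airflow webserver)
echo "$cmd"
# prints: docker run -p 8080:8080 --name airflow apache/airflow webserver
```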

Chapter 2 Rocket Launch DAG

Running the tutorial on a Digital Ocean Droplet, I had to make the following edits to the code to get it to run:

  1. Update the import statements to:
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator

  2. Update the curl statement in the bash command passed to the BashOperator:
    bash_command = "curl -Lk -o /tmp/launches.json 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'"

Chapter 4: file overwrite and idempotency

The ELT process presented in chapter 4 leads to unexpected results if multiple DAG runs are executed in a short window or concurrently, which happens during a backfill or when the maximum active DAG runs parameter is greater than 1. All the DAG runs write to the same files /tmp/wikipageviews.gz and /tmp/wikipageviews.sql rather than to files specific to the execution date. The same process will also produce duplicated results if the pipeline is run multiple times or restarted.
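One way to avoid the collision (a sketch, not the book's fix; the helper name is made up) is to make the output path unique per execution date, so concurrent DAG runs never write to the same file. In a real DAG the date would come from Airflow templating ("{{ ds }}"); here a plain function illustrates the idea:

```python
# Sketch: build a per-run output path so concurrent DAG runs (e.g. during
# a backfill) don't all overwrite /tmp/wikipageviews.gz.
def dated_path(base_dir: str, name: str, ds: str, ext: str = "gz") -> str:
    """Return a file path unique to one execution date (ds, e.g. '2019-07-19')."""
    return f"{base_dir}/{name}_{ds}.{ext}"

# dated_path("/tmp", "wikipageviews", "2019-07-19")
# -> "/tmp/wikipageviews_2019-07-19.gz"
```

Combined with templating, the download task would write to a path like `/tmp/wikipageviews_{{ ds }}.gz`, which also makes re-runs idempotent per date.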

Chapter 3 manning-airflow/events_api Docker image problem

Traceback (most recent call last):
  File "/app.py", line 8, in <module>
    from flask import Flask, jsonify, request
  File "/usr/local/lib/python3.8/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.8/site-packages/jinja2/__init__.py)

Request to add details for mocking metastore variables

Hi team, the request is to add best practices around mocking metastore Variables for use in unit tests. I've pulled together the following code, but I'm curious whether you have other/better approaches for mocking out Variables in pytest or other unit-test frameworks. Thanks.

from airflow.models.variable import Variable

def test_dummy(monkeypatch):
    def mock_get(*args, **kwargs):
        mocked_dict = {"dummy_key1": "dummy_val1", "dummy_key2": "dummy_val2"}
        return mocked_dict.get(args[0])

    monkeypatch.setattr(Variable, "get", mock_get)

    dummy_var = Variable.get("dummy_key1")
    assert dummy_var == "dummy_val1"

airflow 2.2.2 `context["execution_date"]` deprecated

start = context["execution_date"]
end = context["next_execution_date"]

airflow==2.2.2 shows

DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead. 

and

DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
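Following the warning's own suggestion, a drop-in replacement reads the data interval fields instead (sketch; `context` here is simply the task context dict Airflow passes to the callable):

```python
# Airflow >= 2.2: read the data interval instead of the deprecated
# execution_date / next_execution_date context keys.
def get_interval(context):
    start = context["data_interval_start"]
    end = context["data_interval_end"]
    return start, end
```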

/airflow/dags/ does not exist

In Chapter 2, when I try to copy the DAG Python file to the path ~/airflow/dags/, I get a response saying the destination path does not exist.

Can you please share how to arrive at the correct file path? I installed Airflow using pip.
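A likely explanation (assuming a default pip install): AIRFLOW_HOME defaults to ~/airflow and the directory is created the first time Airflow runs, but the dags/ subfolder inside it is not created automatically, so it must be created before copying DAG files into it:

```shell
# AIRFLOW_HOME defaults to ~/airflow; the dags/ subfolder is not created
# for you, so mkdir -p it (creating parents as needed) before copying DAGs.
AIRFLOW_HOME="${AIRFLOW_HOME:-$HOME/airflow}"
mkdir -p "$AIRFLOW_HOME/dags"
```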

chapter 7 has two readme.md files

chapter 7 directory has two readme files: README.md and readme.md. This caused git to give the warning about colliding paths:

warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:

  'chapter07/README.md'
  'chapter07/readme.md'

These two files seem to have the same content; can we delete one of them?

bug: chapter06/scripts/trigger_dag.sh

The DAG id is listing_6_08 in chapter06/dags/listing_6_8.py, so chapter06/scripts/trigger_dag.sh should use that DAG id.

From:

#!/usr/bin/env bash

# Trigger DAG with Airflow CLI
airflow dags trigger listing_6_8 --conf '{"supermarket": 1}'

# Trigger DAG with Airflow REST API
curl -X POST "http://localhost:8080/api/v1/dags/listing_6_8/dagRuns" -H  "Content-Type: application/json" -d '{"conf": {"supermarket": 1}}' --user "admin:admin"

To:

#!/usr/bin/env bash

# Trigger DAG with Airflow CLI
airflow dags trigger listing_6_08 --conf '{"supermarket": 1}'

# Trigger DAG with Airflow REST API
curl -X POST "http://localhost:8080/api/v1/dags/listing_6_08/dagRuns" -H  "Content-Type: application/json" -d '{"conf": {"supermarket": 1}}' --user "admin:admin"

Chapter 8 pytest ModuleNotFoundError errors

If I run pytest from data-pipelines-with-apache-airflow/chapters/chapter8 it encounters several ModuleNotFoundError errors:

pytest
===================================================================================== test session starts =====================================================================================
platform darwin -- Python 3.8.2, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8
plugins: pytest_docker_tools-0.2.2, helpers-0.1.0, helpers-namespace-2019.1.8, mock-3.3.1
collected 1 item / 5 errors                                                                                                                                                                   

=========================================================================================== ERRORS ============================================================================================
__________________________________________________________ ERROR collecting tests/airflowbook/operators/test_json_to_csv_operator.py __________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_json_to_csv_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_json_to_csv_operator.py:5: in <module>
    from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator
E   ModuleNotFoundError: No module named 'airflowbook.operators.json_to_csv_operator'
___________________________________________________________ ERROR collecting tests/airflowbook/operators/test_movielens_operator.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_movielens_operator.py:11: in <module>
    from airflowbook.operators.movielens_operator import (
E   ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
__________________________________________________________ ERROR collecting tests/airflowbook/operators/test_movielens_operator2.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator2.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_movielens_operator2.py:7: in <module>
    from airflowbook.operators.movielens_operator import (
E   ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
________________________________________________________________ ERROR collecting tests/dags/chapter7/custom/test_operators.py ________________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/dags/chapter7/custom/test_operators.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/dags/chapter7/custom/test_operators.py:4: in <module>
    from airflowbook.operators.movielens_operator import (
E   ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
___________________________________________________________ ERROR collecting tests/dags/chapter7/custom/test_operators_incorrect.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/dags/chapter7/custom/test_operators_incorrect.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
tests/dags/chapter7/custom/test_operators_incorrect.py:5: in <module>
    from airflowbook.operators.movielens_operator import MovielensPopularityOperator
E   ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
=================================================================================== short test summary info ===================================================================================
ERROR tests/airflowbook/operators/test_json_to_csv_operator.py
ERROR tests/airflowbook/operators/test_movielens_operator.py
ERROR tests/airflowbook/operators/test_movielens_operator2.py
ERROR tests/dags/chapter7/custom/test_operators.py
ERROR tests/dags/chapter7/custom/test_operators_incorrect.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 5 errors during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================== 5 errors in 0.18s ======================================================================================

Is there a way to get pytest to see these?

UPDATE: I guess you need to include the corresponding src directory on the Python path.

In any case, I have another project I'm modelling after this one, with src and test directories and an operator in a subdirectory. It's getting similar ModuleNotFoundError errors. It would be great to see a full working example.
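One common workaround (a sketch, not necessarily the repo's intended setup) is a conftest.py at the project root that puts the src/ directory on sys.path, so test modules can import the packages that live there:

```python
# conftest.py at the project root. pytest imports conftest.py before
# collecting tests, so prepending src/ here makes packages under src/
# (e.g. airflowbook/) importable from the test modules.
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))
```

Alternatives with the same effect include installing the package into the environment with `pip install -e .`.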

License: missing

The code license is missing. Has it been defined yet, and can the code presented here be re-used, and if so, under which conditions?

bug: chapter05/dags/11_xcoms_return.py

11_xcoms_return.py should be modified to use "return_value" as the XCom pull key, as indicated in Figure 5.17.

From:

deploy_model = PythonOperator(
        task_id="deploy_model",
        python_callable=_deploy_model,
        templates_dict={
            "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
        },
    )

To:

deploy_model = PythonOperator(
        task_id="deploy_model",
        python_callable=_deploy_model,
        templates_dict={
            "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='return_value')}}"
        },
    )

Chapter 9 missing code for movielens_operator

Hi Team,

The concepts of chapter 9 helped me a lot, but the code under the path airflowbook.operators.movielens_operator is missing. Could you upload it?

Thank you for your great work.

Chapter 9 Import Error

Hi,

I am trying to use the test_dag_integrity.py code on my own project.

Here is my directory structure:

root/

  • dags/
    • dag1.py
  • tests/
    • test_dag_integrity.py

dag1.py imports several Airflow operators, such as BigQueryOperator, SlackOperator, etc.

In test_dag_integrity.py, I modified the
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
->
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "dags/**.py")
to match my directory structure.

Then I ran pytest test_dag_integrity.py; it failed, and the summary info is:
FAILED test_daily_pipeline_integration.py::test_dag_integrity[PATH\\TO\\MY\\DIRECTORY\\..\\dags\\ip_address_mapping.py] - ModuleNotFoundError: No module named 'airflow.providers.google'

Is there anything I missed?

Thanks!
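The error suggests the Google provider package is not installed in the interpreter pytest runs under, in which case `pip install apache-airflow-providers-google` into that environment should resolve it. A quick hedged check (the helper name is made up):

```python
import importlib.util

# Returns True if `module` is importable by the current interpreter.
# If this is False for "airflow.providers.google", install the provider
# package into the same environment pytest uses:
#   pip install apache-airflow-providers-google
def provider_installed(module: str) -> bool:
    try:
        return importlib.util.find_spec(module) is not None
    except ModuleNotFoundError:  # a parent package is missing entirely
        return False
```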

[Question] Chapter 3 section 1

When you use "curl -o /data/events.json http://events_api:5000/events" and say "fetch and store the events from the API": what is that API? I assume this DAG won't run? Not sure if I missed something earlier; I can't seem to find it.
