This project demonstrates ETL batch processing using HDFS as a data lake, PySpark for data processing, and Airflow as an orchestrator. The purpose of this project is to build a data pipeline that ingests raw data into the data lake (HDFS), transforms it into dimension and fact tables with PySpark, and stores the results in a data warehouse (PostgreSQL). The whole process is scheduled to run every day, with Airflow handling scheduling and monitoring. For the dataset I use 'covid-19 data from Jawa Barat' in JSON format.
- Prepare the Dataset
- Design ERD
- Extract Dataset to MySQL
- Transform Raw Data
- Load to PostgreSQL
- Create DAG
- Result
I got this raw data from my Digital Skola tutor.
I created a file called ingest.py whose job is to load the dataset from local storage into HDFS using the write method from PySpark.
```python
...
data.write.mode("overwrite").option("header", True).csv("hdfs:///covid19/raw_data_airflow2")
...
```
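A fuller sketch of what ingest.py might look like is below; the local input path, the JSON read options, and the session name are assumptions, not taken from the original:

```python
from pyspark.sql import SparkSession

# Build a Spark session for the ingest job (session name is an assumption)
spark = SparkSession.builder.appName("ingest_covid19").getOrCreate()

# Read the raw covid-19 Jawa Barat dataset (JSON) from the local filesystem;
# the path below is a placeholder for wherever the raw file actually lives
data = spark.read.option("multiline", True).json("file:///home/hadoop/covid19_jabar.json")

# Write it into the HDFS data lake as CSV with a header row,
# overwriting any previous run's output
data.write.mode("overwrite").option("header", True).csv("hdfs:///covid19/raw_data_airflow2")
```

Using `mode("overwrite")` keeps the job idempotent: rerunning the daily ingest replaces the previous output instead of failing.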
I transform the raw covid-19 Jawa Barat dataset into 3 dimension tables [dim_province, dim_district, dim_case] and 6 fact tables [fact_province_daily, fact_province_monthly, fact_province_yearly, fact_district_daily, fact_district_monthly, fact_district_yearly].
To load the data I use the PostgreSQL JDBC driver, which allows our programs to connect to a PostgreSQL database using standard, database-independent Java code. I use it by declaring the driver jar in the config when creating the Spark session.
```python
spark = SparkSession.builder.appName("Submitted2").config("spark.jars", "file:///home/hadoop/postgresql-42.2.6.jar").getOrCreate()
```
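Once the session knows about the driver jar, each table can be written with Spark's JDBC writer. A hedged sketch follows; the connection URL, database name, credentials, and the HDFS path the fact table is read from are all assumptions:

```python
from pyspark.sql import SparkSession

# Session configured with the PostgreSQL JDBC driver jar
spark = (SparkSession.builder.appName("load_covid19")
         .config("spark.jars", "file:///home/hadoop/postgresql-42.2.6.jar")
         .getOrCreate())

# Hypothetical connection settings; adjust to your PostgreSQL instance.
jdbc_url = "jdbc:postgresql://localhost:5432/covid19_dwh"
props = {"user": "postgres", "password": "postgres", "driver": "org.postgresql.Driver"}

# Example: load one of the fact tables from HDFS (path is an assumption)
# and write it into the warehouse, replacing the previous load
fact_province_daily = spark.read.option("header", True).csv("hdfs:///covid19/fact_province_daily")
fact_province_daily.write.jdbc(url=jdbc_url, table="fact_province_daily",
                               mode="overwrite", properties=props)
```

The same `write.jdbc` call is repeated for each of the dimension and fact tables, varying only the `table` argument.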
Finally, to run this task every day automatically, I use Airflow as an orchestrator. For that I need to create a DAG (Directed Acyclic Graph). In this file I schedule the task to repeat daily using Airflow's cron preset @daily, and I use the Airflow BashOperator to run the Python scripts via bash commands. The DAG graph looks like this:
After the DAG runs successfully, the data lake (HDFS) and the data warehouse (PostgreSQL) look like this:
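The DAG described above can be sketched as follows. Only ingest.py is named in this project, so the other script names, the dag_id, owner, and paths are placeholders for illustration:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "hadoop",            # assumption
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="covid19_etl",         # hypothetical dag_id
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",   # Airflow cron preset: run once a day
    catchup=False,
) as dag:
    # Each task runs one of the PySpark scripts through spark-submit;
    # transform.py and load.py are assumed names for the other two steps
    ingest = BashOperator(task_id="ingest",
                          bash_command="spark-submit /home/hadoop/ingest.py")
    transform = BashOperator(task_id="transform",
                             bash_command="spark-submit /home/hadoop/transform.py")
    load = BashOperator(task_id="load",
                        bash_command="spark-submit /home/hadoop/load.py")

    # Run the three steps in order: ingest -> transform -> load
    ingest >> transform >> load
```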