- This project leverages Python, Kafka, and Spark to process real-time streaming data from both stock markets and Reddit. It employs a Long Short-Term Memory (LSTM) deep learning model to make real-time predictions on SPY (S&P 500 ETF) stock data. Additionally, the project uses Grafana for real-time visualization of stock data, predictive analytics, and Reddit data, providing a comprehensive and dynamic overview of market trends and sentiment.
- Apache Airflow: Data pipeline orchestration
- Kubernetes/Bash/Python operators
- Variables
- Apache Kafka: Stream data handling
- Topic, partition, producer, consumer
- Apache Spark: Stream data processing
- Structured streaming
- Apache Cassandra: NoSQL database to store time series data
- Docker + Kubernetes: Containerization and container orchestration
- AWS: Amazon Elastic Kubernetes Service (EKS) to run Kubernetes in the cloud
- PyTorch: Deep learning model
- Grafana: Streaming data visualization
- Why Kafka?
- Kafka serves as the stream data handler that feeds data into Spark and the deep learning model
- Design of Kafka
- I initialize multiple Kubernetes operators in Airflow, where each operator corresponds to a single stock, so the system can produce stock data for several stocks simultaneously and increase throughput through parallelism. Consequently, I partition the topic according to the number of stocks, allowing each producer to direct its data into a distinct partition, which optimizes the data flow and maximizes efficiency (see the sketch below).
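A minimal sketch of this per-stock partitioning, assuming the `kafka-python` client; the topic name `stock`, the symbol list, and the payload fields here are illustrative assumptions rather than the project's actual values:

```python
import json
from kafka import KafkaProducer

# Hypothetical symbol list; in the project each symbol is produced by its
# own Airflow-managed Kubernetes operator.
SYMBOLS = ["SPY", "AAPL", "GOOG"]

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def send_tick(symbol: str, tick: dict) -> None:
    # Route each stock to its own partition so parallel producers never contend.
    producer.send("stock", value=tick, partition=SYMBOLS.index(symbol))

send_tick("SPY", {"stock_symbol": "SPY", "utc_timestamp": 1700000000, "price": 450.1})
producer.flush()
```

Because Kafka guarantees ordering only within a partition, pinning each symbol to a fixed partition also keeps every stock's time series in order.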
- Stock data contains the `stock_symbol` and `utc_timestamp` fields, which together uniquely identify a single data point. Therefore I use those two features as the primary key.
- Use `utc_timestamp` as the clustering key to store the time series data in ascending order for efficient reads (sequential reads for time series data) and high-throughput writes (real-time data only appends to the end of the partition). A schema sketch follows this list.
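A minimal schema sketch, assuming the Python `cassandra-driver` and illustrative keyspace, table, and column names (the project's actual DDL lives in `docker-service/airflow/scripts/cassandra_table_creation.py`):

```python
from cassandra.cluster import Cluster

# Connect to a local node; the host and replication settings are assumptions.
session = Cluster(["127.0.0.1"]).connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS stock
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")

# stock_symbol is the partition key; utc_timestamp is the clustering key,
# kept in ascending order so reads are sequential and writes append-only.
session.execute("""
    CREATE TABLE IF NOT EXISTS stock.stock_price (
        stock_symbol text,
        utc_timestamp timestamp,
        price double,
        num_transactions int,
        high double,
        low double,
        volume double,
        PRIMARY KEY ((stock_symbol), utc_timestamp)
    ) WITH CLUSTERING ORDER BY (utc_timestamp ASC)
""")
```

With `stock_symbol` as the partition key, each stock's history lives in a single partition, and the ascending clustering order turns "fetch the latest 200 rows" into a cheap sequential tail read.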
- Data
  - Train Data Dimension $(N, T, D)$
    - $N$ is the number of data points in a batch
    - $T = 200$: look back over the previous two hundred seconds of data
    - $D = 5$: the features in the data (price, number of transactions, high price, low price, volume)
  - Prediction Data Dimension $(1, 200, 5)$ (a windowing sketch follows this list)
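A minimal sketch of how an $(N, T, D)$ training batch could be built from a per-second feature stream with a sliding window; NumPy and the variable names here are assumptions, not the project's actual preprocessing code:

```python
import numpy as np

T, D = 200, 5  # 200-second look-back window, 5 features per second

# Hypothetical feature stream: one row per second with
# (price, number of transactions, high, low, volume).
stream = np.random.rand(1000, D)

# Slide a length-T window over the stream; each window is one training sample.
windows = np.stack([stream[i : i + T] for i in range(len(stream) - T)])
print(windows.shape)  # (800, 200, 5) == (N, T, D)

# At prediction time only the single latest window is used.
latest = stream[-T:][None, ...]
print(latest.shape)   # (1, 200, 5)
```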
- Model Structure:
  - `X -> [LSTM * 5] -> Linear -> Price Prediction` (see the PyTorch sketch below)
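A minimal PyTorch sketch of that structure, reading `[LSTM * 5]` as five stacked LSTM layers; the hidden size and class name are assumptions (the trained weights live in `lstm_model_weights.pth`):

```python
import torch
import torch.nn as nn

class LSTMPricePredictor(nn.Module):
    def __init__(self, input_size: int = 5, hidden_size: int = 64, num_layers: int = 5):
        super().__init__()
        # Five stacked LSTM layers consume the (N, T, D) feature windows.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # A single linear head maps the last hidden state to one price value.
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.lstm(x)              # out: (N, T, hidden_size)
        return self.linear(out[:, -1, :])  # (N, 1): predicted price at time t

model = LSTMPricePredictor()
y = model(torch.randn(1, 200, 5))  # y.shape == (1, 1)
```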
- How the Model works:
  - At the current timestamp $t$, get the latest 200 time series data points before $t$ in ascending `utc_timestamp` order. Feed the data into the deep learning model, which predicts the current SPY stock price at time $t$ (see the inference sketch below).
- Due to the limited computational resources on my local machine, the "real-time" prediction lags behind actual time because of the long computation duration required.
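A minimal inference sketch under the same assumptions as the schema and model sketches above; the query, column names, and model loading are illustrative (the project's actual logic lives in `lstm_prediction.py`), and the feature scaling step (`scaler.save`) is omitted for brevity:

```python
import numpy as np
import torch
from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect("stock")

# Fetch the latest 200 points for SPY: DESC + LIMIT reads only the tail of
# the partition; reversing afterwards restores ascending utc_timestamp order.
rows = session.execute("""
    SELECT price, num_transactions, high, low, volume
    FROM stock_price WHERE stock_symbol = 'SPY'
    ORDER BY utc_timestamp DESC LIMIT 200
""")
window = np.array([list(r) for r in rows], dtype=np.float32)[::-1]

model = LSTMPricePredictor()  # class from the model sketch above
model.load_state_dict(torch.load("lstm_model_weights.pth"))
model.eval()

with torch.no_grad():
    x = torch.from_numpy(window.copy()).unsqueeze(0)  # (1, 200, 5)
    print(f"Predicted SPY price at t: {model(x).item():.2f}")
```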
- Use Terraform to initialize cloud infrastructure automatically
- Use Kubeflow to train the deep learning model automatically
- Train a better deep learning model to make predictions more accurate and faster
- `python3`
- `Docker` is running and `K8S` is running
- `Helm` is installed on the local computer
- `Dockerhub`
- Stock API key (not free)
- Reddit API key
- AWS account if you want to deploy on AWS
- At the project root directory, run `env.py` to generate the environment file under the project root directory
- At the terminal, run `aws configure` to configure the AWS command line
- In IAM, make sure you have the following roles:
  - EKS role
  - EC2 role
- In EKS, initialize the cluster
- Initialize the node group with 6 `c3.xlarge` nodes with x86-64 architecture
- Add the `Amazon EBS CSI add-on` to the cluster by following the AWS-Instruction
- At the terminal, run `aws eks update-kubeconfig --region <region> --name <cluster_name>`. Make sure the current kubectl context is EKS by running `kubectl config get-contexts` in the terminal.
```bash
make k8s-kafka
make k8s-cassandra
make k8s-spark
make k8s-kafka-dashboard
make k8s-data-dashboard
make k8s-dashboard
```
For each command line in the following code snippet, run it in a different terminal under the root directory:

```bash
cd k8s/cassandra && make cassandra-cluster-local-connection
cd k8s/kafka-dashboard && make kafka-dashboard-port-forward
cd k8s/spark && make spark-port-forward
cd k8s/grafana && make grafana-port-forward
make k8s-airflow-amd64 # This will take ~10-20 mins to install
```
- At the project root directory, run `env.py` to generate the environment file under the project root directory
- Run the following commands and wait for all of them to finish:

```bash
make k8s-kafka
make k8s-cassandra
make k8s-spark
make k8s-kafka-dashboard
make k8s-data-dashboard
make k8s-dashboard
```
- For each command line in the following code snippet, run it in a different terminal under the root directory:

```bash
cd k8s/cassandra && make cassandra-cluster-local-connection
cd k8s/kafka-dashboard && make kafka-dashboard-port-forward
cd k8s/spark && make spark-port-forward
cd k8s/grafana && make grafana-port-forward
make k8s-airflow-arm64 # Change this to make k8s-airflow-amd64 if you are using x86-64 architecture
```
- k8s-dashboard: Kubernetes cluster monitoring
- Airflow (username: admin, password: admin): real-time data generation and consumption pipeline
- kafka-ui: monitor Kafka cluster status
- spark-ui: monitor Spark cluster status
- Grafana: data visualization
  - username: admin
  - password: admin123
- In Grafana, add a data connection with the provider `Apache-Cassandra` (make sure the data pipeline has started)
  - host: `cassandra.cassandra-cluster.svc.cluster.local:9042`
  - keyspace: `stock`
  - user: `cassandra`, password: `cassandra`
- Import the dashboard by importing `dashboard.json`. Remember to update the data source at each panel and change the time range.
- At the project root directory, run `make clean`.
```
├── Makefile
├── README.md
├── README_Resources
│ ├── demo.gif
│ └── project-architecture.png
├── docker-service
│ ├── airflow
│ │ ├── Dockerfile
│ │ ├── dags
│ │ │ ├── real_time_stock_consume.py
│ │ │ └── real_time_stock_data_generation.py
│ │ ├── python_requirements.txt
│ │ └── scripts
│ │ ├── cassandra_table_creation.py
│ │ ├── kafka_topic_creation.py
│ │ └── spark
│ │ └── spark_stream_processing.py
│ ├── python_reddit
│ │ ├── Dockerfile
│ │ ├── python_requirements.txt
│ │ └── reddit-producer.py
│ ├── python_stock
│ │ ├── Dockerfile
│ │ ├── python_requirements.txt
│ │ └── stock_data_generation
│ │ ├── single_stock_generator.py
│ │ ├── stock_generation_script.py
│ │ └── stock_generator.py
│ └── python_stock_prediction
│ ├── Dockerfile
│ ├── lstm_model
│ │ ├── lstm_model.py
│ │ ├── lstm_model_weights.pth
│ │ ├── lstm_prediction.py
│ │ └── scaler.save
│ └── python_requirements.txt
├── env.py
└── k8s
├── airflow
│ ├── Makefile
│ └── values.yaml
├── cassandra
│ ├── Makefile
│ └── values.yaml
├── grafana
│ ├── Makefile
│ ├── dashboard.json
│ └── values.yaml
├── k8s-dashboard
│ ├── Makefile
│ └── dashboard-adminuser.yaml
├── kafka
│ ├── makefile
│ └── values.yaml
├── kafka-dashboard
│ ├── makefile
│ └── values.yml
└── spark
├── Makefile
    └── values.yaml
```