The executives at Mystique Unicorn are interested in getting near real-time insights into the sales performance of their stores. They have multiple stores in multiple locations. All of these stores send their sales records as a stream of events to a Kinesis data stream at regular intervals throughout the day. They would like a real-time dashboard that shows them the sales revenue per store. In the future, they would like to expand this capability to drill down and find out which category is most popular. For example, a typical question that often gets asked is: which category did we sell more of today, Electronics or Books?
They heard that AWS offers analytics capabilities for streaming event data. Can you help them?
AWS offers multiple capabilities to perform analytics on streaming data. As they are using Kinesis Data Streams to ingest the stream of sales events, we can leverage Kinesis Data Analytics to perform stream analytics using regular SQL or Apache Flink. In this solution, we will use a simple SQL query to calculate the revenue_per_store metric and store the value in S3 for further processing.
The incoming data event payload looks something like this.
{
"category": "Electronics",
"store_id": "store_3",
"evnt_time": "2021-01-31T14:05:47.190114",
"sales": 22.78
}
We will use the store_id to summarize the sales and calculate the revenue of that store for a given period of time. Let us assume we want to calculate the revenue of each store per minute. At this point we will have a stream of revenue events from Kinesis Data Analytics (KDA). We can deliver them to either a Kinesis data stream or Kinesis Data Firehose, depending on the downstream consumers.
In this example, I am going to use Kinesis Data Firehose to store the events in S3. Before the revenue records are written to S3, I am going to use a Lambda function to do some basic transformation, such as adding a newline character after each record coming from KDA. This makes future processing with Redshift/Athena easier.
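The transformation step can be sketched as a Python Lambda handler. This is a minimal sketch, assuming the standard Kinesis Data Firehose data-transformation format (each incoming record carries a recordId and base64-encoded data, and each returned record must echo the recordId with a result status). The handler name and the exact newline handling are assumptions; the transformer deployed by the repository may differ.

```python
import base64
import json


def lambda_handler(event, context):
    """Append a newline to every record so S3 objects become newline-delimited JSON."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        # Drop an existing trailing newline first so we never emit blank lines.
        transformed = payload.rstrip("\n") + "\n"
        output.append(
            {
                "recordId": record["recordId"],  # must match the incoming record
                "result": "Ok",                  # "Ok", "Dropped" or "ProcessingFailed"
                "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
            }
        )
    return {"records": output}


if __name__ == "__main__":
    # Hypothetical test event shaped like a Firehose transformation request.
    sample = {"store_id": "store_1", "revenue": 8671.3125}
    event = {
        "records": [
            {"recordId": "1", "data": base64.b64encode(json.dumps(sample).encode()).decode()}
        ]
    }
    result = lambda_handler(event, None)
    print(base64.b64decode(result["records"][0]["data"]))
```

Keeping the result as "Ok" for every record assumes the KDA output is always valid JSON; a stricter transformer could mark malformed records as "ProcessingFailed" instead.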
The final AWS architecture looks something like,
In this article, we will build an architecture similar to the one shown above. We will start backwards so that all the dependencies are satisfied.
This demo, its instructions, scripts, and CloudFormation template are designed to run in us-east-1. With a few modifications you can try it out in other regions as well (not covered here).
- AWS CLI installed & configured
- AWS CDK installed & configured
- Python packages. Change the commands below to suit your OS; the following are written for Amazon Linux 2:
  - Python3: yum install -y python3
  - Python Pip: yum install -y python-pip
  - Virtualenv: pip3 install virtualenv
Get the application code:
git clone https://github.com/miztiik/kinesis-tumbling-window-analytics
cd kinesis-tumbling-window-analytics
We will use cdk to make our deployments easier. Let's go ahead and install the necessary components.
# You should have npm pre-installed
# If you DO NOT have cdk installed
npm install -g aws-cdk
# Make sure you are in the root directory
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt
The very first time you deploy an AWS CDK app into an environment (account/region), you'll need to install a bootstrap stack. Otherwise, just go ahead and deploy using cdk deploy.
cdk bootstrap
cdk ls
# Follow on-screen prompts
You should see an output of the available stacks:
kinesis-tumbling-window-analytics-producer-stack
kinesis-tumbling-window-analytics-firehose-stack
kinesis-tumbling-window-analytics-consumer-stack
Let us walk through each of the stacks.
Stack: kinesis-tumbling-window-analytics-producer-stack
This stack will create a Kinesis data stream and the producer Lambda function. Each Lambda invocation runs for a minute, ingesting a stream of sales events for 5 different stores, ranging from store_id=1 to store_id=5.
Initiate the deployment with the following command,
cdk deploy kinesis-tumbling-window-analytics-producer-stack
After successfully deploying the stack, check the Outputs section of the stack. You will find the streamDataProcessor producer Lambda function. We will invoke this function later during our testing phase.
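For intuition, here is a minimal sketch of what a producer like streamDataProcessor might do: build sales events shaped like the sample payload and package them as entries for the Kinesis PutRecords API. The helper names, the value range, and the two-category list are hypothetical, and the real producer also keeps sending for about a minute. The boto3 call is shown only as a comment so the sketch runs without AWS credentials.

```python
from __future__ import annotations

import json
import random
from datetime import datetime

# Hypothetical values for illustration; the repository's producer may differ.
STORE_IDS = [f"store_{i}" for i in range(1, 6)]
CATEGORIES = ["Electronics", "Books"]


def make_sales_event(rng: random.Random) -> dict:
    """Build one sales event with the same fields as the sample payload."""
    return {
        "category": rng.choice(CATEGORIES),
        "store_id": rng.choice(STORE_IDS),
        "evnt_time": datetime.now().isoformat(),
        "sales": round(rng.uniform(1, 100), 2),
    }


def to_put_records_entries(events: list[dict]) -> list[dict]:
    """Shape events as PutRecords entries.

    Using store_id as the partition key sends all of a store's events to the same shard.
    """
    return [
        {"Data": json.dumps(event).encode("utf-8"), "PartitionKey": event["store_id"]}
        for event in events
    ]


if __name__ == "__main__":
    rng = random.Random(42)
    entries = to_put_records_entries([make_sales_event(rng) for _ in range(10)])
    # With credentials and a real stream (placeholder name), the batch could be sent with:
    # boto3.client("kinesis").put_records(StreamName="<your-stream>", Records=entries)
    print(entries[0])
```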
Stack: kinesis-tumbling-window-analytics-firehose-stack
This stack will create the Firehose delivery stream that receives the stream of events from Kinesis Analytics, and it also deploys the simple Lambda transformer. This Firehose is set to buffer for 1 minute or until 1 MB of data is collected, whichever comes first, before sending it to S3. Initiate the deployment with the following command,
cdk deploy kinesis-tumbling-window-analytics-firehose-stack
After successfully deploying the stack, check the Outputs section of the stack. You will find the FirehoseDataStore, where the analytics results will eventually be stored.
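The 1 minute / 1 MB buffering behavior boils down to a simple rule: Firehose delivers the buffered data as soon as either threshold is reached. The sketch below is a hypothetical illustration of that rule, not Firehose code. The BufferingHints dictionary uses the CloudFormation property names (IntervalInSeconds, SizeInMBs) for an S3 destination; the CDK code in the repository may express the same settings differently.

```python
ONE_MIB = 1024 * 1024  # SizeInMBs is documented in MiB


def should_flush(elapsed_seconds: float, buffered_bytes: int,
                 interval_seconds: int = 60, size_bytes: int = ONE_MIB) -> bool:
    """Return True once EITHER buffering threshold is reached (whichever comes first)."""
    return elapsed_seconds >= interval_seconds or buffered_bytes >= size_bytes


# Hypothetical CloudFormation-style hints matching the stack's 1 minute / 1 MB setting.
BUFFERING_HINTS = {"BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 1}}

print(should_flush(10, 200_000))      # False: neither threshold reached yet
print(should_flush(61, 200_000))      # True: the 60 s interval elapsed
print(should_flush(10, 2 * ONE_MIB))  # True: the 1 MiB size was reached
```

With the low event volume in this demo, the interval will almost always trigger first, which is why results appear in S3 roughly a minute after ingestion.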
Stack: kinesis-tumbling-window-analytics-consumer-stack
This stack will create the Kinesis Analytics application. The SQL application code that does the magic of aggregating sales across stores is baked into the stack. If you would like to take a look and make some improvements, here it is:
CREATE OR REPLACE STREAM "DEST_SQL_STREAM_BY_STORE_ID" ("store_id" VARCHAR(16), "revenue" REAL, "timestamp" TIMESTAMP);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DEST_SQL_STREAM_BY_STORE_ID"
  SELECT STREAM "store_id", SUM("sales") AS "revenue", ROWTIME AS "timestamp"
  FROM "STORE_REVENUE_PER_MIN_001"
  GROUP BY STEP("STORE_REVENUE_PER_MIN_001".ROWTIME BY INTERVAL '60' SECOND), "store_id";
Here we are aggregating the sales revenue per store per minute. Initiate the deployment with the following command,
cdk deploy kinesis-tumbling-window-analytics-consumer-stack
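To make the tumbling window concrete, the same per-store, per-minute aggregation can be sketched in plain Python over a small batch of events. This is an illustration only, with hypothetical helper names and sample data. KDA groups by ROWTIME (the time a record entered the application) and emits each window's result as the window closes, whereas this sketch groups by each event's own evnt_time over a finished list, so results can differ for late-arriving events.

```python
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta


def window_start(ts: datetime, seconds: int = 60) -> datetime:
    """Floor a timestamp to the start of its tumbling window.

    Assumes `seconds` divides a day evenly (true for 60), so windows align to midnight.
    """
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    offset = int((ts - midnight).total_seconds()) // seconds * seconds
    return midnight + timedelta(seconds=offset)


def revenue_per_store_per_minute(events: list[dict]) -> dict:
    """Sum sales per (store_id, window start), mirroring GROUP BY STEP(...), "store_id"."""
    totals: dict = defaultdict(float)
    for event in events:
        ts = datetime.fromisoformat(event["evnt_time"])
        totals[(event["store_id"], window_start(ts))] += event["sales"]
    return dict(totals)


# The first event is the sample payload; the other two are hypothetical.
events = [
    {"category": "Electronics", "store_id": "store_3",
     "evnt_time": "2021-01-31T14:05:47.190114", "sales": 22.78},
    {"category": "Books", "store_id": "store_3",
     "evnt_time": "2021-01-31T14:05:59.000000", "sales": 10.00},
    {"category": "Books", "store_id": "store_1",
     "evnt_time": "2021-01-31T14:06:01.000000", "sales": 5.50},
]

for (store_id, start), revenue in sorted(revenue_per_store_per_minute(events).items()):
    print(store_id, start.isoformat(), round(revenue, 2))
# store_1 2021-01-31T14:06:00 5.5
# store_3 2021-01-31T14:05:00 32.78
```

Each event falls into exactly one window, which is what distinguishes a tumbling window from a sliding one.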
Before we go ahead and start testing the solution, we need to start the Kinesis Analytics application. Unfortunately (or maybe fortunately, for cost reasons), we cannot start it automatically through CloudFormation. We would have to do it through a custom resource (which is in itself a pain to maintain) or do it manually.
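If you would rather script the start than click through the console, the boto3 kinesisanalytics client (the API for SQL-based Kinesis Data Analytics applications) exposes DescribeApplication and StartApplication. The sketch below assumes boto3 is installed and credentials are configured; the helper names, the placeholder application name, and the default input Id "1.1" are assumptions, which is why start_application looks the real Id up before starting.

```python
def build_start_request(application_name: str, input_id: str = "1.1",
                        starting_position: str = "NOW") -> dict:
    """Keyword arguments for the kinesisanalytics StartApplication call.

    starting_position may be NOW, TRIM_HORIZON, or LAST_STOPPED_POINT.
    """
    return {
        "ApplicationName": application_name,
        "InputConfigurations": [
            {
                "Id": input_id,
                "InputStartingPositionConfiguration": {
                    "InputStartingPosition": starting_position
                },
            }
        ],
    }


def start_application(application_name: str) -> None:
    """Look up the application's input Id, then start it (needs boto3 and AWS credentials)."""
    import boto3  # imported here so build_start_request works without boto3 installed

    client = boto3.client("kinesisanalytics")
    detail = client.describe_application(ApplicationName=application_name)["ApplicationDetail"]
    input_id = detail["InputDescriptions"][0]["InputId"]
    client.start_application(**build_start_request(application_name, input_id))


# start_application("<your-kda-application-name>")  # hypothetical placeholder name
```

A running application is billed continuously, so remember to stop it (or destroy the stack) when you are done testing.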
Start Kinesis Analytics Application: Here is a screenshot to help you do that. Navigate to Kinesis Analytics and choose Run.
Invoke Producer Lambda: Let us start by invoking the Lambda from the producer stack kinesis-tumbling-window-analytics-producer-stack using the AWS Console. If you want to ingest more events, use another browser window and invoke the Lambda again.
{ "statusCode": 200, "body": "{\"message\": {\"status\": true, \"record_count\": 1170, \"tot_sales\": 59109.60999999987}}" }
In this invocation, I ingested 1170 events, and the total sales volume across all stores [1..5] is about 59109.
Check FirehoseDataStore: After about 60 seconds, navigate to the data store S3 bucket created by the firehose stack kinesis-tumbling-window-analytics-firehose-stack. You will be able to find an object key similar to this:
kinesis-tumbling-window-analy-fhdatastore6289deb2-5iydp3790az3/sales_revenue/2021/01/31/14/revenue_analytics_stream-1-2021-01-31-14-11-00-b2412476-aace-41a7-bbd5-e1a170d2573c
Kinesis Firehose does not have a native mechanism to set the file extension, and I was not too keen on setting up another Lambda to add the suffix. But the file contents should be one valid JSON object per line.
The contents of the file should look like this:
{"store_id": "store_1", "revenue": 8671.3125, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_4", "revenue": 10318.991, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_5", "revenue": 8561.629, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_3", "revenue": 9370.749, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_2", "revenue": 8606.821, "timestamp": "2021-01-31 14:47:00.000"}
You can observe that the revenue per store is aggregated and stored along with the store_id and timestamp attributes. You can feed this into a dashboard like Kibana to give the executives a real-time snapshot of the sales happening in the stores.
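Because each line of the output object is a standalone JSON document, a downstream consumer can parse it line by line. A minimal sketch that uses the sample file contents above as hard-coded input; in practice you would read the object from S3 (for example with boto3's get_object), and the helper name is hypothetical.

```python
from __future__ import annotations

import json

# Sample Firehose output object contents: one JSON document per line.
NDJSON = """\
{"store_id": "store_1", "revenue": 8671.3125, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_4", "revenue": 10318.991, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_5", "revenue": 8561.629, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_3", "revenue": 9370.749, "timestamp": "2021-01-31 14:47:00.000"}
{"store_id": "store_2", "revenue": 8606.821, "timestamp": "2021-01-31 14:47:00.000"}
"""


def parse_ndjson(text: str) -> list[dict]:
    """Parse newline-delimited JSON, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


rows = parse_ndjson(NDJSON)
top = max(rows, key=lambda row: row["revenue"])
print(top["store_id"], top["revenue"])  # store_4 10318.991
print(round(sum(row["revenue"] for row in rows), 2))
```

This one-object-per-line layout is also what line-oriented JSON readers in tools like Athena expect, which is the reason for the newline transformer.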
Here we have demonstrated how to use Kinesis Analytics with simple SQL queries to perform streaming analytics on incoming data. You can extend this further by enriching the items before storing them in S3, or by partitioning them better for ingestion into data lake platforms.
If you want to destroy all the resources created by the stack, execute the command below to delete the stack, or delete the stack from the console:
- Resources created while deploying the application
- Delete CloudWatch Lambda LogGroups
- Any other custom resources you have created for this demo
# Delete from cdk
cdk destroy
# Follow any on-screen prompts
# Delete the CF Stack, if you used CloudFormation to deploy the stack.
aws cloudformation delete-stack \
  --stack-name "MiztiikAutomationStack" \
  --region "${AWS_REGION}"
This is not an exhaustive list; please carry out any other steps that may be applicable to your needs.
This repository aims to show new developers, Solution Architects & Ops Engineers how to perform streaming analytics in AWS. Building on that knowledge, these Udemy courses (course #1, course #2) help you build complete architectures in AWS.
Thank you for your interest in contributing to our project. Whether it is a bug report, new feature, correction, or additional documentation or solutions, we greatly value feedback and contributions from our community. Start here
Level: 300