This application calculates daily product revenue in both Spark and MySQL, listing dates in ascending order and, within each date, revenue in descending order. It also demonstrates how to reduce the number of stages and tasks in Spark by using a broadcast variable.
The MySQL query:
SELECT o.order_date, sum(oi.order_item_subtotal), p.product_name
FROM
(
retail_db.order_items oi JOIN retail_db.orders o ON oi.order_item_order_id = o.order_id
JOIN
retail_db.products p ON p.product_id = oi.order_item_product_id
)
GROUP BY o.order_date, p.product_name
ORDER BY o.order_date, sum(oi.order_item_subtotal) DESC;
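The `ORDER BY` clause sorts on two keys in opposite directions. Here is a minimal plain-Scala sketch of the same ordering on hypothetical in-memory rows, with no Spark needed. It assumes dates are ISO-formatted strings, which sort lexicographically in date order. Negating the revenue inside the sort key reverses the order of that component only. The object name and sample rows are made up for illustration.

```scala
// Hypothetical object; rows mimic the query's output columns:
// (order_date, revenue, product_name)
object DailyRevenueOrdering {
  type Row = (String, Double, String)

  // Equivalent of ORDER BY order_date, revenue DESC: sort ascending on
  // (date, -revenue), so negating revenue reverses only that component.
  def sortRows(rows: Seq[Row]): Seq[Row] =
    rows.sortBy { case (date, revenue, _) => (date, -revenue) }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows, for illustration only
    val rows: Seq[Row] = Seq(
      ("2013-07-26", 199.99, "Kayak"),
      ("2013-07-25", 129.99, "Gloves"),
      ("2013-07-25", 599.96, "Bike")
    )
    sortRows(rows).foreach(println)
  }
}
```

Running `main` prints the two 2013-07-25 rows first (Bike before Gloves, since 599.96 > 129.99), followed by the Kayak row.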
Spark - DailyProductRevenue Scala Application Link:
Without a broadcast variable, the job runs in six stages:
- Stage 0: Reading orders data from HDFS and converting it into (K,V) --> orderMap --> (orderID, orderDate)
- Stage 1: Reading order_items data from HDFS and converting it into (K,V) --> orderItemMap --> (orderID, (productID, order_itemSubTotal))
- Stage 2: Joining orderMap & orderItemMap --> ordersJoin(K,V) --> (orderID, (orderDate, (productID, order_itemSubTotal))) and converting it into (K,V) --> orderJoinMap(K,V) --> ((orderDate, productID), order_itemSubTotal)
- Stage 3: Grouping orderJoinMap(K,V) and aggregating the product revenue --> dailyRevenuePerProductID(K,V) --> ((orderDate, productID), sum(order_itemSubTotal)) and converting it into (K,V) --> dailyRevenuePerProductIDMap(K,V) --> (productID, (orderDate, sum(order_itemSubTotal)))
- Stage 4: Reading products data from the local file system and converting it into (K,V) --> productRDDMap(K,V) --> (productID, productName)
- Stage 5: Joining dailyRevenuePerProductIDMap(K,V) & productRDDMap(K,V) --> dailyRevenuePerProductNameLocal(K,V) --> ((orderDate, productName), sum(order_itemSubTotal))
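The six stages above can be sketched with Spark's Scala RDD API roughly as follows. This is a minimal sketch, not the author's application. The object name, argument order, and the comma-separated column positions noted in the comments are assumptions, since this README does not state the file layouts. Each `join` and `reduceByKey` introduces a shuffle. The comments label each step with the stage list above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.io.Source

// Hypothetical object name. Assumed comma-separated layouts (not stated in this README):
//   orders:      order_id,order_date,...
//   order_items: order_item_id,order_item_order_id,order_item_product_id,
//                order_item_quantity,order_item_subtotal,...
//   products:    product_id,product_category_id,product_name,...
object DailyProductRevenueJoin {

  def dailyRevenue(orders: RDD[String],
                   orderItems: RDD[String],
                   products: RDD[String]): RDD[((String, String), Double)] = {
    // Stage 0: orderMap -> (orderID, orderDate)
    val orderMap = orders.map { line =>
      val f = line.split(",")
      (f(0).toInt, f(1))
    }
    // Stage 1: orderItemMap -> (orderID, (productID, order_itemSubTotal))
    val orderItemMap = orderItems.map { line =>
      val f = line.split(",")
      (f(1).toInt, (f(2).toInt, f(4).toDouble))
    }
    // Stage 2: first shuffle join, re-keyed to ((orderDate, productID), subtotal)
    val orderJoinMap = orderMap.join(orderItemMap).map {
      case (_, (orderDate, (productId, subtotal))) => ((orderDate, productId), subtotal)
    }
    // Stage 3: sum per (orderDate, productID), then re-key by productID
    val dailyRevenuePerProductIDMap = orderJoinMap.reduceByKey(_ + _).map {
      case ((orderDate, productId), revenue) => (productId, (orderDate, revenue))
    }
    // Stage 4: productRDDMap -> (productID, productName)
    val productRDDMap = products.map { line =>
      val f = line.split(",")
      (f(0).toInt, f(2))
    }
    // Stage 5: second shuffle join -> ((orderDate, productName), revenue)
    dailyRevenuePerProductIDMap.join(productRDDMap).map {
      case (_, ((orderDate, revenue), productName)) => ((orderDate, productName), revenue)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical argument order: ordersPath orderItemsPath localProductsPath outputPath
    val sc = new SparkContext(new SparkConf().setAppName("DailyProductRevenue"))
    // Products are read on the driver from the local file system, then distributed
    val src = Source.fromFile(args(2))
    val productLines = try src.getLines().toList finally src.close()
    dailyRevenue(sc.textFile(args(0)), sc.textFile(args(1)), sc.parallelize(productLines))
      .saveAsTextFile(args(3))
    sc.stop()
  }
}
```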
With the products data in a broadcast variable, the second join (Stages 4 and 5 above) is no longer needed, and the job runs in four stages:
- Stage 0: Reading orders data from HDFS and converting it into (K,V) --> orderMap --> (orderID, orderDate)
- Stage 1: Reading order_items data from HDFS and converting it into (K,V) --> orderItemMap --> (orderID, (productID, order_itemSubTotal))
- Stage 2: Joining orderMap & orderItemMap --> ordersJoin(K,V) --> (orderID, (orderDate, (productID, order_itemSubTotal))) and converting it into (K,V) --> orderJoinMap(K,V) --> ((orderDate, productID), order_itemSubTotal)
- Stage 3: Grouping orderJoinMap(K,V) and aggregating the product revenue --> dailyRevenuePerProductID(K,V) --> ((orderDate, productID), sum(order_itemSubTotal)) and converting it into (K,V) using the broadcast variable (products) --> dailyRevenuePerProductName(K,V) --> ((orderDate, sum(order_itemSubTotal)), productName)
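The four-stage broadcast version can be sketched in the same RDD style. It assumes the products file is small enough to load into a driver-side `Map`, and it uses the same assumed column positions as the comments show. The object name and argument order are hypothetical. `sc.broadcast` caches one read-only copy of the map on each executor, so the product-name lookup becomes a plain `flatMap` in Stage 3 instead of a second shuffle join. The result is keyed by (orderDate, revenue) so that the final `sortBy` can order by date ascending and revenue descending.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.io.Source

// Hypothetical object name; assumed column positions as in the comments below.
object DailyProductRevenueBroadcast {

  def dailyRevenue(sc: SparkContext,
                   orders: RDD[String],
                   orderItems: RDD[String],
                   productLines: Seq[String]): RDD[((String, Double), String)] = {
    // Small lookup table built on the driver: productID -> productName
    // (assumed layout: product_id,product_category_id,product_name,...)
    val productMap: Map[Int, String] = productLines.map { line =>
      val f = line.split(",")
      f(0).toInt -> f(2)
    }.toMap
    // Read-only copy cached on each executor instead of a shuffled RDD
    val productsBV = sc.broadcast(productMap)

    // Stage 0: (orderID, orderDate); assumed layout: order_id,order_date,...
    val orderMap = orders.map { line =>
      val f = line.split(",")
      (f(0).toInt, f(1))
    }
    // Stage 1: (orderID, (productID, subtotal)); assumed layout:
    // order_item_id,order_item_order_id,order_item_product_id,quantity,subtotal,...
    val orderItemMap = orderItems.map { line =>
      val f = line.split(",")
      (f(1).toInt, (f(2).toInt, f(4).toDouble))
    }
    // Stage 2: the only join; re-key to ((orderDate, productID), subtotal)
    val orderJoinMap = orderMap.join(orderItemMap).map {
      case (_, (orderDate, (productId, subtotal))) => ((orderDate, productId), subtotal)
    }
    // Stage 3: sum per key, then look up the name in the broadcast map;
    // products missing from the map are dropped, like an inner join
    orderJoinMap.reduceByKey(_ + _).flatMap {
      case ((orderDate, productId), revenue) =>
        productsBV.value.get(productId).toSeq.map(name => ((orderDate, revenue), name))
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical argument order: ordersPath orderItemsPath localProductsPath outputPath
    val sc = new SparkContext(new SparkConf().setAppName("DailyProductRevenueBroadcast"))
    val src = Source.fromFile(args(2))
    val productLines = try src.getLines().toList finally src.close()
    dailyRevenue(sc, sc.textFile(args(0)), sc.textFile(args(1)), productLines)
      // Date ascending, revenue descending; sortBy triggers its own shuffle
      .sortBy { case ((orderDate, revenue), _) => (orderDate, -revenue) }
      .saveAsTextFile(args(3))
    sc.stop()
  }
}
```

Broadcasting pays off only while the lookup table fits comfortably in driver and executor memory. For a large products table, the shuffle join is the safer choice.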