This repository contains Flink SQL demo queries that can be used on Confluent Cloud for Apache Flink®.
Running these queries requires 4 topics to be set up with the Datagen connector, using AVRO and Schema Registry.
- Topic `clickstream` -> Datagen template SHOE_CLICKSTREAM
- Topic `customers` -> Datagen template SHOE_CUSTOMERS
- Topic `orders` -> Datagen template SHOE_ORDERS
- Topic `shoes` -> Datagen template SHOES
-- Ad-hoc preview of the raw `orders` topic; SELECT * is fine for interactive
-- exploration (production queries should list columns explicitly).
SELECT * FROM orders;
-- Hourly distinct-order counts using the TUMBLE windowing table-valued function.
-- `$rowtime` is the system column carrying the record's event time; it must be
-- backtick-quoted because `$` is not legal in an unquoted identifier (the other
-- queries in this file already quote it).
SELECT window_start, window_end, COUNT(DISTINCT order_id) AS nr_of_orders
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(`$rowtime`), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
-- Sink table for the hourly aggregation below.
-- NOTE(review): on Confluent Cloud for Apache Flink, CREATE TABLE is expected to
-- create a backing Kafka topic of the same name — confirm for your environment.
CREATE TABLE sales_per_hour (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
nr_of_orders BIGINT
);
-- Continuously populate sales_per_hour with hourly distinct-order counts.
-- The source table is fully qualified; replace `demo`.`webshop_team` with the
-- catalog/database names of your own environment.
-- `$rowtime` must be backtick-quoted: `$` is not legal in an unquoted identifier.
INSERT INTO sales_per_hour
SELECT window_start, window_end, COUNT(DISTINCT order_id) AS nr_of_orders
FROM TABLE(
    TUMBLE(TABLE `demo`.`webshop_team`.`orders`, DESCRIPTOR(`$rowtime`), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
-- Deduplicate the `shoes` topic: keep only the most recent row per product id.
-- ROW_NUMBER() ranks rows within each id by event time (newest first), so
-- row_num = 1 selects the latest version. `$rowtime` must be backtick-quoted
-- because `$` is not legal in an unquoted identifier; the statement also needs
-- a terminating semicolon so the next statement parses cleanly.
SELECT id, name, brand
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY `$rowtime` DESC) AS row_num
    FROM `shoes`)
WHERE row_num = 1;
-- Enrich clickstream events with product details by joining against a
-- deduplicated view of the shoes topic (latest row per product id).
-- `$rowtime` inside the window must be backtick-quoted, consistent with
-- c.`$rowtime` in the outer SELECT — `$` is not legal in an unquoted identifier.
WITH uniqueShoes AS (
    -- Latest version of each shoe; adjust the fully qualified name to your setup.
    SELECT id, name, brand
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY `$rowtime` DESC) AS row_num
        FROM `demo`.`lkc-6wk6y2`.`shoes`)
    WHERE row_num = 1
)
SELECT
    c.`$rowtime`,
    c.product_id,
    s.name,
    s.brand
FROM
    clickstream c
    INNER JOIN uniqueShoes s ON c.product_id = s.id;
NOTE: Make sure that you use the correct fully qualified names, because they are specific to your setup.
-- Per user, find runs of consecutive clicks whose running average view_time
-- stays below 30, and report the run's start/end event times and average.
SELECT *
FROM clickstream
MATCH_RECOGNIZE (
PARTITION BY user_id
-- Rows are matched in event-time order within each user partition.
ORDER BY `$rowtime`
MEASURES
FIRST(A.`$rowtime`) AS start_tstamp,
LAST(A.`$rowtime`) AS end_tstamp,
AVG(A.view_time) AS avgViewTime
-- Emit a single summary row per match instead of every matched row.
ONE ROW PER MATCH
-- Resume matching after the last row of a match, so matches never overlap.
AFTER MATCH SKIP PAST LAST ROW
-- B is intentionally left out of DEFINE: an undefined pattern variable matches
-- any row, so B is simply the first row that breaks the A condition.
PATTERN (A+ B)
DEFINE
A AS AVG(A.view_time) < 30
) MR;