Directions
1. Register the existing transactions topic as a KSQL stream:
CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, TIMESTAMP VARCHAR, CARD_TYPE VARCHAR, AMOUNT DOUBLE, IP_ADDRESS VARCHAR, TRANSACTION_ID VARCHAR) WITH (KAFKA_TOPIC='transactions', VALUE_FORMAT='JSON');
2. Repartition the stream on account_id, and use Avro for the target stream (this is optional):
CREATE STREAM TRANSACTIONS_SOURCE WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TRANSACTIONS_RAW PARTITION BY ACCOUNT_ID;
3. Register the existing stream of customer data from Oracle in the topic customers as a KSQL stream:
CREATE STREAM CUST_RAW_STREAM (ID BIGINT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR, AVG_CREDIT_SPEND DOUBLE) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');
4. Repartition the customer data stream by ID (the customer's account ID, which the join in step 6 matches against account_id) to prepare for the join, and use Avro for the target stream (this is optional):
CREATE STREAM CUSTOMER_REKEYED WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM CUST_RAW_STREAM PARTITION BY ID;
5. Register the partitioned customer data topic as a KSQL table used for the join with the incoming stream of transactions:
CREATE TABLE CUSTOMER WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', VALUE_FORMAT='AVRO', KEY='ID');
6. Join the transactions to customer information:
CREATE STREAM TRANSACTIONS_ENRICHED AS SELECT T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, C.AVG_CREDIT_SPEND FROM TRANSACTIONS_SOURCE T INNER JOIN CUSTOMER C ON T.ACCOUNT_ID = C.ID;
7. Aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average:
CREATE TABLE POSSIBLE_STOLEN_CARD AS SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND FROM TRANSACTIONS_ENRICHED T WINDOW TUMBLING (SIZE 2 HOURS) GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND) ;
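To make the semantics of steps 6 and 7 concrete, the following Python sketch mimics them on a handful of in-memory records. It only illustrates the inner join, the two-hour tumbling window, and the HAVING filter; it is not how KSQL executes them. The sample records and the helper names (enrich, possible_stolen_cards) are made up for this example, and event times are assumed to be epoch milliseconds.

```python
from collections import defaultdict

WINDOW_MS = 2 * 60 * 60 * 1000  # two-hour tumbling window, in milliseconds

# Hypothetical sample data; not taken from the real topics.
customers = {
    1: {"FIRST_NAME": "Ada", "LAST_NAME": "Lee", "AVG_CREDIT_SPEND": 50.0},
    2: {"FIRST_NAME": "Bo", "LAST_NAME": "Kim", "AVG_CREDIT_SPEND": 80.0},
}
transactions = [
    # (event time in epoch ms, ACCOUNT_ID, CARD_TYPE, AMOUNT)
    (0, 1, "visa", 30.0),
    (3_600_000, 1, "visa", 25.0),  # same two-hour window as the row above
    (7_200_000, 1, "visa", 10.0),  # falls into the next window
    (1_000, 2, "jcb", 40.0),
    (5_000, 3, "visa", 99.0),      # no matching customer: dropped by the inner join
]


def enrich(transactions, customers):
    """Step 6: inner-join each transaction to its customer row."""
    for ts, account_id, card_type, amount in transactions:
        customer = customers.get(account_id)
        if customer is None:
            continue  # inner join: skip transactions without a customer
        yield {
            "TS": ts,
            "ACCOUNT_ID": account_id,
            "CARD_TYPE": card_type,
            "AMOUNT": amount,
            "FULL_NAME": customer["FIRST_NAME"] + " " + customer["LAST_NAME"],
            "AVG_CREDIT_SPEND": customer["AVG_CREDIT_SPEND"],
        }


def possible_stolen_cards(enriched):
    """Step 7: sum the spend per window and group, keep totals above the average."""
    groups = defaultdict(lambda: {"total": 0.0, "avg": float("-inf")})
    for row in enriched:
        # Tumbling windows do not overlap: each event belongs to exactly one.
        window_start = row["TS"] - row["TS"] % WINDOW_MS
        key = (window_start, row["ACCOUNT_ID"], row["CARD_TYPE"], row["FULL_NAME"])
        groups[key]["total"] += row["AMOUNT"]
        groups[key]["avg"] = max(groups[key]["avg"], row["AVG_CREDIT_SPEND"])
    # Equivalent of HAVING SUM(AMOUNT) > MAX(AVG_CREDIT_SPEND)
    return [
        (*key, agg["total"], agg["avg"])
        for key, agg in sorted(groups.items())
        if agg["total"] > agg["avg"]
    ]


print(possible_stolen_cards(enrich(transactions, customers)))
# [(0, 1, 'visa', 'Ada Lee', 55.0, 50.0)]
```

Only account 1 is reported, and only for its first window: the 30.0 and 25.0 transactions add up to 55.0, which exceeds the average of 50.0, while the 10.0 transaction two hours later starts a fresh total.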
Examine the output:
ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND FROM POSSIBLE_STOLEN_CARD;
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
…
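The WINDOW_START values above all fall on an even hour. That is consistent with tumbling windows being aligned to the Unix epoch rather than to the first event seen, which is how time windows behave in the underlying Kafka Streams library. A small Python sketch of that arithmetic follows; the event time is made up, and window_start and format_utc are hypothetical helper names, not KSQL functions.

```python
from datetime import datetime, timezone

WINDOW_MS = 2 * 60 * 60 * 1000  # two hours in milliseconds


def window_start(event_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Round an epoch-millisecond timestamp down to its window boundary."""
    return event_ms - event_ms % size_ms


def format_utc(epoch_ms: int) -> str:
    """Render epoch milliseconds in the same layout as the query output."""
    moment = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S %z")


# Hypothetical event time: 2019-01-11 17:23:45 UTC
event = datetime(2019, 1, 11, 17, 23, 45, tzinfo=timezone.utc)
event_ms = int(event.timestamp() * 1000)

print(format_utc(window_start(event_ms)))  # 2019-01-11 16:00:00 +0000
```

Any transaction between 16:00:00 and 17:59:59 UTC lands in the same window under this arithmetic, so an account's total resets at 18:00:00 regardless of when its first transaction arrived.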