Streaming ETL

Enriching Streams with Static Data Loaded as a Table

Data that changes infrequently is often called static data or reference data. It is typically published externally or supplied by a batch-based system. Examples include customer accounts, close-of-business positions, and pricing information. Reference data is especially useful when combined with a stream of events, where each event usually carries a reference ID or a similar key that identifies the matching reference record.

For example, a transaction could contain account numbers for both the sender and the recipient, but we may want to filter on the organization that the sender's account belongs to. The organization is available only in the reference data, not in the transaction stream. In this kind of situation, we can use KSQL to join the stream of events with the reference data.
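The lookup that such a join performs can be shown outside KSQL. The following is a minimal Python sketch that assumes the reference data fits in an in-memory dictionary keyed by account number. The `Txn` and `Account` classes, the `enrich` function, and all sample values are hypothetical and only mirror the column names used in this recipe. Each transaction is enriched by looking up its sender's `userid`. Transactions with no matching account are dropped, as in an inner join. The organization filter is then applied to the enriched rows.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator

@dataclass(frozen=True)
class Txn:
    txn_id: int
    userid: int     # sender's account number
    recipient: int  # recipient's account number
    amount: float

@dataclass(frozen=True)
class Account:
    ac_key: int
    username: str
    company: str    # only available in the reference data

def enrich(txns: Iterable[Txn], accounts: Dict[int, Account]) -> Iterator[dict]:
    """Inner-join each transaction to its sender's account by key lookup."""
    for txn in txns:
        account = accounts.get(txn.userid)
        if account is None:
            continue  # inner join: no reference row means no output row
        yield {
            "txn_id": txn.txn_id,
            "userid": txn.userid,
            "username": account.username,
            "company": account.company,
            "recipient": txn.recipient,
            "amount": txn.amount,
        }

# Hypothetical reference data and transactions, for illustration only.
accounts = {
    1: Account(1, "alice", "Acme Ltd"),
    2: Account(2, "bob", "Globex"),
}
txns = [Txn(100, 1, 2, 25.0), Txn(101, 2, 1, 40.0), Txn(102, 3, 1, 5.0)]

enriched = list(enrich(txns, accounts))  # txn 102 is dropped: no account 3
acme_only = [row for row in enriched if row["company"] == "Acme Ltd"]
```

The filter can only run after the join, because `company` exists solely in the reference data.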

Directions

1. Register the existing txns topic for use as a KSQL stream called txns:

 CREATE STREAM txns (txn_id BIGINT, userid BIGINT, recipient BIGINT, amount DOUBLE) 
 WITH (KAFKA_TOPIC = 'txns', VALUE_FORMAT = 'json');

2. Inspect the first few messages:

 SELECT * FROM txns LIMIT 5;

3. Register the existing accounts topic for use as a KSQL table called accounts:

 CREATE TABLE accounts (ac_key BIGINT, username VARCHAR, company VARCHAR, created_date VARCHAR) 
 WITH (KEY='ac_key', KAFKA_TOPIC = 'accounts', VALUE_FORMAT = 'json');

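4. Inspect the first few messages. The account records already exist in the topic and rarely change, so first set the offset reset policy to earliest to make the query read from the start of the topic: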

 SET 'auto.offset.reset'='earliest';
 SELECT * FROM accounts LIMIT 5;

5. Join the transactions stream with the accounts table to create a stream of enriched transactions:

 CREATE STREAM enriched_txns AS 
 SELECT TIMESTAMPTOSTRING(txns.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS TXN_TIMESTAMP, txn_id, userid, username, company, recipient, amount 
   FROM txns 
        INNER JOIN accounts 
        ON txns.userid = accounts.ac_key;

6. Filter the resulting transaction stream for transactions from a particular company:

 SELECT * FROM enriched_txns
  WHERE company = 'Nitzsche Group';

You should see the enriched transactions, including the user information, for the specified company only:

 2018-12-18 15:12:13 +0000 | 445 | 11 | Farra Stearn | Nitzsche Group | 9 | 84.11
 2018-12-18 15:12:15 +0000 | 448 | 11 | Farra Stearn | Nitzsche Group | 7 | 46.24
 2018-12-18 15:12:16 +0000 | 451 | 7 | Mendel Deyenhardt | Nitzsche Group | 8 | 38.02
 […]
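The steps above can be modeled in Python to illustrate two properties of a stream-table join that the sample output does not show. First, the table holds only the latest row per key. Second, each transaction is joined against the table as it stands when that transaction is processed. This is a sketch under stated assumptions, not how KSQL is implemented:

- Records are assumed to be processed in the order listed.
- `ROWTIME` is treated as epoch milliseconds and rendered in UTC, matching the `+0000` offset in the sample output.
- The join and the company filter are folded into one `run` function, whereas the recipe performs them as two separate statements.

The `run` and `to_string` functions, the classes, and all sample values are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Union

@dataclass(frozen=True)
class Account:
    ac_key: int
    username: str
    company: str

@dataclass(frozen=True)
class Txn:
    rowtime_ms: int  # hypothetical stand-in for ROWTIME (epoch milliseconds)
    txn_id: int
    userid: int
    recipient: int
    amount: float

Event = Union[Account, Txn]

def to_string(rowtime_ms: int) -> str:
    """Approximates TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), assuming UTC."""
    dt = datetime.fromtimestamp(rowtime_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S %z")

def run(events: List[Event], company: str) -> List[Tuple]:
    table: Dict[int, Account] = {}  # latest row per key, like the accounts table
    out: List[Tuple] = []
    for ev in events:  # events in assumed processing order
        if isinstance(ev, Account):
            table[ev.ac_key] = ev  # upsert: a newer record replaces the old row
            continue
        acct = table.get(ev.userid)  # lookup against the table's current state
        if acct is None or acct.company != company:
            continue  # no match (inner join) or excluded by the company filter
        out.append((to_string(ev.rowtime_ms), ev.txn_id, ev.userid,
                    acct.username, acct.company, ev.recipient, ev.amount))
    return out

t0 = 1545145933000  # 2018-12-18 15:12:13 UTC
events: List[Event] = [
    Txn(t0 - 5000, 200, 1, 2, 10.0),  # arrives before account 1 exists: dropped
    Account(1, "alice", "Acme Ltd"),
    Txn(t0, 201, 1, 2, 25.0),         # joins to Acme Ltd: kept
    Account(1, "alice", "Globex"),    # later update to the same key
    Txn(t0 + 2000, 202, 1, 2, 40.0),  # now joins to Globex: filtered out
]
rows = run(events, "Acme Ltd")
```

Transaction 200 produces no row because account 1 was not yet in the table. Transaction 202 is excluded because the later account record moved key 1 to a different company. Updating the table does not revisit transactions that were already joined.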