Streaming ETL

Lookups

Apache Kafka® is well suited to handling streams of events, whether they come from IoT devices, applications or even mainframes. Often, though, events need additional context before they become valuable, particularly for analytics. With KSQL, you can perform this enrichment in flight, on the stream of data itself.

The enrichment data can be written to a Kafka topic directly by an application, or it can be streamed in from almost any source with Kafka Connect and then used for lookups. A common example is bringing in customer information from a relational database such as Oracle or PostgreSQL.

In this example, you will enrich a stream of user-rating events from a website application. Each event contains the user ID but no other information about the user. To derive real-time value from this stream, you can enrich it with user information, including name and location. This user information is streamed in real time from MySQL into Kafka using Kafka Connect.

Directions

In this example, the source event stream is named ratings and has several fields, including user_id:

{
  "rating_id": {
    "long": 71
  },
  "user_id": {
    "int": 4
  },
  "stars": {
    "int": 2
  },
  "message": {
    "string": "your team here rocks!"
  }
}

The reference data comes from MySQL and holds information about each user, including an id column:

{
  "id": 7,
  "first_name": {
    "string": "Marigold"
  },
  "last_name": {
    "string": "Veld"
  },
  "email": {
    "string": "mveld6@pinterest.com"
  }
}

An important point about the reference data is that its message key must be the column on which the join is done. Because you are joining on the id column, the message key must have the same value as the id column. If the source topic is not already keyed this way, see how to repartition a topic.
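To see why the key matters, here is a minimal Python sketch that models a topic as a list of (key, value) pairs and assumes a lookup simply indexes messages by their message key. The helper rekey_by_column is hypothetical and only illustrates the idea; in KSQL itself you would re-key the topic using the repartition recipe.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical model of a Kafka message: (message key, message value).
Message = Tuple[Optional[Any], Dict[str, Any]]


def rekey_by_column(messages: List[Message], column: str) -> List[Message]:
    """Copy each message, replacing its key with the value of `column`."""
    return [(value[column], value) for _key, value in messages]


# Reference data written without a key (key is None).
unkeyed: List[Message] = [
    (None, {"id": 7, "first_name": "Marigold", "last_name": "Veld"}),
]

# Indexing by message key (dict keeps the last value per key) cannot find id 7.
print(dict(unkeyed).get(7))

# After re-keying on the id column, the same lookup succeeds.
keyed = rekey_by_column(unkeyed, "id")
print(dict(keyed)[7]["first_name"])
```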

1. In KSQL, register the RATINGS stream:

ksql> CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

2. Because you are using the user information for lookups, you want the latest value for each key, so you should model it as a KSQL table. Modeling it as a stream would instead give you every value for a given key, which is not what you want.

Register the CUSTOMERS table:

ksql> CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='mysql_customers', VALUE_FORMAT='AVRO', KEY='ID');

 Message
---------------
 Table created
---------------
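The difference between the two models can be sketched in Python, assuming a topic is a list of (key, value) pairs. This illustrates the semantics only; as_stream and as_table are hypothetical helpers, not KSQL functions, and the email addresses are made-up sample data.

```python
from typing import Any, Dict, List, Tuple

Message = Tuple[Any, Dict[str, Any]]


def as_stream(messages: List[Message]) -> List[Message]:
    """Stream semantics: every message is a separate event, so all are kept."""
    return list(messages)


def as_table(messages: List[Message]) -> Dict[Any, Dict[str, Any]]:
    """Table semantics: only the latest value for each key is kept."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for key, value in messages:
        latest[key] = value  # an update replaces the previous value for this key
    return latest


changes: List[Message] = [
    (4, {"id": 4, "email": "old@example.com"}),
    (1, {"id": 1, "email": "one@example.com"}),
    (4, {"id": 4, "email": "new@example.com"}),  # user 4 changes their email
]
print(len(as_stream(changes)))        # both values for user 4 are present
print(as_table(changes)[4]["email"])  # only the latest value survives
```

For lookups, the table view is what you want: a rating for user 4 should see new@example.com, not both addresses.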

3. For the join to succeed, the key of the CUSTOMERS table must match the column that you declared as its key (KEY='ID'). In KSQL, the message key is available through the system column ROWKEY, and the column on which you are going to join is ID. When you run this query, the two values in each row should be identical:

ksql> SELECT ROWKEY, ID FROM CUSTOMERS;
4 | 4
1 | 1
3 | 3

If ROWKEY doesn’t match the column on which you are going to join, you can use KSQL to repartition the source topic (see how to repartition a topic).
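If you capture the (ROWKEY, ID) pairs from the query above, the same check can be automated. This is a minimal Python sketch under the assumption that ROWKEY arrives as a string and ID as an integer, so both are compared as text; mismatched_keys is a hypothetical name, and the last sample row is deliberately broken.

```python
from typing import Any, List, Tuple


def mismatched_keys(rows: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]:
    """Return the (ROWKEY, ID) pairs whose two values differ."""
    # Assumption: ROWKEY is a string and ID an integer, so compare as text.
    return [(rowkey, col) for rowkey, col in rows if rowkey != str(col)]


# Rows as returned by SELECT ROWKEY, ID FROM CUSTOMERS, plus one bad row.
rows = [("4", 4), ("1", 1), ("3", 3), ("9", 7)]
bad = mismatched_keys(rows)
if bad:
    print(f"{len(bad)} row(s) need repartitioning: {bad}")
else:
    print("ROWKEY matches ID for every row")
```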

4. Create a new stream, written to a Kafka topic, that enriches the source events (ratings) with lookups against the reference information (customers):

CREATE STREAM RATINGS_ENRICHED WITH (PARTITIONS=1) AS 
SELECT R.RATING_ID, R.STARS, R.MESSAGE, 
C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME 
FROM RATINGS R 
LEFT JOIN CUSTOMERS C ON R.USER_ID = C.ID;
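A stream-table LEFT JOIN looks up each rating against the state of the table at the moment the rating is processed, and still emits the rating when no customer matches. The following Python sketch simulates that behavior over an interleaved sequence of events. It is an illustration only: the function name is hypothetical, and it assumes the customer columns (ID, FULL_NAME) are null when there is no match.

```python
from typing import Any, Dict, List, Tuple

Event = Tuple[str, Dict[str, Any]]


def stream_table_left_join(events: List[Event]) -> List[Dict[str, Any]]:
    """Simulate RATINGS R LEFT JOIN CUSTOMERS C ON R.USER_ID = C.ID."""
    customers: Dict[Any, Dict[str, Any]] = {}  # table state: latest row per ID
    enriched: List[Dict[str, Any]] = []
    for kind, row in events:
        if kind == "customer":
            # A table update changes the state but emits nothing by itself.
            customers[row["ID"]] = row
        elif kind == "rating":
            # Each rating is joined with the table as it is right now.
            c = customers.get(row["USER_ID"])
            enriched.append({
                "RATING_ID": row["RATING_ID"],
                "STARS": row["STARS"],
                "MESSAGE": row["MESSAGE"],
                # LEFT JOIN: customer columns are null when there is no match.
                "ID": c["ID"] if c is not None else None,
                "FULL_NAME": (c["FIRST_NAME"] + " " + c["LAST_NAME"])
                if c is not None else None,
            })
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return enriched


events: List[Event] = [
    ("rating", {"RATING_ID": 1, "USER_ID": 4, "STARS": 2, "MESSAGE": "hello"}),
    ("customer", {"ID": 4, "FIRST_NAME": "Nolana", "LAST_NAME": "Yeeles"}),
    ("rating", {"RATING_ID": 2, "USER_ID": 4, "STARS": 3,
                "MESSAGE": "your team here rocks!"}),
]
for out in stream_table_left_join(events):
    print(out)
```

Note that the first rating is not re-emitted when customer 4 arrives later; only ratings processed after the table update pick up the customer details.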

5. Query the derived stream in KSQL, showing the source event plus customer information:

ksql> SELECT * FROM RATINGS_ENRICHED LIMIT 1;
1525862956160 | 4 | 22 | 3 | your team here rocks! | 4 | Nolana Yeeles
LIMIT reached for the partition.
Query terminated
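The first two values in each row are the system columns ROWTIME and ROWKEY (both listed in the DESCRIBE EXTENDED output in step 7), and ROWTIME is a Kafka timestamp in epoch milliseconds. As a minimal Python sketch, assuming the column order shown in step 7 and that no value contains the " | " separator, a row can be parsed and its timestamp made readable like this (parse_row is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

# Column order taken from the DESCRIBE EXTENDED output of RATINGS_ENRICHED.
COLUMNS = ["ROWTIME", "ROWKEY", "RATING_ID", "STARS", "MESSAGE", "ID", "FULL_NAME"]
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_row(line: str) -> Dict[str, Any]:
    """Split a CLI result row into named columns and decode ROWTIME."""
    values = line.split(" | ")  # assumes no value contains " | "
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} columns, got {len(values)}")
    row: Dict[str, Any] = dict(zip(COLUMNS, values))
    # ROWTIME is epoch milliseconds; timedelta keeps the arithmetic exact.
    row["ROWTIME"] = EPOCH + timedelta(milliseconds=int(row["ROWTIME"]))
    return row


row = parse_row("1525862956160 | 4 | 22 | 3 | your team here rocks! | 4 | Nolana Yeeles")
print(row["ROWTIME"].isoformat())  # 2018-05-09T10:49:16.160000+00:00
```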

6. Because KSQL writes the results of the continuous query to a Kafka topic, you can also inspect that topic directly to see the enriched stream:

$ kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic RATINGS_ENRICHED | jq '.'
{
  "RATING_ID": {
    "long": 6999
  },
  "STARS": {
    "int": 1
  },
  "MESSAGE": {
    "string": "airport refurb looks great, will fly outta here more!"
  },
  "ID": {
    "int": 4
  },
  "FULL_NAME": {
    "string": "Nolana Yeeles"
  }
}
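The wrapper objects such as {"long": 6999} come from Avro's JSON encoding, which writes a non-null union value as a single-entry object named after its branch type and writes null as a bare JSON null. If you want plain values, here is a minimal Python sketch that flattens one record, assuming every wrapped field is a union of null and a primitive type (unwrap_avro_json is a hypothetical name):

```python
import json
from typing import Any, Dict

# Avro primitive type names that can appear as a union branch in JSON encoding.
PRIMITIVES = {"boolean", "int", "long", "float", "double", "bytes", "string"}


def unwrap_avro_json(record: Dict[str, Any]) -> Dict[str, Any]:
    """Replace {"<primitive type>": value} wrappers with the bare value."""
    flat: Dict[str, Any] = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            branch, inner = next(iter(value.items()))
            if branch in PRIMITIVES:
                flat[field] = inner
                continue
        # Nulls are already bare JSON null; anything else is left untouched.
        flat[field] = value
    return flat


line = ('{"RATING_ID": {"long": 6999}, "STARS": {"int": 1}, '
        '"MESSAGE": {"string": "airport refurb looks great, will fly outta here more!"}, '
        '"ID": {"int": 4}, "FULL_NAME": {"string": "Nolana Yeeles"}}')
print(unwrap_avro_json(json.loads(line)))
```

Without the jq pipe, the console consumer prints each record as one JSON object per line, so the function can be applied line by line.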

7. To see information about the KSQL query, including the target Kafka topic and number of messages processed, use DESCRIBE EXTENDED:

ksql> DESCRIBE EXTENDED RATINGS_ENRICHED;

Type                 : STREAM
Key field            : R.USER_ID
Timestamp field      : Not set - using 
Key format           : STRING
Value format         : AVRO
Kafka output topic   : RATINGS_ENRICHED (partitions: 1, replication: 1)

 Field     | Type
---------------------------------------
 ROWTIME   | BIGINT           (system)
 ROWKEY    | VARCHAR(STRING)  (system)
 RATING_ID | BIGINT
 STARS     | INTEGER
 MESSAGE   | VARCHAR(STRING)
 ID        | INTEGER
 FULL_NAME | VARCHAR(STRING)
---------------------------------------

Queries that write into this STREAM
-----------------------------------
id:CSAS_RATINGS_ENRICHED - CREATE STREAM RATINGS_ENRICHED WITH (PARTITIONS=1) AS SELECT R.RATING_ID, R.STARS, R.MESSAGE, C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME FROM RATINGS R LEFT JOIN CUSTOMERS C ON R.USER_ID = C.ID;

For query topology and execution plan, run: EXPLAIN 

Local runtime statistics
------------------------
messages-per-sec:      3.96   total-messages:      7193     last-message: 09/05/18 12:19:33 BST
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic RATINGS_ENRICHED)
