Streaming ETL

Data Routing Joined with a KTable

In this KSQL recipe, we have a major bank with a CRM instance per line of business. Each line of business is able to refer leads to a different line of business by selecting an appropriate “assigned to” username. Referring leads from one line of business to another in order to assign the appropriate specialized employee to contact the lead increases the chances of closing the sale and converting the lead to a client.

For this use case, we join a source event stream with a routing table in order to send each event to a specific destination topic. Each event in the stream carries a user ID (the AssignedTo field), which the join uses to look up the user's line of business and thereby select the appropriate destination topic.
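As a rough mental model (not how KSQL is implemented internally), the routing can be pictured in plain Python. The users table behaves like a dictionary keyed by userid, and each incoming lead is looked up by its AssignedTo value to pick a destination. The names route_lead and ROUTES are hypothetical, used only for illustration.

```python
from typing import Optional

# Hypothetical mapping from line of business to destination topic name.
ROUTES = {
    "Retail": "leads.retail",
    "Business Banking": "leads.businessbanking",
}

# In-memory stand-in for the users table: one record per userid.
users = {
    "igor.crescent": {"FirstName": "Igor", "LastName": "Crescent", "LOB": "Retail"},
}


def route_lead(lead: dict) -> Optional[str]:
    """Return the destination topic for a lead, or None if it cannot be routed."""
    user = users.get(lead.get("AssignedTo"))
    if user is None:
        return None  # no matching user, so the lead is not routed anywhere
    return ROUTES.get(user["LOB"])  # None if the user's LOB has no destination topic


lead = {"FirstName": "Kristina", "LastName": "Ness", "AssignedTo": "igor.crescent"}
print(route_lead(lead))  # leads.retail
```

The key point is that the destination is not stored on the lead itself. It is derived at processing time from the user the lead is assigned to.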

Directions

The source system produces two kinds of records: users and leads.

Users

{
"userid": "igor.crescent",
"FirstName": "Igor",
"LastName": "Crescent",
"LOB": "Retail"
}

Leads

{
"LastName": "Ness",
"FirstName": "Kristina",
"Salutation": "Ms.",
"Name": "Christine Ness",
"Status": "Open - Not Contacted",
"AssignedTo": "igor.crescent"
}
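The two record types are linked only through the lead's AssignedTo field. For the join to find a match, this field must hold exactly the same string as the user's userid. A quick way to check sample data before producing it is to parse both records and compare the keys. This Python sketch assumes the records are available as JSON strings.

```python
import json

user_json = """{
  "userid": "igor.crescent",
  "FirstName": "Igor",
  "LastName": "Crescent",
  "LOB": "Retail"
}"""

lead_json = """{
  "LastName": "Ness",
  "FirstName": "Kristina",
  "Salutation": "Ms.",
  "Name": "Christine Ness",
  "Status": "Open - Not Contacted",
  "AssignedTo": "igor.crescent"
}"""

# json.loads raises json.JSONDecodeError on malformed input (e.g. a missing comma).
user = json.loads(user_json)
lead = json.loads(lead_json)

# The join condition in the recipe is l.AssignedTo = u.userid.
print(lead["AssignedTo"] == user["userid"])  # True
```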

1. In KSQL, create the users table from the users topic:

ksql> CREATE TABLE users (userid VARCHAR, FirstName VARCHAR, LastName VARCHAR, LOB VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY='userid');

Message
----------------
Table created
----------------
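Because users is declared as a table rather than a stream, each record on the users topic acts as an upsert. A newer record with the same key replaces the previous one, and a stream-table join looks up whatever value the table holds when the lead is processed. The Python sketch below models the upsert behavior with a dictionary. The apply_changelog helper and the second igor.crescent record are hypothetical.

```python
def apply_changelog(records):
    """Fold a sequence of user records into a table keyed by userid (last write wins)."""
    table = {}
    for record in records:
        table[record["userid"]] = record  # upsert: replace any previous value for this key
    return table


changelog = [
    {"userid": "igor.crescent", "LOB": "Retail"},
    # Hypothetical later update: the same user moves to another line of business.
    {"userid": "igor.crescent", "LOB": "Business Banking"},
]

users = apply_changelog(changelog)
print(len(users))                     # 1
print(users["igor.crescent"]["LOB"])  # Business Banking
```

Under this model, leads processed after the update would be routed using the new line of business. Leads that were already routed stay where they were written.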

2. In KSQL, register the source leads stream:

ksql> CREATE STREAM leads (AssignedTo VARCHAR, FirstName VARCHAR, LastName VARCHAR, Salutation VARCHAR, Name VARCHAR, Status VARCHAR) WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='leads');

Message
----------------
Stream created
----------------

3. Create a stream, backed by a new Kafka topic and populated from the source leads, that contains only the leads assigned to a user in the Retail line of business:

ksql> CREATE STREAM leads_retail WITH (KAFKA_TOPIC='leads.retail', VALUE_FORMAT='JSON') AS SELECT l.LastName, l.FirstName, l.Salutation, l.Name, l.Status, l.AssignedTo FROM leads l LEFT JOIN users u ON l.AssignedTo = u.userid WHERE u.lob = 'Retail';

Message
----------------------------
Stream created and running
----------------------------
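With a LEFT JOIN, a lead whose AssignedTo has no match in users still produces a row, but every u. column in that row is NULL. The WHERE u.lob = 'Retail' predicate is then not true, so the row is filtered out. In effect, the query keeps only leads whose assigned user is known and works in Retail. The Python model below mirrors this, using None for NULL. The function name left_join_then_filter and the unmatched lead are hypothetical.

```python
from typing import Iterable, Iterator, Optional


def left_join_then_filter(leads: Iterable[dict], users: dict, lob: str) -> Iterator[dict]:
    """Model of: ... FROM leads l LEFT JOIN users u ON l.AssignedTo = u.userid WHERE u.lob = lob."""
    for lead in leads:
        # A missing user models the all-NULL right side produced by a LEFT JOIN.
        user: Optional[dict] = users.get(lead["AssignedTo"])
        user_lob = user["LOB"] if user is not None else None
        # In SQL, NULL = 'Retail' is not true, so unmatched leads fail the WHERE clause.
        if user_lob is not None and user_lob == lob:
            yield lead


users = {"igor.crescent": {"userid": "igor.crescent", "LOB": "Retail"}}
leads = [
    {"Name": "Christine Ness", "AssignedTo": "igor.crescent"},
    {"Name": "Unassigned Lead", "AssignedTo": "no.such.user"},  # hypothetical unmatched lead
]

retail = list(left_join_then_filter(leads, users, "Retail"))
print([lead["Name"] for lead in retail])  # ['Christine Ness']
```

Because the WHERE clause discards unmatched rows anyway, an inner JOIN should produce the same output for this query.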

4. Create a stream, backed by a new Kafka topic and populated from the source leads, that contains only the leads assigned to a user in the Business Banking line of business:

ksql> CREATE STREAM leads_businessbanking WITH (KAFKA_TOPIC='leads.businessbanking', VALUE_FORMAT='JSON') AS SELECT l.LastName, l.FirstName, l.Salutation, l.Name, l.Status, l.AssignedTo FROM leads l LEFT JOIN users u ON l.AssignedTo = u.userid WHERE u.lob = 'Business Banking';

Message
----------------------------
Stream created and running
----------------------------
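Steps 3 and 4 are two independent persistent queries. Each reads the full leads stream and applies its own filter. A user has a single LOB value at any moment, so a given lead lands in at most one of the two output topics. Leads assigned to users in any other line of business land in neither. The Python sketch below models the two queries side by side. The run_query helper, the users ann.lee and jane.doe, the Wealth Management line of business, and the extra leads are all hypothetical.

```python
def run_query(leads, users, lob):
    """Model one persistent query: keep leads whose assigned user is in `lob`."""
    return [
        lead for lead in leads
        if users.get(lead["AssignedTo"], {}).get("LOB") == lob
    ]


# Hypothetical users and leads for illustration.
users = {
    "igor.crescent": {"LOB": "Retail"},
    "ann.lee": {"LOB": "Business Banking"},
    "jane.doe": {"LOB": "Wealth Management"},
}
leads = [
    {"Name": "Christine Ness", "AssignedTo": "igor.crescent"},
    {"Name": "Lead B", "AssignedTo": "ann.lee"},
    {"Name": "Lead C", "AssignedTo": "jane.doe"},
]

# Each query sees every lead; the names mirror the ones used in steps 3 and 4.
outputs = {
    "leads.retail": run_query(leads, users, "Retail"),
    "leads.businessbanking": run_query(leads, users, "Business Banking"),
}

for topic, rows in outputs.items():
    print(topic, [row["Name"] for row in rows])
# leads.retail ['Christine Ness']
# leads.businessbanking ['Lead B']
```

Adding a route for another line of business would mean running one more query of the same shape with a different filter value.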

As a result, two new Kafka topics are being populated by continuous KSQL queries. These queries route each event arriving on the leads source topic according to the line of business of the user to whom it is assigned.
