Data Wrangling

Data Segmentation

KSQL lets you join streams and tables. A KSQL table is backed by a Kafka Streams KTable, which is essentially a materialized view that holds the latest value for each key. In this recipe, you'll find instructions for registering a few simple streams and a table, then joining them into a single enriched stream. This is useful when you want to segment or wrangle together data from separate topics.
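To make the stream/table distinction concrete, here is a toy model in plain Python (not KSQL or Kafka Streams code). It assumes the usual KTable semantics: a newer value for a key replaces the older one, and a null value (a tombstone) deletes the key. The function name `materialize` and the second user record are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# A keyed event: (key, value). A value of None models a tombstone.
Event = Tuple[str, Optional[dict]]


def materialize(stream: List[Event]) -> Dict[str, dict]:
    """Fold an ordered stream of keyed events into a table (last write wins)."""
    table: Dict[str, dict] = {}
    for key, value in stream:
        if value is None:
            table.pop(key, None)  # a tombstone removes the key from the table
        else:
            table[key] = value  # an upsert: a newer value replaces the older one
    return table


# Hypothetical events: the second record is an invented profile update.
users_stream: List[Event] = [
    ("foti.filacouris", {"firstName": "Foti", "age": "34"}),
    ("foti.filacouris", {"firstName": "Foti", "age": "35"}),
]

customers_table = materialize(users_stream)
print(len(users_stream))                          # 2 events in the stream
print(len(customers_table))                       # 1 row in the table
print(customers_table["foti.filacouris"]["age"])  # 35
```

The stream keeps both events, while the table keeps only the most recent profile for the customer.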

Directions

In this recipe, we assume three topics already exist, holding customers (the users topic), transactions (the transactions topic) and customer service interactions (the customer-service-interactions topic). Sample records, in JSON, are shown below:

Customers
{
"customerid": "foti.filacouris",
"firstName": "Foti",
"lastName": "Filacouris",
"age": "34",
"sex": "Male"
}
Transactions
{
"customerid": "foti.filacouris",
"transactionid": "Za12492b",
"transtype": "credit",
"transdate": "10-21-2018",
"transtime": "T18:25:43.511Z"
}
Customer Service Interactions
{
"customerid": "foti.filacouris",
"servicerequest": "4",
"servicedate": "10-21-2018",
"severity": "HIGH",
"resolution": "Block removed from card"
}
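Before registering anything in KSQL, it can help to check the sample records locally. This sketch uses Python's standard `json` module to confirm that each record parses and contains every column declared for its topic in steps 1 to 3; a missing comma between two fields, for example, makes a record unparseable. The `DECLARED` and `SAMPLES` names and the `missing_columns` helper are illustrative, not part of KSQL.

```python
import json

# Column names copied from the CREATE statements in steps 1 to 3 of this recipe.
DECLARED = {
    "users": ["customerid", "firstName", "lastName", "age", "sex"],
    "transactions": ["customerid", "transactionid", "transtype", "transdate", "transtime"],
    "customer-service-interactions": ["customerid", "servicerequest", "servicedate",
                                      "severity", "resolution"],
}

# The sample records, one JSON document per topic.
SAMPLES = {
    "users": '{"customerid": "foti.filacouris", "firstName": "Foti", '
             '"lastName": "Filacouris", "age": "34", "sex": "Male"}',
    "transactions": '{"customerid": "foti.filacouris", "transactionid": "Za12492b", '
                    '"transtype": "credit", "transdate": "10-21-2018", '
                    '"transtime": "T18:25:43.511Z"}',
    "customer-service-interactions": '{"customerid": "foti.filacouris", "servicerequest": "4", '
                                     '"servicedate": "10-21-2018", "severity": "HIGH", '
                                     '"resolution": "Block removed from card"}',
}


def missing_columns(topic: str, raw: str) -> list:
    """Parse one record and list the declared columns it does not contain."""
    record = json.loads(raw)  # raises json.JSONDecodeError on malformed JSON
    return [col for col in DECLARED[topic] if col not in record]


for topic, raw in SAMPLES.items():
    print(topic, missing_columns(topic, raw))  # each list should be empty

# A record missing the comma between two fields is rejected by the parser.
try:
    json.loads('{"age": "34" "sex": "Male"}')
except json.JSONDecodeError as err:
    print("malformed:", err.msg)
```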

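1. In KSQL, create a customers table from the users topic. We will join our intermediate stream to this table in step 5 to get a full view of each customer for segmentation. Because of the KEY='customerid' property, each message on the users topic should be keyed by the same value as its customerid field.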

ksql> CREATE TABLE customers (customerid VARCHAR, firstName VARCHAR, lastName VARCHAR, age VARCHAR, sex VARCHAR) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY='customerid');

Message        
----------------
Table created

2. In KSQL, register the transactions stream from the transactions topic:

ksql> CREATE STREAM transactions (customerid VARCHAR, transactionid VARCHAR, transtype VARCHAR, transdate VARCHAR, transtime VARCHAR) WITH (KAFKA_TOPIC='transactions', VALUE_FORMAT='JSON');

Message        
----------------
Stream created 
----------------

3. Likewise, register the customer service stream from the customer-service-interactions topic:

ksql> CREATE STREAM customer_service (customerid VARCHAR, servicerequest VARCHAR, servicedate VARCHAR, severity VARCHAR, resolution VARCHAR) WITH (KAFKA_TOPIC='customer-service-interactions', VALUE_FORMAT='JSON');

Message        
----------------
Stream created 
----------------
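Step 4 joins these two streams on customerid, and KSQL expects the joined topics to have the same number of partitions. The sketch below, a simplified model rather than Kafka code, shows why: a record's partition is computed from a hash of its key modulo the partition count, so the same customerid can land on different partition indexes when the counts differ. It uses `zlib.crc32` as a stand-in hash (Kafka's default partitioner uses murmur2), and the customer ids other than foti.filacouris are hypothetical.

```python
import zlib
from typing import Iterable, List


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition index.

    Stand-in only: Kafka's default partitioner hashes key bytes with murmur2,
    but any deterministic hash shows the same effect.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def mismatched_keys(keys: Iterable[str], left_partitions: int, right_partitions: int) -> List[str]:
    """Keys whose records would land on different partition indexes in the two topics."""
    return [k for k in keys
            if partition_for(k, left_partitions) != partition_for(k, right_partitions)]


# Hypothetical customer ids, for illustration only.
customer_ids = ["foti.filacouris", "customer.a", "customer.b", "customer.c"]

print(mismatched_keys(customer_ids, 4, 4))  # [] : equal counts, every key lines up
print(mismatched_keys(customer_ids, 4, 6))  # may be non-empty when the counts differ
```

With equal partition counts and the same partitioner on both topics, partition p of one topic holds the same keys as partition p of the other, so a single join task sees both sides of every match.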

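4. Next, create a new stream that joins the transactions stream to the customer service stream on the customerid field. Stream-stream joins require a WITHIN clause, which bounds how far apart in time (by record timestamp, ROWTIME) two records can be and still match. Here, WITHIN 30 DAYS pairs each transaction with service interactions for the same customer that occurred up to 30 days before or after it. Because this is a LEFT JOIN, transactions with no matching interaction are still emitted, with null service columns. The PARTITIONS=1 setting assumes the users topic behind the customers table has a single partition: set PARTITIONS to that topic's partition count, because this stream is joined to the table in step 5 and KSQL requires both sides of a join to have the same number of partitions. For the same reason, the transactions and customer-service-interactions topics must have equal partition counts.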

ksql> CREATE STREAM transactions_enriched WITH (PARTITIONS=1) AS
>  SELECT t.customerid AS customerid,
>         t.transactionid,
>         t.transtype,
>         t.transdate,
>         t.transtime,
>         cs.servicerequest,
>         cs.servicedate,
>         cs.severity
>  FROM TRANSACTIONS t
>  LEFT JOIN CUSTOMER_SERVICE cs
>  WITHIN 30 DAYS
>  ON t.customerid = cs.customerid;


Message        
----------------------------
 Stream created and running 
----------------------------
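To clarify what the WITHIN 30 DAYS LEFT JOIN produces, here is a batch model in plain Python. It is a sketch under simplifying assumptions: it uses the transdate and servicedate strings as timestamps, whereas KSQL compares the records' ROWTIME (by default the Kafka message timestamp); and it processes everything at once, whereas KSQL emits results incrementally and, depending on the version, may emit a null-padded row before a matching interaction arrives. The helper names and the second transaction are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional

WITHIN = timedelta(days=30)  # mirrors WITHIN 30 DAYS
SERVICE_COLUMNS = ["servicerequest", "servicedate", "severity"]


def event_time(day: str) -> datetime:
    # Stand-in timestamp: parses the MM-DD-YYYY strings from the sample data.
    # KSQL itself compares ROWTIME, not these VARCHAR columns.
    return datetime.strptime(day, "%m-%d-%Y")


def windowed_left_join(transactions: List[Dict], interactions: List[Dict],
                       window: timedelta = WITHIN) -> List[Dict[str, Optional[str]]]:
    """Batch model of t LEFT JOIN cs WITHIN window ON t.customerid = cs.customerid."""
    results = []
    for t in transactions:
        t_time = event_time(t["transdate"])
        matches = [cs for cs in interactions
                   if cs["customerid"] == t["customerid"]
                   and abs(event_time(cs["servicedate"]) - t_time) <= window]
        if not matches:
            # LEFT JOIN: keep the transaction and pad the service columns with nulls.
            results.append({**t, **{col: None for col in SERVICE_COLUMNS}})
        for cs in matches:
            results.append({**t, **{col: cs[col] for col in SERVICE_COLUMNS}})
    return results


transactions = [
    {"customerid": "foti.filacouris", "transactionid": "Za12492b", "transtype": "credit",
     "transdate": "10-21-2018", "transtime": "T18:25:43.511Z"},
    # Hypothetical second transaction, more than 30 days after the service request.
    {"customerid": "foti.filacouris", "transactionid": "tx-example", "transtype": "debit",
     "transdate": "12-31-2018", "transtime": "T09:00:00.000Z"},
]
interactions = [
    {"customerid": "foti.filacouris", "servicerequest": "4", "servicedate": "10-21-2018",
     "severity": "HIGH", "resolution": "Block removed from card"},
]

for row in windowed_left_join(transactions, interactions):
    print(row["transactionid"], row["severity"])
# Za12492b HIGH
# tx-example None
```

A transaction that matches several interactions inside the window yields one row per match, just as a SQL join would.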

5. Now join the customers table created in step 1 to your new stream, so that each enriched transaction also carries the customer's latest profile data. In the sample data, for example, the customer's credit transaction and a HIGH-severity service request ("Block removed from card") both fall on 10-21-2018, suggesting that the transaction prompted a call to customer service. This gives you a real-time view of your customer interactions.

ksql> CREATE STREAM customers_enriched AS
>  SELECT tx.customerid AS customerid,
>         tx.transactionid,
>         tx.transtype,
>         tx.transdate,
>         tx.transtime,
>         tx.servicerequest,
>         tx.servicedate,
>         tx.severity,
>         c.firstName,
>         c.lastName,
>         c.age,
>         c.sex
>  FROM TRANSACTIONS_ENRICHED tx
>  LEFT JOIN CUSTOMERS c
>  ON tx.customerid = c.customerid;


 Message                    
----------------------------
 Stream created and running 
----------------------------
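A stream-table join differs from the windowed join in step 4: there is no time window, and each transaction is matched against the table's state at the moment the transaction is processed. Updates to the table do not re-emit earlier rows. The following plain-Python model (not KSQL code) replays an interleaved log to show that behavior; the tx-early transaction and the helper names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

CUSTOMER_COLUMNS = ["firstName", "lastName", "age", "sex"]


def stream_table_left_join(log: List[Tuple[str, Dict]]) -> List[Dict[str, Optional[str]]]:
    """Replay an interleaved log of table updates and stream events in arrival order.

    ("table", record) upserts the customers table and produces no output.
    ("stream", record) looks up the table as it is at that moment and emits one row.
    """
    customers: Dict[str, Dict] = {}
    output = []
    for kind, record in log:
        if kind == "table":
            customers[record["customerid"]] = record
        else:
            customer = customers.get(record["customerid"], {})  # missing key -> nulls
            output.append({**record, **{col: customer.get(col) for col in CUSTOMER_COLUMNS}})
    return output


log = [
    # This transaction arrives before the customer's profile, so it is not enriched.
    ("stream", {"customerid": "foti.filacouris", "transactionid": "tx-early"}),
    ("table", {"customerid": "foti.filacouris", "firstName": "Foti",
               "lastName": "Filacouris", "age": "34", "sex": "Male"}),
    ("stream", {"customerid": "foti.filacouris", "transactionid": "Za12492b"}),
]

for row in stream_table_left_join(log):
    print(row["transactionid"], row["firstName"])
# tx-early None
# Za12492b Foti
```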



ksql> describe customers_enriched;

Name                 : CUSTOMERS_ENRICHED
 Field          | Type                      
--------------------------------------------
 ROWTIME        | BIGINT           (system) 
 ROWKEY         | VARCHAR(STRING)  (system) 
 CUSTOMERID     | VARCHAR(STRING)           
 TRANSACTIONID  | VARCHAR(STRING)           
 TRANSTYPE      | VARCHAR(STRING)           
 TRANSDATE      | VARCHAR(STRING)           
 TRANSTIME      | VARCHAR(STRING)           
 SERVICEREQUEST | VARCHAR(STRING)           
 SERVICEDATE    | VARCHAR(STRING)           
 SEVERITY       | VARCHAR(STRING)           
 FIRSTNAME      | VARCHAR(STRING)           
 LASTNAME       | VARCHAR(STRING)           
 AGE            | VARCHAR(STRING) 
 SEX            | VARCHAR(STRING)           
--------------------------------------------

This is a basic example of joining data produced by different applications across the enterprise. Combining it this way lets you segment customers and gain insights that span otherwise siloed systems.
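As one illustration of the segmentation this enables, the sketch below counts enriched rows per combination of columns in plain Python. The rows are hypothetical and include only the columns used; in KSQL itself, a continuously updated counterpart would be an aggregation with GROUP BY over customers_enriched, whose handling of null grouping values may differ from this sketch.

```python
from collections import Counter
from typing import Dict, Iterable, Optional, Sequence


def segment_counts(rows: Iterable[Dict[str, Optional[str]]],
                   by: Sequence[str]) -> Counter:
    """Count enriched rows per combination of the given columns (nulls included)."""
    return Counter(tuple(row.get(col) for col in by) for row in rows)


# Hypothetical rows shaped like CUSTOMERS_ENRICHED (only the columns used here).
enriched = [
    {"customerid": "foti.filacouris", "transtype": "credit", "severity": "HIGH", "sex": "Male"},
    {"customerid": "foti.filacouris", "transtype": "debit", "severity": None, "sex": "Male"},
    {"customerid": "customer.b", "transtype": "credit", "severity": None, "sex": None},
]

print(segment_counts(enriched, ["transtype", "severity"]))
print(segment_counts(enriched, ["sex"]))
```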
