Data Wrangling

Flattening JSON Kafka Messages

KSQL can flatten the schema of data in a Kafka message. This is useful when a downstream system requires a flat schema rather than a nested one. For example, you may have data on a Kafka topic that looks like this:

{
  "user": {
    "first_name": "Lars",
    "last_name": "Treagus",
    "email": "ltreagus0@timesonline.co.uk"
  },
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}

You can use KSQL to process every message as it arrives on the source topic and write it to a new Kafka topic with the nesting removed, so that the message looks like this:

{
  "user_first_name": "Lars",
  "user_last_name": "Treagus",
  "user_email": "ltreagus0@timesonline.co.uk",
  "ip_address": "242.115.235.56",
  "logon_date": "2018-02-05T19:45:59Z"
}
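To make the target shape concrete, the Python sketch below applies the same transformation to a single message outside KSQL. It is an illustration only, not how KSQL processes the stream: the flatten helper is hypothetical, and joining parent and child keys with an underscore is an assumption chosen to reproduce the example above. The KSQL query in the directions names each output column explicitly, whereas this helper flattens any nested object generically.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], prefix: str = "", sep: str = "_") -> Dict[str, Any]:
    """Return a copy of record with nested objects collapsed into top-level
    keys joined by sep (hypothetical rule: user.first_name -> user_first_name)."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the parent name as a prefix.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


logon = {
    "user": {
        "first_name": "Lars",
        "last_name": "Treagus",
        "email": "ltreagus0@timesonline.co.uk",
    },
    "ip_address": "242.115.235.56",
    "logon_date": "2018-02-05T19:45:59Z",
}

print(flatten(logon))
```

Deeper nesting works the same way: an object such as {"a": {"b": {"c": 1}}} becomes {"a_b_c": 1}.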

You can also write the data as CSV instead of JSON.

Directions

1. Register the existing user_logons topic for use as a KSQL stream called user_logons. Note the STRUCT data type for the nested field:

CREATE STREAM user_logons
        (user       STRUCT<
                      first_name VARCHAR,
                      last_name  VARCHAR,
                      email      VARCHAR
                    >,
         ip_address VARCHAR,
         logon_date VARCHAR)
        WITH (KAFKA_TOPIC='user_logons',
              VALUE_FORMAT='JSON');

2. Optionally, inspect the first few messages as they arrive:

SELECT * FROM user_logons LIMIT 5;

3. Write the flattened structure to a new Kafka topic that is continually updated as new messages arrive on the source topic. Note the use of the -> operator to access the nested fields.

CREATE STREAM user_logons_all_cols
        WITH (KAFKA_TOPIC='user_logons_flat') AS
        SELECT user->first_name AS USER_FIRST_NAME,
               user->last_name  AS USER_LAST_NAME,
               user->email      AS USER_EMAIL,
               ip_address,
               logon_date
          FROM user_logons;
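The query above can be read as a function applied to each incoming message. The Python sketch below is an analogy, not KSQL's implementation: each user->name access becomes a lookup into the nested object, and the output keys are written in upper case to match the column names that appear in the PRINT output further down (USER_FIRST_NAME, IP_ADDRESS, and so on). The helper names field and project are hypothetical, and returning None when the user object is missing is a choice of this sketch rather than a statement about KSQL's behavior.

```python
import json
from typing import Any, Dict, Optional


def field(struct: Optional[Dict[str, Any]], name: str) -> Any:
    """Null-safe nested access, standing in for struct->name.
    If the struct is missing or not an object, return None instead of failing."""
    return struct.get(name) if isinstance(struct, dict) else None


def project(raw_value: str) -> Dict[str, Any]:
    """Decode one JSON message from user_logons and apply the SELECT list
    of user_logons_all_cols to it."""
    row = json.loads(raw_value)
    user = row.get("user")
    return {
        "USER_FIRST_NAME": field(user, "first_name"),
        "USER_LAST_NAME": field(user, "last_name"),
        "USER_EMAIL": field(user, "email"),
        # Unaliased columns keep their names, upper-cased as in the PRINT output.
        "IP_ADDRESS": row.get("ip_address"),
        "LOGON_DATE": row.get("logon_date"),
    }


source = (
    '{"user": {"first_name": "Lars", "last_name": "Treagus", '
    '"email": "ltreagus0@timesonline.co.uk"}, '
    '"ip_address": "242.115.235.56", "logon_date": "2018-02-05T19:45:59Z"}'
)
print(json.dumps(project(source)))
```

Listing each column explicitly, as the KSQL query does, keeps the output schema stable even if new nested fields later appear in the source messages.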

Note that the target Kafka topic is set explicitly with KAFKA_TOPIC. If KAFKA_TOPIC is omitted, the name of the stream is used as the topic name.

The new stream writes to a Kafka topic, which you can see with LIST TOPICS:

ksql> LIST TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
 user_logons        | true       | 1          | 1                  | 1         | 1
 user_logons_flat   | true       | 4          | 1                  | 0         | 0

The contents of the topic can be viewed by any Kafka client, or simply with PRINT from KSQL:

ksql> PRINT 'user_logons_flat';
Format:JSON
{"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti","USER_LAST_NAME":"Debrett","USER_EMAIL":"hdebretthp@ask.com","IP_ADDRESS":"115.102.56.33","LOGON_DATE":"2017-11-17T06:26:31Z"}
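In the PRINT output, ROWTIME and ROWKEY appear alongside the selected columns. The sketch below assumes they are the record's timestamp (epoch milliseconds) and key as displayed by PRINT, not fields stored in the JSON value that other Kafka clients receive, and separates them from the data columns. The split_print_line helper is hypothetical and only meant for reading PRINT output; it is not part of any Kafka client API.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

# Fields assumed to be added by PRINT rather than selected by the query.
METADATA = ("ROWTIME", "ROWKEY")


def split_print_line(line: str) -> Tuple[datetime, Any, Dict[str, Any]]:
    """Split one JSON line from PRINT into (timestamp, key, column values)."""
    doc = json.loads(line)
    # ROWTIME is assumed to be epoch milliseconds; convert to a UTC datetime.
    ts = datetime.fromtimestamp(doc["ROWTIME"] / 1000, tz=timezone.utc)
    key = doc["ROWKEY"]
    columns = {k: v for k, v in doc.items() if k not in METADATA}
    return ts, key, columns


line = (
    '{"ROWTIME":1547205974896,"ROWKEY":"null","USER_FIRST_NAME":"Hetti",'
    '"USER_LAST_NAME":"Debrett","USER_EMAIL":"hdebretthp@ask.com",'
    '"IP_ADDRESS":"115.102.56.33","LOGON_DATE":"2017-11-17T06:26:31Z"}'
)
ts, key, columns = split_print_line(line)
print(ts.isoformat(), key, sorted(columns))
```

Note that the key is printed as the string "null" here, so a check for a missing key should compare against that string rather than None.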

Press Ctrl-C to exit the PRINT command.

To write the flattened data as CSV instead, add VALUE_FORMAT='DELIMITED' to the WITH clause:

CREATE STREAM user_logons_all_cols_csv
        WITH (KAFKA_TOPIC='user_logons_flat_csv', VALUE_FORMAT='DELIMITED') AS
        SELECT user->first_name AS USER_FIRST_NAME,
               user->last_name  AS USER_LAST_NAME,
               user->email      AS USER_EMAIL,
               ip_address,
               logon_date
          FROM user_logons;

ksql> PRINT 'user_logons_flat_csv' FROM BEGINNING LIMIT 5;
Format:STRING
4/9/19 3:40:37 PM UTC , NULL , Lian,McTrusty,lmctrusty1@thetimes.co.uk,63.110.21.234,2018-02-20T20:04:12Z
4/9/19 3:40:37 PM UTC , NULL , Carrissa,Halston,chalston5@sitemeter.com,220.61.74.180,2018-03-22T15:20:04Z
4/9/19 3:40:37 PM UTC , NULL , Korry,Really,kreally9@sakura.ne.jp,214.129.206.164,2018-03-27T02:45:42Z
4/9/19 3:40:37 PM UTC , NULL , Toma,Eisikovitsh,teisikovitshd@last.fm,105.163.137.4,2017-12-27T12:51:06Z
4/9/19 3:40:37 PM UTC , NULL , Izabel,Lenney,ilenneyh@addthis.com,189.48.198.142,2017-08-14T17:30:07Z
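DELIMITED values carry no field names, so a consumer of user_logons_flat_csv has to know the column order from the SELECT list. In the PRINT output, each line is prefixed with the record timestamp and key (NULL here), and the comma-separated value follows. The sketch below, with hypothetical helper names, serializes and parses one value using that column order. It relies on Python's csv module, whose quoting rules for values containing commas or quotes may differ from how KSQL's DELIMITED format escapes them.

```python
import csv
import io
from typing import Dict, List

# Column order of the SELECT list; a DELIMITED value is just these fields in order.
COLUMNS = ["USER_FIRST_NAME", "USER_LAST_NAME", "USER_EMAIL", "IP_ADDRESS", "LOGON_DATE"]


def to_delimited(row: Dict[str, str]) -> str:
    """Serialize one flattened row as a single comma-separated line."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="").writerow([row[c] for c in COLUMNS])
    return buf.getvalue()


def from_delimited(value: str) -> Dict[str, str]:
    """Parse one comma-separated value back into named columns."""
    fields: List[str] = next(csv.reader([value]))
    return dict(zip(COLUMNS, fields))


value = "Lian,McTrusty,lmctrusty1@thetimes.co.uk,63.110.21.234,2018-02-20T20:04:12Z"
row = from_delimited(value)
print(row["USER_EMAIL"])
print(to_delimited(row))
```

Because the mapping from position to name lives only in the consumer, reordering the SELECT list changes the meaning of every downstream CSV record; the JSON output does not have this problem.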