Streaming ETL

Data Routing

KSQL queries run continuously, and the output of a query can be persisted to a Kafka topic using the CREATE STREAM ... AS SELECT syntax. Together, these two features make it possible to write simple yet powerful stream transformations in KSQL alone: take a real-time feed of events from one Kafka topic, transform them, and write them continuously to another.

In this recipe, we’ll see how to route messages from a source topic to multiple destination topics based on conditions in the data. The scenario here is that Kafka receives log messages from numerous hosts and applications. We want to route the messages to three target topics based on the following conditions:

  1. Any message whose severity is ERROR
  2. DEBUG messages originating from any com.microsoft app
  3. All other messages

Directions

The source event stream is called log_events. A typical message looks like this:

{
  "host": "147.40.121.104",
  "app": "com.salon.Tempsoft",
  "severity": "ERROR",
  "message": "Erigeron serpentinus G.L. Nesom"
}
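
Before registering the stream, it can help to confirm what is actually arriving on the source topic. A minimal sketch, assuming the topic is named log_events (as in the WITH clause of step 1) and already holds data:

```sql
-- Dump raw messages from the log_events topic, starting at the earliest offset.
-- Press Ctrl-C to stop.
PRINT 'log_events' FROM BEGINNING;
```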

1. In KSQL, register the source log_events stream:

ksql> CREATE STREAM log_events
      (host VARCHAR, app VARCHAR, severity VARCHAR, message VARCHAR)
      WITH (KAFKA_TOPIC='log_events', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
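
To check that the declared columns line up with the JSON fields, the new stream can be inspected and sampled. This is only a sketch: the LIMIT value is arbitrary, and the SET statement is there so that the query reads messages already on the topic rather than only new arrivals. Newer ksqlDB releases require EMIT CHANGES before LIMIT on a query like this one.

```sql
-- Show the column names and types registered for the stream.
DESCRIBE log_events;

-- Read existing messages too, not only those that arrive from now on.
SET 'auto.offset.reset' = 'earliest';

-- Sample a few rows, then stop.
SELECT host, app, severity, message FROM log_events LIMIT 5;
```

The offset setting stays in effect for the rest of the session, so it should also apply to any persistent queries created afterwards in that session.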

2. Create a stream, backed by a new Kafka topic and populated from log_events, containing only messages with severity ERROR:

ksql> CREATE STREAM LOG_ERRORS AS 
      SELECT * 
      FROM log_events 
      WHERE severity='ERROR';

 Message
----------------------------
 Stream created and running
----------------------------
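
By default the output topic takes its name from the stream (here LOG_ERRORS). If a specific topic name is needed, a WITH clause can be added to the statement. The following is a sketch only: the stream name LOG_ERRORS_NAMED and the topic name app_errors are made up for illustration and are not part of the recipe.

```sql
-- Hypothetical variant of step 2 that writes to an explicitly named topic.
CREATE STREAM LOG_ERRORS_NAMED
  WITH (KAFKA_TOPIC='app_errors') AS
  SELECT *
  FROM log_events
  WHERE severity='ERROR';
```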

3. Create a stream, backed by a new Kafka topic and populated from log_events, containing only DEBUG messages from the com.microsoft app family:

ksql> CREATE STREAM LOG_MS_DEBUG AS 
      SELECT * 
      FROM log_events 
      WHERE severity='DEBUG' 
        AND app LIKE 'com.microsoft%';

 Message
----------------------------
 Stream created and running
----------------------------
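
Each CREATE STREAM ... AS statement starts a persistent query. One way to confirm that it is running and processing messages is DESCRIBE EXTENDED, which reports the query writing to the stream along with runtime statistics. A brief sketch:

```sql
-- Show the schema, the backing topic, the writing query and message statistics.
DESCRIBE EXTENDED LOG_MS_DEBUG;
```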

4. Create a stream, backed by a new Kafka topic and populated from log_events, containing every message that matches neither of the above conditions:

ksql> CREATE STREAM LOG_ERRORS_OTHER AS 
      SELECT * 
      FROM log_events 
      WHERE   (severity!='ERROR') 
      AND NOT (severity='DEBUG' AND app LIKE 'com.microsoft%');

 Message
----------------------------
 Stream created and running
----------------------------

As a result, three new Kafka topics are now populated by continuous KSQL queries, which route messages arriving on the source topic according to the criteria specified.
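
The result can be checked from the KSQL prompt. A short sketch, assuming the default output topic names (the stream names in upper case):

```sql
-- List registered streams; the three new ones should appear alongside the source.
SHOW STREAMS;

-- List the persistent queries doing the routing.
SHOW QUERIES;

-- Watch routed messages arrive on one of the target topics (Ctrl-C to stop).
PRINT 'LOG_ERRORS_OTHER' FROM BEGINNING;
```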

