Streaming ETL

Calculating the Difference Between Two Fields

Organizations often want to take data from an operational system and run analytics on it, typically in a data warehouse or data lake. KSQL can change the transformation of this data, such as deriving values from its date fields, from a high-latency batch process into a low-latency streaming one.

By performing ETL on data as it arrives, the business gains more timely insight into its data. It can also make the enriched and cleansed data available to other event-driven applications.

This example shows how to use KSQL to derive a new value from an inbound stream by calculating the difference between two date fields.

Directions

In this example, the source stream, rental, has two columns, RENTAL_DATE and RETURN_DATE, that hold epoch timestamps in milliseconds.

1. Convert the epoch timestamps to a human-readable form:

SELECT rental_id,
       TIMESTAMPTOSTRING(rental_date, 'yyyy-MM-dd HH:mm:ss.SSS'),
       TIMESTAMPTOSTRING(return_date, 'yyyy-MM-dd HH:mm:ss.SSS')
       FROM rental;
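To check the output of step 1 by hand, the conversion can be modelled in plain Python. This is a sketch under stated assumptions, not KSQL itself. It assumes the timestamps are milliseconds since the Unix epoch and renders them in UTC. KSQL's TIMESTAMPTOSTRING may apply a different default time zone depending on version and configuration. The function name timestamp_to_string is hypothetical.

```python
from datetime import datetime, timezone


def timestamp_to_string(epoch_ms: int) -> str:
    """Hypothetical Python analogue of
    TIMESTAMPTOSTRING(epoch_ms, 'yyyy-MM-dd HH:mm:ss.SSS').

    Assumes epoch_ms is milliseconds since 1970-01-01T00:00:00Z and
    renders the result in UTC (an assumption, not KSQL's documented default).
    """
    # Split into whole seconds and leftover milliseconds using integer
    # arithmetic, so the millisecond field is never subject to float rounding.
    seconds, millis = divmod(epoch_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # 'yyyy-MM-dd HH:mm:ss' corresponds to '%Y-%m-%d %H:%M:%S';
    # '.SSS' is the zero-padded three-digit millisecond field.
    return f"{dt:%Y-%m-%d %H:%M:%S}.{millis:03d}"


if __name__ == "__main__":
    print(timestamp_to_string(1_000_000_000_123))  # 2001-09-09 01:46:40.123
```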


2. Determine the difference between the two dates, in milliseconds:

SELECT rental_id,
       TIMESTAMPTOSTRING(rental_date, 'yyyy-MM-dd HH:mm:ss.SSS'),
       TIMESTAMPTOSTRING(return_date, 'yyyy-MM-dd HH:mm:ss.SSS'),
       (CAST(return_date AS DOUBLE) - CAST(rental_date AS DOUBLE))
       FROM rental;
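The subtraction in step 2 can be modelled in plain Python to show why the casts matter. The example assumes that KSQL divides BIGINT values with integer semantics, as Java does. Under that assumption, casting to DOUBLE before subtracting keeps a later division into days from being truncated to a whole number. The names duration_ms and ONE_DAY_MS are hypothetical, and inputs are assumed to be epoch milliseconds.

```python
ONE_DAY_MS = 24 * 60 * 60 * 1000  # 86,400,000 milliseconds in a day


def duration_ms(rental_date: int, return_date: int) -> float:
    """Hypothetical model of
    CAST(return_date AS DOUBLE) - CAST(rental_date AS DOUBLE).
    Inputs are assumed to be epoch timestamps in milliseconds."""
    return float(return_date) - float(rental_date)


if __name__ == "__main__":
    diff = duration_ms(0, 36 * 60 * 60 * 1000)  # a 36-hour rental
    print(diff)                      # 129600000.0
    print(diff / ONE_DAY_MS)         # 1.5, the fraction is preserved
    print(int(diff) // ONE_DAY_MS)   # 1, what integer division would give
```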

3. Calculate the duration in whole days, rounding any partial day up:

SELECT rental_id,
       TIMESTAMPTOSTRING(rental_date, 'yyyy-MM-dd HH:mm:ss.SSS'),
       TIMESTAMPTOSTRING(return_date, 'yyyy-MM-dd HH:mm:ss.SSS'),
       CEIL((CAST(return_date AS DOUBLE) - CAST(rental_date AS DOUBLE)) / 60 / 60 / 24 / 1000)
       FROM rental;
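The day calculation in step 3 divides the millisecond difference by 60, 60, 24 and 1000. Together these equal dividing by 86,400,000, the number of milliseconds in a day. The result is then rounded up with CEIL. The sketch below mirrors that expression in plain Python and assumes epoch-millisecond inputs. The name duration_days is hypothetical. Python's math.ceil returns an int, while the type KSQL's CEIL returns may differ.

```python
import math


def duration_days(rental_date: int, return_date: int) -> int:
    """Hypothetical model of
    CEIL((CAST(return_date AS DOUBLE) - CAST(rental_date AS DOUBLE)) / 60 / 60 / 24 / 1000).
    Inputs are assumed to be epoch timestamps in milliseconds."""
    diff_ms = float(return_date) - float(rental_date)
    # Same division chain as the KSQL query; equivalent to diff_ms / 86_400_000.
    days = diff_ms / 60 / 60 / 24 / 1000
    # Any partial day counts as a whole extra day.
    return math.ceil(days)


if __name__ == "__main__":
    hour_ms = 60 * 60 * 1000
    print(duration_days(0, 24 * hour_ms))  # 1, exactly one day
    print(duration_days(0, 25 * hour_ms))  # 2, one day and one hour rounds up
```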

4. Persist the derivation as a continuous stream that populates a target Kafka topic:

CREATE STREAM ENRICHED_DATA WITH (KAFKA_TOPIC='ENRICHED_DATA') AS
       SELECT rental_id,
       TIMESTAMPTOSTRING(rental_date, 'yyyy-MM-dd HH:mm:ss.SSS') AS rental_date,
       TIMESTAMPTOSTRING(return_date, 'yyyy-MM-dd HH:mm:ss.SSS') AS return_date,
       CEIL((CAST(return_date AS DOUBLE) - CAST(rental_date AS DOUBLE)) / 60 / 60 / 24 / 1000)
       FROM rental;

The WITH(KAFKA_TOPIC...) clause is optional. It is included here to show that you can customize the name of the target topic, and other topic properties if desired.
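Conceptually, the persistent query in step 4 applies the same projection to every record as it arrives and writes each result to the ENRICHED_DATA topic. The generator below sketches only that per-record transformation. It does not connect to Kafka, and it models records as Python dicts with lowercase keys. It also assumes epoch-millisecond timestamps rendered in UTC. The names enrich, to_string and rental_days are hypothetical.

```python
import math
from datetime import datetime, timezone
from typing import Iterable, Iterator


def to_string(epoch_ms: int) -> str:
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss.SSS' in UTC (assumed)."""
    seconds, millis = divmod(epoch_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"{dt:%Y-%m-%d %H:%M:%S}.{millis:03d}"


def enrich(records: Iterable[dict]) -> Iterator[dict]:
    """Apply the step 4 projection to each record, one at a time."""
    for rec in records:
        diff_ms = float(rec["return_date"]) - float(rec["rental_date"])
        yield {
            "rental_id": rec["rental_id"],
            "rental_date": to_string(rec["rental_date"]),
            "return_date": to_string(rec["return_date"]),
            # Hypothetical field name: the KSQL query leaves this column
            # unaliased, so KSQL generates a name for it instead.
            "rental_days": math.ceil(diff_ms / 60 / 60 / 24 / 1000),
        }


if __name__ == "__main__":
    incoming = [{"rental_id": 1, "rental_date": 0, "return_date": 90_000_000}]
    for row in enrich(incoming):
        print(row)
```

Because enrich is a generator, each output record is produced as soon as its input record is consumed. This loosely mirrors how the streaming query processes events individually rather than in batches.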
