Event Time Processing

KSQL supports time-based aggregations, enabling it to answer questions such as:

  • How many events occurred in the last five minutes?
  • What was the total value of orders placed each hour today?
  • How many minutes have elapsed between customer actions?

Kafka messages are made up of three parts: a key, a value, and a timestamp. By default, KSQL uses the message’s timestamp for any time-dependent processing, such as windowed aggregations.

Event time processing is a powerful concept in stream processing, as it enables developers to reason about their data based on the business event rather than the arbitrary time at which it is processed. KSQL supports event time processing natively.

By default, the internal timestamp of a Kafka message is the one supplied by the producing application (usually the event time). If the producer does not supply one, or if the topic is configured with message.timestamp.type=LogAppendTime, messages are instead stamped with the time at which they arrive at the broker (typically called system time or processing time).

We’ll see below how KSQL can use either the internal timestamp of a message or an arbitrary field from the message’s payload when performing time-based aggregations.

Directions

This example uses a topic where the messages have two timestamps:

  • The internal message timestamp
  • An application-generated timestamp column named loadDate

The message payload looks like this:

{
  "loadDate": 1525221132954,
  "code": "Foo42",
  "user": "Rick"
}

1. By default, KSQL will use the timestamp of the Kafka message itself. You can inspect the timestamp value that KSQL is using through the ROWTIME system column:

ksql> SELECT ROWTIME, loadDate, code, user FROM event_data;
1525339102591 | 1525221132954 | Foo42 | Rick

2. Timestamps are stored as epoch milliseconds. To make them human-readable, use the provided TIMESTAMPTOSTRING function:

ksql> SELECT ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), 
             loadDate, TIMESTAMPTOSTRING(loadDate, 'yyyy-MM-dd HH:mm:ss'), 
             code, user 
      FROM event_data;
1525339102591 | 2018-05-03 10:18:22 | 1525221132954 | 2018-05-02 01:32:12 | Foo42 | Rick

3. In the above message, the Kafka message’s timestamp is 2018-05-03 10:18:22, and the loadDate field in the message payload is 2018-05-02 01:32:12.
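To double-check the conversion outside of KSQL, the same epoch-milliseconds-to-string formatting can be sketched in a few lines of Python. Note that TIMESTAMPTOSTRING appears to render timestamps in the KSQL server’s local time zone, whereas this sketch uses UTC, so the hour can differ from the output shown above:

```python
from datetime import datetime, timezone

def timestamp_to_string(epoch_ms: int, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Rough Python equivalent of KSQL's TIMESTAMPTOSTRING, using UTC."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).strftime(fmt)

print(timestamp_to_string(1525339102591))  # message timestamp -> '2018-05-03 09:18:22' (UTC)
print(timestamp_to_string(1525221132954))  # loadDate          -> '2018-05-02 00:32:12' (UTC)
```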

If you perform a time-based aggregation on the data, you’ll see that KSQL puts the above event in the time window associated with the message’s timestamp (2018-05-03 10:18:22):

ksql> CREATE TABLE events_per_hour AS 
      SELECT user, code, COUNT(*) AS event_count 
      FROM event_data WINDOW TUMBLING (SIZE 1 HOUR) 
      GROUP BY user, code;

 Message
---------------------------
 Table created and running
---------------------------
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), user, code, event_count 
      FROM events_per_hour;
2018-05-03 10:00:00 | Rick | Foo42 | 1

This result may be good enough, but for this example you want to view the data along a time dimension provided within the message itself.
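The bucketing done by a tumbling window is easy to reason about: each record lands in the window whose start is its timestamp rounded down to a multiple of the window size. A small Python sketch of this assignment rule (an illustration of the windowing semantics, not of KSQL internals):

```python
HOUR_MS = 60 * 60 * 1000  # 1-hour tumbling window, in milliseconds

def window_start(epoch_ms: int, size_ms: int = HOUR_MS) -> int:
    """Start of the tumbling window that a timestamp falls into."""
    return epoch_ms - (epoch_ms % size_ms)

# The message timestamp (ROWTIME) from the example above falls into
# the window starting at the top of its hour:
print(window_start(1525339102591))  # -> 1525338000000
```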

4. In this example, the data is processed using the loadDate field from the Kafka message. To accomplish this, set the TIMESTAMP property to loadDate when you register the KSQL stream (this also works for KSQL tables):

ksql> CREATE STREAM event_data_by_loadDate (loadDate BIGINT, code VARCHAR, user VARCHAR) 
      WITH (KAFKA_TOPIC='event_data', 
            VALUE_FORMAT='JSON', 
            TIMESTAMP='loadDate');

 Message
----------------
 Stream created
----------------

5. Examine the ROWTIME column, and you’ll see that it now matches loadDate:

ksql> SELECT ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), 
      loadDate, TIMESTAMPTOSTRING(loadDate, 'yyyy-MM-dd HH:mm:ss'), 
      code, user 
      FROM event_data_by_loadDate;
1525221132954 | 2018-05-02 01:32:12 | 1525221132954 | 2018-05-02 01:32:12 | Foo42 | Rick

6. You can also verify which column, if any, is being used to override the message timestamp with DESCRIBE EXTENDED:

ksql> DESCRIBE EXTENDED event_data_by_loadDate;

Type                 : STREAM
Key field            :
Timestamp field      : LOADDATE
[...]

7. Going back to the time-based aggregation that was shown above, you can see that the event is assigned to the time window appropriate to the loadDate (which is 2018-05-02 01:32:12):

ksql> CREATE TABLE events_per_hour_by_loadDate AS 
      SELECT user, code, COUNT(*) AS event_count 
      FROM event_data_by_loadDate WINDOW TUMBLING (SIZE 1 HOUR) 
      GROUP BY user, code;

 Message
---------------------------
 Table created and running
---------------------------
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), user, code, event_count 
      FROM events_per_hour_by_loadDate;
2018-05-02 01:00:00 | Rick | Foo42 | 1
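A tumbling window assigns each record to the window whose start is its timestamp rounded down to a multiple of the window size; applying that to loadDate confirms the bucket above (a quick Python sketch):

```python
HOUR_MS = 60 * 60 * 1000  # 1-hour window size, in milliseconds

load_date = 1525221132954  # the loadDate from the example message
# Round down to the start of the hourly window containing loadDate:
print(load_date - load_date % HOUR_MS)  # -> 1525219200000, the 01:00:00 window
```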

Remember, both results are based on data from the same source Kafka topic; they just handle the timestamp in different ways. The first uses the timestamp carried within the Kafka message itself; the second uses an arbitrary field from within the message’s payload.
