Data Wrangling

Using Event Time from IoT Sensor Readings

Sensor data typically includes a timestamp recording when each value was read. You can use KSQL to create a stream that uses this timestamp from the data as the message timestamp. This is an alternative to the default ROWTIME value, which KSQL takes from the Apache Kafka® message timestamp. That timestamp is either set by the producing application or is the time at which the message was written to the broker. Using the timestamp from the sensor reading makes windowing and aggregate operations more intuitive, because those operations are then based on when the data was sampled rather than when it reached Kafka.
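To make the windowing point concrete, here is a minimal Python sketch (not KSQL itself) that assigns the same readings to one-hour tumbling windows twice: once by sensor time and once by broker arrival time. The readings, the `BOUNDARY` constant, and the helper `hourly_counts` are hypothetical and exist only for illustration.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000
BOUNDARY = 1546034400000  # hypothetical: an exact hour boundary in epoch milliseconds

def hourly_counts(readings, time_of):
    """Count readings per one-hour tumbling window, keyed by window start (epoch ms)."""
    counts = defaultdict(int)
    for reading in readings:
        t = time_of(reading)
        counts[t - t % HOUR_MS] += 1  # round down to the start of the hour
    return dict(counts)

# Hypothetical (sensor_ts_ms, broker_ts_ms) pairs. The second reading was sampled
# one second before the hour boundary but reached the broker 30 seconds after it.
readings = [
    (BOUNDARY - 5_000, BOUNDARY - 4_000),
    (BOUNDARY - 1_000, BOUNDARY + 30_000),
    (BOUNDARY + 10_000, BOUNDARY + 11_000),
]

by_event_time = hourly_counts(readings, lambda r: r[0])   # analogous to TIMESTAMP='ts'
by_broker_time = hourly_counts(readings, lambda r: r[1])  # analogous to the default ROWTIME
print(by_event_time)
print(by_broker_time)
```

The late-arriving reading is counted in the following hour when windows follow arrival time, but stays in the hour it was sampled when windows follow sensor time.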

Directions

1. Below is a sample of the raw sensor data being produced to a Kafka topic. The first field of each delimited record is the epoch timestamp of the sensor reading, in milliseconds.

1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21
1546033893735,-0.004700,-0.001587,-0.010132,0,0,0,21
1546033893864,-0.003723,-0.001099,-0.001587,0,0,0,21
1546033893995,-0.001526,-0.000610,0.002075,0,0,-1,21
1546033894122,-0.001526,-0.003296,0.006470,0,0,0,21
1546033894255,0.000427,-0.005981,-0.016479,-1,0,0,21
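A quick way to sanity-check this layout before declaring a schema is to parse one record by hand. The Python sketch below assumes the eight comma-separated fields map, in order, to the columns declared in step 2. The helpers `parse_reading` and `event_time` are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumed column order and types, mirroring the CREATE STREAM statement in step 2.
COLUMNS = ["ts", "accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z", "temp_c"]
TYPES = [int, float, float, float, int, int, int, int]
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def parse_reading(line):
    """Split one DELIMITED record into a dict of typed fields."""
    fields = line.strip().split(",")
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    return {name: cast(value) for name, cast, value in zip(COLUMNS, TYPES, fields)}

def event_time(reading):
    """Interpret ts as milliseconds since the Unix epoch (exact integer arithmetic)."""
    return EPOCH + timedelta(milliseconds=reading["ts"])

r = parse_reading("1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21")
print(event_time(r).isoformat())
```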

2. Create a stream with the sensor time as the TIMESTAMP:

CREATE STREAM sensor_stream ( 
    ts BIGINT, 
    accel_x DOUBLE, 
    accel_y DOUBLE, 
    accel_z DOUBLE,  
    gyro_x INTEGER, 
    gyro_y INTEGER, 
    gyro_z INTEGER, 
    temp_c INTEGER 
    ) WITH (KAFKA_TOPIC='sensor_delimited', VALUE_FORMAT='DELIMITED', TIMESTAMP='ts');

Notice that, because ts holds epoch milliseconds, it is declared as BIGINT, and TIMESTAMP='ts' in the WITH clause tells KSQL to use it as the stream's timestamp.

From here, we can run a quick validation query in the Confluent Control Center UI and confirm that ROWTIME and TS do in fact hold the same values.
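The validation holds because of how the stream's timestamp is chosen. The Python sketch below is a simplified model of that choice, not KSQL's implementation: without a TIMESTAMP property, ROWTIME is the Kafka message timestamp, and with TIMESTAMP='ts' it is read from the ts field of the payload. The `KafkaRecord` class, the `row_time` helper, and the arrival timestamp are hypothetical.

```python
from dataclasses import dataclass

# Column order assumed to match the CREATE STREAM statement in step 2.
COLUMNS = ["ts", "accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z", "temp_c"]

@dataclass
class KafkaRecord:
    timestamp_ms: int  # timestamp stored on the Kafka message (producer or broker time)
    value: str         # DELIMITED payload

def row_time(record, timestamp_column=None):
    """Simplified model of which timestamp becomes ROWTIME."""
    if timestamp_column is None:
        return record.timestamp_ms  # default: the Kafka message timestamp
    fields = record.value.split(",")
    return int(fields[COLUMNS.index(timestamp_column)])  # TIMESTAMP='ts': taken from the payload

record = KafkaRecord(
    timestamp_ms=1546033894000,  # hypothetical arrival time
    value="1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21",
)
print(row_time(record))        # 1546033894000
print(row_time(record, "ts"))  # 1546033893602
```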

3. We now have raw data and a stream that we can query based on the sensor reading timestamp, so let's use the other side of the stream-table duality to look at the data further. The sensor includes motion data, so we can analyze the maximum x-axis acceleration within session windows, where a session closes after 60 minutes without new readings.

ksql> CREATE TABLE sensor_table_accel_x_hourly AS
   SELECT TIMESTAMPTOSTRING(windowstart(), 'yyyy-MM-dd HH:mm:ss') AS window_start_ts,
          TIMESTAMPTOSTRING(windowend(), 'yyyy-MM-dd HH:mm:ss') AS window_end_ts,
          max(accel_x) AS max_x_accel
     FROM sensor_stream
          WINDOW SESSION (60 MINUTES)
 GROUP BY accel_x;
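A session window is easier to picture outside KSQL. The following Python sketch approximates what WINDOW SESSION (60 MINUTES) does to the sample readings: a reading at most 60 minutes after the previous one extends the current session, and a longer gap starts a new one. It simplifies under stated assumptions: it processes readings in sorted order rather than merging out-of-order sessions as Kafka Streams does, it treats the whole stream as a single group instead of grouping by accel_x, and the helpers `session_max` and `to_string` as well as the extra reading are hypothetical.

```python
from datetime import datetime, timedelta, timezone

GAP_MS = 60 * 60 * 1000  # inactivity gap, as in WINDOW SESSION (60 MINUTES)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def session_max(readings, gap_ms=GAP_MS):
    """Split (ts_ms, accel_x) pairs into sessions; return (start_ms, end_ms, max_accel_x) per session."""
    sessions = []
    for ts, accel_x in sorted(readings):
        if sessions and ts - sessions[-1][1] <= gap_ms:
            # Within the gap of the previous reading: extend the current session.
            start, _, current_max = sessions[-1]
            sessions[-1] = (start, ts, max(current_max, accel_x))
        else:
            # First reading, or the gap was exceeded: open a new session.
            sessions.append((ts, ts, accel_x))
    return sessions

def to_string(ts_ms):
    """Render epoch ms in the 'yyyy-MM-dd HH:mm:ss' layout; this sketch uses UTC."""
    return (EPOCH + timedelta(milliseconds=ts_ms)).strftime("%Y-%m-%d %H:%M:%S")

# The six sample readings from step 1, plus one hypothetical reading two hours later.
readings = [
    (1546033893602, 0.013611), (1546033893735, -0.004700),
    (1546033893864, -0.003723), (1546033893995, -0.001526),
    (1546033894122, -0.001526), (1546033894255, 0.000427),
    (1546033894255 + 2 * GAP_MS, 0.002000),
]

for start, end, max_x in session_max(readings):
    print(to_string(start), "|", to_string(end), "|", max_x)
```

Each window's start and end are the first and last readings in the session, which is why a session window can span only a few minutes rather than a full hour.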

From there, we can see the maximum x-axis acceleration, in g-forces, within a session window:

ksql> SELECT window_start_ts, window_end_ts, max_x_accel 
FROM SENSOR_TABLE_ACCEL_X_HOURLY 
LIMIT 1;
2018-12-28 22:37:34 | 2018-12-28 22:41:06 | -0.004395
