Building Streaming Data Pipelines, Part 2: Data Processing and Enrichment With SQL

In my last blog post, I looked at the essential first part of building any data pipeline: exploring the raw source data to understand its characteristics and relationships. The data covers river levels, rainfall, and other weather information, and is provided by the UK Environment Agency through a REST API. I used the HTTP Source connector to stream this into Apache Kafka® topics (one per REST endpoint), and then Tableflow to expose these as Apache Iceberg™ tables.

With the data in Iceberg, I could then use standard SQL and visualization tools to poke around the data, validate key relationships, and prototype some charts.

In this blog post, I’m going to show how you can build out a streaming ETL pipeline on Confluent Cloud using SQL to transform and enrich the data, before writing it as Iceberg tables using Tableflow.

A Modular Approach to Building Streaming ETL

The pipeline at a high level will look like this:

  1. Extract (already done, using Kafka Connect and the HTTP Source connector)

  2. Transform (two steps)

    1. Unpack the source data array into individual rows, and apply light transformations to restructure the data.

    2. Enrich the reading data with information about the related measure and station

  3. Load: The enriched data is then ready to be exposed as an Iceberg table using Tableflow.

    • Technically, the final 'Transform' step is also a 'Load', since its output is written to a Kafka topic that other applications can use directly if they want.

I’m following a modular approach to building the pipeline, creating sets of tables along the way. This makes development, maintenance, and troubleshooting easier. It also means that other applications can use the data without being tied to the particular data model of my finished product.

For our SQL processing, we’ll use Confluent Cloud for Apache Flink®. You provide the SQL, we run it :) As well as running your production SQL pipelines, Confluent Cloud for Apache Flink® provides a notebook-like interface for developing and prototyping your SQL, and it’s this that we’ll use here.

Confluent Cloud runs Flink SQL in what are called compute pools, so we’ll go ahead and create one for this project:

From here I’ve got two options to start writing SQL:

  1. The Flink shell, provided as part of the Confluent CLI

  2. A SQL workspace, which is browser-based

Let’s launch the SQL Workspace:

You’ll notice that on the left-hand side, my Confluent Cloud cluster dev is shown with the three source topics automagically mapped as Flink tables.

Transform (Step 1): Unpacking and Cleaning the Source Data

As we saw in the previous blog post, the data is made up of three sources:

  • Readings

  • Measures

  • Stations

The relationship between these three entities is as shown here:

In its raw state, the data in each entity follows the same model, and it looks like this:

As we saw when we initially analyzed the data, the items field is a nested array of entries for the entity, batched up into a single message.

Our first job is therefore to unpack this array so that each instance of the entity (reading, measure, station) becomes its own row. In Flink SQL, this is done using CROSS JOIN UNNEST. Let’s try it out on a few rows:

SELECT  i.dateTime,
        i.measure,
        i.`value`
FROM `flood-monitoring-readings` r
    CROSS JOIN UNNEST(r.items) AS i
LIMIT 5;

That works nicely for unpacking the array.
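To make the semantics concrete outside of Flink, here is a minimal Python sketch of what the unnest does to one batched message. The field names (items, dateTime, measure, value) come from the source data; the sample values and the `unnest` helper are made up for illustration.

```python
# Hypothetical batched message. Field names follow the source data,
# the values are invented for illustration.
message = {
    "publisher": "Environment Agency",
    "version": "0.9",
    "items": [
        {"dateTime": "2025-05-28T13:00:00Z",
         "measure": "http://environment.data.gov.uk/flood-monitoring/id/measures/710315-level-stage-i-15_min-m",
         "value": 0.176},
        {"dateTime": "2025-05-28T13:00:00Z",
         "measure": "http://environment.data.gov.uk/flood-monitoring/id/measures/9012-flow--i-15_min-m3_s",
         "value": 1.5},
    ],
}


def unnest(messages):
    """Yield one flat row per element of each message's items array."""
    for msg in messages:
        # A message with an empty array contributes no rows at all,
        # which is the inner-join behaviour of a cross join.
        for item in msg["items"]:
            yield (item["dateTime"], item["measure"], item["value"])


rows = list(unnest([message]))
for row in rows:
    print(row)
```

One message with two array entries becomes two rows, each carrying only the fields we selected.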

To materialize the unpacked data into a new table, we’ll run a continuous Flink SQL query that writes its output to the target table. There are two ways to do this:

  1. First, CREATE the target table, and then set the INSERT query running.

  2. Combine these into what’s known as CTAS—an acronym of the SQL statement that we use: CREATE TABLE … AS SELECT …. The output of the SELECT is written to the table specified in the CREATE.

In my case, I’m using the first approach because I want to add a headers column. Data written to this column is stored in the header of the resulting Kafka message, and this can’t be done with CTAS.

CREATE TABLE readings (
    `dateTime` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
    `measure` VARCHAR,
    `value` DOUBLE NOT NULL,
    headers MAP<VARCHAR(9) NOT NULL, STRING NOT NULL> METADATA
)
WITH ('kafka.retention.time' = '0');

As well as defining a headers field, I’ve configured the underlying Kafka topic to use infinite retention.

Now we can run the INSERT:

INSERT INTO readings
  SELECT dateTime, measure, `value`, headers
  FROM (
    SELECT i.dateTime,
           REGEXP_REPLACE(i.measure,
                          'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                          '') AS measure,
           i.`value`,
           MAP['publisher', publisher, 'version', version] AS headers,
           ROW_NUMBER() OVER (PARTITION BY dateTime, measure
                              ORDER BY dateTime ASC) AS rownum
    FROM `flood-monitoring-readings` r
        CROSS JOIN UNNEST(r.items) AS i)
  WHERE rownum = 1;

When you run this you’ll notice that, unlike the CREATE TABLE, the INSERT statement shows as STATUS: Running. It’ll stay like that forever or until you manually stop it. The INSERT statement is effectively the pipeline job. Since we’re dealing with unbounded data, it’ll keep on processing new data as it arrives, writing it to the target table.

In the above statement we’re doing the CROSS JOIN UNNEST to unpack the items array, as well as a couple of other bits of data processing too:

  • We’re deduplicating the inbound data using a ROW_NUMBER() function to return only the first row where there are multiple entries for the same timestamp and measure.

  • The foreign key (FK) to measures includes a URL prefix that I’ve decided to strip out. The full key looks like this:

    http://environment.data.gov.uk/flood-monitoring/id/measures/9012-flow--i-15_min-m3_s

    And all we actually need for a unique key is this:

    9012-flow--i-15_min-m3_s

    I’m doing this for two reasons. First, the full URL makes it more difficult to eyeball the data. Second, it’s the same set of redundant bytes; no point processing and storing more than we need to.

    To do this we can use the REGEXP_REPLACE function:

    REGEXP_REPLACE(i.measure,
                    'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                    '')

  • The source message includes useful metadata that we’ll push into the Kafka message header. When we ran the CREATE TABLE we added a headers column with the special METADATA assignment. Now when we run the INSERT into this table, we write the publisher and version information to that field:

    SELECT
    […]
        MAP['publisher',publisher,
            'version',version] AS headers
    […]
    FROM `flood-monitoring-readings`
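The three bits of processing in the INSERT can be sanity-checked outside of Flink. This is a rough Python sketch, not how Flink executes it: `transform`, `strip_prefix`, and the sample rows are hypothetical, and "first row" here means the first one encountered for a given timestamp and measure.

```python
import re

# Same pattern as the REGEXP_REPLACE call in the post.
MEASURE_PREFIX = r"http://environment\.data\.gov\.uk/flood-monitoring/id/measures/"


def strip_prefix(measure_url):
    """Remove the URL prefix, leaving just the measure's unique key."""
    return re.sub(MEASURE_PREFIX, "", measure_url)


def transform(rows, publisher, version):
    """Strip the FK prefix, keep the first row per (dateTime, measure), attach headers."""
    seen = set()
    out = []
    for date_time, measure_url, value in rows:
        measure = strip_prefix(measure_url)
        key = (date_time, measure)
        if key in seen:
            continue  # a later duplicate of the same timestamp and measure
        seen.add(key)
        out.append({
            "dateTime": date_time,
            "measure": measure,
            "value": value,
            "headers": {"publisher": publisher, "version": version},
        })
    return out


# Invented sample rows, including one exact duplicate.
URL = "http://environment.data.gov.uk/flood-monitoring/id/measures/"
sample = [
    ("2025-05-28T13:00:00Z", URL + "9012-flow--i-15_min-m3_s", 1.5),
    ("2025-05-28T13:00:00Z", URL + "9012-flow--i-15_min-m3_s", 1.5),
    ("2025-05-28T13:15:00Z", URL + "9012-flow--i-15_min-m3_s", 1.7),
]

result = transform(sample, publisher="Environment Agency", version="0.9")
for row in result:
    print(row)
```

Three input rows yield two output rows, each with the short measure key and the metadata carried separately from the payload.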

Let’s check the data in the target readings table:

We can also look at the underlying Kafka messages using the topic viewer in Confluent Cloud:

Note how the key holds the declared key of the table, value has just a single field (also called value), and the metadata is in the headers.

If we head back to the compute pool that we created earlier, we can see that it is, as expected, executing one statement (the INSERT). We can also look at metrics, including the rate at which it processed the initial backlog of messages on the source topic:

Processing the Source Dimension Data

With the readings fact data happily processing, let’s do the same for the measures and stations source data.

For these I’m going to use CTAS since I’m being lazy and not bothering to include the metadata in the headers this time :D

Measures

CREATE TABLE `measures-00`
    (PRIMARY KEY (`notation`) NOT ENFORCED)
    DISTRIBUTED BY HASH(`notation`)
    WITH ('changelog.mode' = 'upsert',
            'kafka.retention.time' = '0')
    AS SELECT   `notation` AS `notation`,
                `datumType`,
                `label`,
                `parameter`,
                `parameterName`,
                `period`,
                `qualifier`,
                REGEXP_REPLACE(`station`,
                        'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
                        '') AS station,
                `stationReference`,
                `unit`,
                `unitName`,
                `valueType`
    FROM `flood-monitoring-measures` m
        CROSS JOIN UNNEST(m.items) AS i;
  • Primary key is notation.

  • Changelog is upsert, since the source data is a complete dimension snapshot each time. This means that existing records that appear in the new source data are updated, whilst preserving historical records that aren't in the current load. These historical records are still relevant for previous fact data that may join to them.

  • Set the Kafka topic to infinite retention.

  • Strip the URL prefix from the foreign key station value.
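The upsert behavior described above can be pictured as a dictionary keyed on notation. This is only an illustrative Python model of the semantics (the `apply_snapshot` helper and the sample rows are invented), not a description of how Flink or Kafka store the data.

```python
def apply_snapshot(table, snapshot):
    """Upsert every row of a snapshot into the table, keyed by notation."""
    for row in snapshot:
        table[row["notation"]] = row  # insert new keys, overwrite existing ones
    return table


measures = {}

# First snapshot: two measures.
apply_snapshot(measures, [
    {"notation": "A-level", "label": "old label"},
    {"notation": "B-flow", "label": "retired measure"},
])

# Second snapshot: A-level has changed, B-flow is no longer present.
apply_snapshot(measures, [
    {"notation": "A-level", "label": "new label"},
])

print(measures["A-level"]["label"])  # updated by the second snapshot
print("B-flow" in measures)          # still there for older readings to join to
```

Rows that reappear are updated in place, and rows missing from a later snapshot are simply left alone.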

Stations

CREATE TABLE `stations-00`
    (PRIMARY KEY (`notation`) NOT ENFORCED)
    DISTRIBUTED BY HASH(`notation`)
    WITH ('changelog.mode' = 'upsert',
            'kafka.retention.time' = '0')
    AS SELECT   `notation`,
                `RLOIid`,
                `catchmentName`,
                `dateOpened`,
                `easting`,
                `label`,
                `lat`,
                `long`,
                `northing`,
                `riverName`,
                `stageScale`,
                `stationReference`,
                `status`,
                `town`,
                `wiskiID`,
                `datumOffset`,
                `gridReference`,
                `downstageScale`
    FROM `flood-monitoring-stations` s
        CROSS JOIN UNNEST(s.items) AS i;
  • Primary key is notation.

  • Changelog is upsert, for the same reasons as on the measures-00 table above.

  • Set the Kafka topic to infinite retention.

Transform (Step 2): Joining the Data

Now that we’ve got our source data into a sensible place (one row per reading, one row per measure definition, and one row per station reference), we can combine them.

For each incoming reading, we’re going to enrich it to add details about the measure for which the reading has been taken, and the station to which it relates.

The entities relate thus:

So for example, let’s take a row from readings:

At 2 p.m. on 28th May, the measure 710315-level-stage-i-15_min-m had a value of 0.176.

What is this measure? What are its units? And to which measuring station does it relate?

So the reading is for the Water Level measure at the measuring station 710315, in meters (m), reported every 15 minutes (900 seconds). We get these field definitions from the API documentation.

Now all we need to do is look up the station value:

This tells us that the station name is Pimlico Brook, it’s near the town Clitheroe, with a latitude and longitude, and other information.

Building a Join in Flink SQL

So let’s do this!

To work with joins in Flink SQL, it’s important to understand something about the semantics and what’s happening under the covers; for this, I ended up writing a whole separate blog post about joins and changelogs in Flink SQL. In a nutshell, if you took the SQL you’d run on a regular RDBMS it would look something like this:

SELECT *
    FROM readings r
        LEFT OUTER JOIN
        measures m
        ON r.measure = m.notation;

This is known as a regular join. In Flink SQL, this will cause the state size to increase as more data is read because Flink SQL will re-emit the join result if any of the joined tables have a new matching row. In some circumstances, this might be the behavior that you want, but here we just want to have Flink join the reading row to whatever value is currently on measures that matches it and then move on. No state, no re-emitting.

To do this we use a temporal join. This tells Flink to join to the version of a table at a given point in time. Since we want Flink to simply join to whatever value is on the dimension tables (measures, stations) at the point of processing, we’re going to modify the tables to add a hard-coded column that holds the UNIX epoch (1st January 1970), and use that column as the table’s watermark.

ALTER TABLE `measures-00`
    ADD epoch_ts AS TO_TIMESTAMP_LTZ(FROM_UNIXTIME(0));
ALTER TABLE `measures-00`
    MODIFY WATERMARK FOR epoch_ts AS epoch_ts;

ALTER TABLE `stations-00`
    ADD epoch_ts AS TO_TIMESTAMP_LTZ(FROM_UNIXTIME(0));
ALTER TABLE `stations-00`
    MODIFY WATERMARK FOR epoch_ts AS epoch_ts;

This way, any row in readings will always be dated later than the dimension and thus find a match.
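To see why the epoch trick works, here is a small Python model of an "as of" lookup. It is a sketch of the idea only: `lookup_as_of`, `loaded_at`, and the sample row are hypothetical, and `row_time` stands in for the reading's `$rowtime`.

```python
from bisect import bisect_right
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def lookup_as_of(versions, ts):
    """Return the newest row whose valid_from <= ts, or None if there is none.

    versions is a list of (valid_from, row) tuples sorted by valid_from.
    """
    valid_from = [v[0] for v in versions]
    idx = bisect_right(valid_from, ts)
    return versions[idx - 1][1] if idx else None


row_time = datetime(2025, 5, 28, 13, 0, tzinfo=timezone.utc)
loaded_at = datetime(2025, 6, 1, tzinfo=timezone.utc)  # hypothetical load time

measure = {"notation": "710315-level-stage-i-15_min-m", "unitName": "m"}

# Dimension row dated by when it was loaded: the earlier reading finds nothing.
dated_by_load = [(loaded_at, measure)]
# Dimension row dated at the epoch: every reading is later, so it always matches.
dated_by_epoch = [(EPOCH, measure)]

print(lookup_as_of(dated_by_load, row_time))
print(lookup_as_of(dated_by_epoch, row_time))
```

The first lookup returns None because the dimension row did not yet "exist" at the reading's time; the second returns the measure row.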

Now we’ll do a join. I’ll test it first just to make sure that it’s working, using the same measure as above (710315-level-stage-i-15_min-m) and a limited timeframe. To start with, here’s joining readings to measures:

SELECT r.`dateTime`, r.`value`,
        m.`parameterName`, m.`unitName`, m.`period`, m.`station`
    FROM readings r
        LEFT OUTER JOIN
        `measures-00` FOR SYSTEM_TIME AS OF r.`$rowtime` AS m
            ON r.measure = m.notation
    WHERE r.measure = '710315-level-stage-i-15_min-m'
        AND r.dateTime BETWEEN TIMESTAMP '2025-05-28 13:00:00.000'
                            AND TIMESTAMP '2025-05-28 14:00:00.000';

This works, so let’s add in stations:

SELECT r.`dateTime`, r.`value`,
        s.`label`, s.`town`, s.`riverName`, s.`catchmentName`,
        m.`parameterName`, m.`unitName`, m.`period`, m.`station`
    FROM readings r
        LEFT OUTER JOIN
        `measures-00` FOR SYSTEM_TIME AS OF r.`$rowtime` AS m
            ON r.measure = m.notation
        LEFT OUTER JOIN
        `stations-00` FOR SYSTEM_TIME AS OF m.epoch_ts AS s
            ON m.station = s.notation
    WHERE r.measure = '710315-level-stage-i-15_min-m'
        AND r.dateTime BETWEEN TIMESTAMP '2025-05-28 13:00:00.000'
                            AND TIMESTAMP '2025-05-28 14:00:00.000';

This all looks good, so let’s take the training wheels off and let it go for a full date range. Since we’ve got a decent number of rows in the query results we now get the nice benefit of some of the data visualizations that are built into Confluent Cloud—here with a sparkline showing the river level value, and a histogram indicating the number of readings per bucket of time:

So that’s the successful join; before we go ahead and materialize it into its own table, let’s take the glass-half-empty approach and check for where we’re not getting matches:

SELECT r.`dateTime`, r.`value`, r.`measure`,
        s.`label`,
        m.`parameterName`
    FROM readings r
        LEFT OUTER JOIN `measures-00` FOR SYSTEM_TIME AS OF r.`$rowtime` AS m ON r.measure = m.notation
        LEFT OUTER JOIN `stations-00` FOR SYSTEM_TIME AS OF m.epoch_ts AS s ON m.station = s.notation
    WHERE s.`label` IS NULL
        OR m.`parameterName` IS NULL;

It’s the same join logic as above, but with each join condensed onto its own line—and a predicate added to catch any unmatched fact data. After setting this running I get no results. So that’s a good thing, right? No results means no unmatched rows?

Not necessarily. It could be that there are no results, or it could be that there are no results yet. Since this is a streaming query it’ll keep on running, but how do we know that it’s processed all of the data so far? Because if it has processed all the data so far and there are still no results, that’s good news.

To see where a query has got to we can use the statement metrics:

The two dimension tables are Not behind, meaning that Flink has read these (which we’d expect since they’re not that big). If you’re particularly eagle-eyed you’ll notice the watermark of 1st January 1970 too :)

The readings fact table is much bigger, and Flink is currently working its way through it, with another 1.7M rows (messages) still to go at the point I took this screenshot.

There’s a chart that shows the rate at which messages are being processed …

… as well as the remaining count—and we can see that we’ve now got through the backlog:

So, our query is still Running, but with no results and the backlog at zero, we can be confident that our join is working as it should, with no holes in the data:
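The unmatched-row check boils down to two left joins followed by a NULL filter. Here is a minimal Python sketch of that logic over in-memory dictionaries; `unmatched_readings` and all of the sample data are invented for illustration.

```python
def unmatched_readings(readings, measures, stations):
    """Left-join readings to measures to stations; return rows with a NULL on either side."""
    missing = []
    for r in readings:
        m = measures.get(r["measure"])                # LEFT JOIN measures
        s = stations.get(m["station"]) if m else None  # LEFT JOIN stations
        parameter_name = m.get("parameterName") if m else None
        label = s.get("label") if s else None
        # Same predicate as the SQL: s.label IS NULL OR m.parameterName IS NULL
        if label is None or parameter_name is None:
            missing.append(r)
    return missing


measures = {"710315-level-stage-i-15_min-m":
            {"parameterName": "Water Level", "station": "710315"}}
stations = {"710315": {"label": "Pimlico Brook"}}

readings = [
    {"measure": "710315-level-stage-i-15_min-m", "value": 0.176},
    {"measure": "no-such-measure", "value": 1.0},  # deliberately unmatched
]

print(unmatched_readings(readings, measures, stations))
```

Only the reading with no matching measure comes back. An empty result over the full backlog is what tells us the join has no holes.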

Materializing the Join into a Table

Let’s now populate the enriched data into a new table. As with the dimensions and source fact table, this table will be populated by a continuous query. As new rows arrive on the source, they’ll be enriched and written to the target.

CREATE TABLE `readings-enriched-00` AS
        SELECT  r.`dateTime`, r.`measure`, r.`value` AS `value`,
                m.`parameterName`, m.`unitName`, s.`label` AS s_label,
                s.`town`, s.`riverName`, s.`catchmentName`,
                m.`label` AS m_label, m.`period`, m.`qualifier`,
                m.`valueType`, s.`stationReference`, s.`dateOpened`,
                TRY_CAST(s.`easting` AS INTEGER) AS easting,
                TRY_CAST(s.`northing` AS INTEGER) AS northing,
                TRY_CAST(s.`lat` AS DOUBLE) AS lat,
                TRY_CAST(s.`long` AS DOUBLE) AS lon
        FROM readings r
            LEFT OUTER JOIN `measures-00` FOR SYSTEM_TIME AS OF r.`$rowtime` AS m
                ON r.`measure` = m.notation
            LEFT OUTER JOIN `stations-00` FOR SYSTEM_TIME AS OF r.`$rowtime` AS s
                ON m.station = s.notation;

I’ve snuck in a handful more data transformations here, this time converting the location data (lat/long and easting/northing) of a station into suitable data types for subsequent use in queries and visualizations.
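The point of TRY_CAST over a plain CAST is that a value that can't be converted becomes NULL rather than failing the statement. A rough Python equivalent, assuming the location fields arrive as strings (the `try_cast` helper and sample values are hypothetical, and Python's parsing rules are not identical to Flink's):

```python
def try_cast(value, target):
    """Return target(value), or None when the conversion fails."""
    if value is None:
        return None
    try:
        return target(value)
    except (TypeError, ValueError):
        return None


print(try_cast("54.500172", float))  # a usable latitude
print(try_cast("412345", int))       # a usable easting
print(try_cast("n/a", float))        # unparseable, so None instead of an error
```

A single malformed coordinate therefore produces one empty field instead of stopping the whole pipeline.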

As before, we’ve got the statement metrics to follow the progress of the backfill and continued population of the table as new data continues to arrive:

With the table built, we can now use it. Here’s one of the visualizations available within the SQL Workspace, showing the river level at a particular measuring station:

This is useful, especially for exploring the data—but what about some full-blown analytics? For that, we’re going to get the data into Apache Iceberg and build some charts and dashboards with Apache Superset.

Load: Getting Data From Apache Kafka® and Apache Flink® Into Apache Iceberg™

What we’ve built so far is a rather simple—yet powerful—pipeline that uses SQL to process data in Kafka topics, applying transformations including schema changes, data type modification, and joining multiple tables.

Each topic is exposed as a table in Flink SQL. The initial Kafka topic is loaded by the fully managed HTTP source connector in Confluent Cloud, and subsequent topics are written to by the continuous output of results from the Flink SQL queries that run on Confluent Cloud.

To get the enriched data at the end of the pipeline into Iceberg we’ll use Tableflow. This was covered in the previous blog and is basically a checkbox option available in the topic list.

Now we can head to the topic list and enable Tableflow with just two clicks:

It really is as simple as that, and now we have the topic—being populated as a stream of enriched data from Flink SQL, remember—also exposed as an Iceberg table:

With the data in Iceberg, the world is our proverbial oyster! Any tool that supports Iceberg can now query it.

  • Let’s check the table is there, using the PyIceberg CLI:

    # List the schemas
    # (The Kafka cluster ID lkc-v2p3j0 is the schema name)
    $ pyiceberg --catalog REST --uri $ICEBERG_REST_CATALOG_URI --credential $ICEBERG_CREDS \
        list
    lkc-v2p3j0
    
    # List the tables in the schema
    $ pyiceberg --catalog REST --uri $ICEBERG_REST_CATALOG_URI --credential $ICEBERG_CREDS \
        list lkc-v2p3j0
    lkc-v2p3j0.readings-enriched
    
    # Describe the table
    $ pyiceberg --catalog REST --uri $ICEBERG_REST_CATALOG_URI --credential $ICEBERG_CREDS \
        describe lkc-v2p3j0.readings-enriched
    Table format version  2
    Metadata location     s3://cc-tableflow-1e164c44-421a-11f0-b02c-021c7f66f903/10110110/1010101/178cb46b-d78e-435d-8b6e-d8d023a08e6f/env-253ngy/lkc-v2p3j0/v1/6c483ac3-f79c-4…
    Table UUID            8a71b040-14cc-40ed-87b2-12097c722fb7
    Last Updated          1749142063390
    Partition spec        []
    Sort order            []
    Current schema        Schema, id=0
                          ├── 1: dateTime: required timestamptz
                          ├── 2: measure: required string
                          ├── 3: value: required double
                          ├── 4: parameterName: optional string
                          ├── 5: unitName: optional string
                          ├── 6: s_label: optional string
                          ├── 7: town: optional string
                          […]

  • Here’s Trino via PopSQL looking at a sample of the data:

    Trino also has a nice admin view that you can use to see what is running and how it’s executed:

Analyzing the Data

“Analyzing” may be a strong word here… “poking around” is perhaps more accurate.

Let’s now head over to Apache Superset, hooked up to Trino, querying the Kafka data via the Iceberg tables.

Each of the measurements comes from a station, and we can plot these on a map:

Data Gremlins

When I started charting the data, I noticed a couple of slightly odd things. One of these was that a heatmap of rainfall across the entire previous month shows particular hotspots:

Now, the Lake District is traditionally a somewhat soggy place, but it seems like there’s a particular hotspot which may or may not be correct. Zooming in on the area and looking at the other rainfall stations nearby, it still seems like there’s a real spike of data from one in particular:

You’ll notice on the heatmap (left) much fainter dabs of color denoting rainfall that match where the stations are (right). By overlaying the two charts together we can get the particular coordinates for the station in question (-2.930285, 54.500172):

Using this location data, let’s examine the data:

SELECT dateTime, value
    FROM "lkc-qnygo6".readings_enriched_02
    WHERE parametername='Rainfall'
        AND lon = -2.930285
        AND lat= 54.500172

This shows several apparently anomalous readings:

How do we know they’re anomalous? If we look at all the other readings, these are literally off the charts—it’s only because the axis has set its bounds automatically to include them that all the other readings look like zero. If we change it to use a log axis, it looks like this:

The other factor is that this is in mm, and it’s a reading taken every 900 seconds—so if this number were right it’d be 9 METRES of water falling in a 15-minute period; highly unlikely.

If we filter out these errant values (with a WHERE clause omitting any value above 50—an arbitrary number really), the chart starts to look much more realistic:
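As a quick worked example of both the filter and the unit check, here is a small Python sketch. The readings are made-up 15-minute rainfall totals in mm, and the threshold of 50 is the same arbitrary cut-off mentioned above.

```python
MAX_PLAUSIBLE_MM = 50  # arbitrary cut-off, as in the post

# Hypothetical 15-minute rainfall totals in mm, with one errant value.
readings_mm = [0.0, 0.2, 1.4, 9000.0, 0.6]

plausible = [v for v in readings_mm if v <= MAX_PLAUSIBLE_MM]
rejected = [v for v in readings_mm if v > MAX_PLAUSIBLE_MM]

for v in rejected:
    # 1 m = 1000 mm, so a reading in the thousands of mm is metres of rain.
    print(f"{v} mm in 15 minutes = {v / 1000} m")

print(plausible)
```

With the outlier removed, the remaining values sit on a scale where an automatically bounded axis shows real variation.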

Understanding the Location Data

I initially started filtering a dashboard I’d built by town, one of the station attributes. Readings kept disappearing when I applied the filter, until I realized that not every station has a town assigned to it.

Only Water Level and Flow stations have this data:

SELECT parametername,
        COUNT(DISTINCT town) as town_ct,
        COUNT(DISTINCT catchmentname) as catchmentname_ct,
        COUNT(DISTINCT rivername) as rivername_ct
FROM readings_enriched
GROUP BY parametername;
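The reason this query exposes the gap is that COUNT(DISTINCT …) skips NULLs, so a parameter whose stations have no town gets a count of zero. A minimal Python sketch of the same aggregation (the `distinct_counts` helper and the sample rows are invented):

```python
from collections import defaultdict


def distinct_counts(rows, group_key, columns):
    """Count distinct non-null values of each column, per group."""
    seen = defaultdict(lambda: {c: set() for c in columns})
    for row in rows:
        group = seen[row[group_key]]  # the group exists even if every value is None
        for c in columns:
            if row.get(c) is not None:  # NULLs are not counted
                group[c].add(row[c])
    return {g: {c: len(vals) for c, vals in cols.items()}
            for g, cols in seen.items()}


# Hypothetical enriched rows.
rows = [
    {"parametername": "Water Level", "town": "Clitheroe", "rivername": "Pimlico Brook"},
    {"parametername": "Water Level", "town": "Clitheroe", "rivername": "Pimlico Brook"},
    {"parametername": "Rainfall", "town": None, "rivername": None},
]

counts = distinct_counts(rows, "parametername", ["town", "rivername"])
print(counts)
```

Rainfall still appears as a group, but with zero distinct towns and rivers, which is exactly the pattern that explains the disappearing readings.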

For other stations, such as Rainfall, the only identifying data about the station is its location—a latitude and longitude. So we can take a look at the country as a whole:

But as soon as we want to look at a particular area, this lack of identifying attribute on the station makes it much more difficult if all we can do is filter on an attribute "town" that isn’t present for all the data.

One option is to amend the query behind the dashboard to include a predicate that hard-codes a bounding box on the latitude/longitude:

[…]
WHERE lon BETWEEN -2.05 AND -1.16
  AND lat BETWEEN 53.84 AND 54.15

This corresponds to this area of Yorkshire …

… and filters the data returned nicely just for this area:

Another way to do this is to use Trino’s geospatial functions, here filtering for all data within a 30-km radius of a fixed point (Ilkley, UK):

WHERE ST_Distance(
            to_spherical_geography(ST_Point(lon, lat)),
            to_spherical_geography(ST_Point( -1.8264663,53.927971))
        ) <= 30000;
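Both location filters are easy to check by hand. The Python sketch below reproduces the bounding box and approximates the radius filter with the haversine formula. It assumes a spherical Earth with a mean radius of 6,371 km, so results may differ slightly from what Trino computes; the function names are hypothetical.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6_371_000  # assumed mean Earth radius; Trino's constant may differ


def in_bounding_box(lon, lat):
    """Same bounds as the BETWEEN predicates (inclusive at both ends)."""
    return -2.05 <= lon <= -1.16 and 53.84 <= lat <= 54.15


def great_circle_m(lon1, lat1, lon2, lat2):
    """Haversine great-circle distance between two lon/lat points, in metres."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlam = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


ILKLEY = (-1.8264663, 53.927971)                # fixed point from the query
LAKE_DISTRICT_STATION = (-2.930285, 54.500172)  # station examined earlier

distance = great_circle_m(*ILKLEY, *LAKE_DISTRICT_STATION)

print(in_bounding_box(*ILKLEY))                 # inside the Yorkshire box
print(in_bounding_box(*LAKE_DISTRICT_STATION))  # outside it
print(distance <= 30_000)                       # outside the 30-km radius too
```

The bounding box is cheaper and works in any SQL engine, while the radius filter is closer to what "near this place" usually means.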

Wrapping Up

One of the key things in what I’ve built here is the concept of shift left. Instead of dumping data into a data lake, to then process as a batch, I’ve moved that processing further upstream. The data’s processed with SQL and the resulting Kafka topic written to Iceberg using Tableflow.

This has multiple benefits:

  • Anyone using the data benefits from lower latency, because it now doesn’t need processing before being used.

  • There’s better consistency in the data, because everyone can read from the same version of it instead of creating their own pipelines.

  • Anyone who wants the data as a stream can do so, since the same data exists in Kafka too (this is what Tableflow writes to Iceberg).

  • The Flink transformations in Kafka created the equivalent of Silver level data (from the Medallion model) that can be easily queried and mapped into Gold level data products for different audiences to consume. These could include canoeists and white water kayakers checking real-time river levels or farmers analyzing rainfall to improve crop management decisions.

Once the data is in Iceberg format, it can be queried by any number of different engines and used like any other Iceberg data, including for driving dashboards, ad hoc analysis, and AI training.

This uses Environment Agency flood and river level data from the real-time data API (Beta).
 
Apache®, Apache Kafka®, Kafka®, Apache Flink®, Flink®, Apache Iceberg™️, Iceberg™️, and the Iceberg logo are either trademarks or registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.

  • Robin works on the DevRel team at Confluent. His data engineering journey has taken him from building data warehouses on mainframes with COBOL to developing Oracle analytics solutions, before diving headfirst into the Kafka ecosystem and the modern data streaming world in recent years. Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.
