Confluent
Data Wrangling with Apache Kafka and KSQL
Stream Processing

Data Wrangling with Apache Kafka and KSQL

Robin Moffatt

KSQL, the SQL streaming engine for Apache Kafka®, puts the power of stream processing into the hands of anyone who knows SQL. It’s fun to use for exploring data in Kafka topics, but its real power comes in building stream processing applications. By continually streaming messages from one Kafka topic to another, applying transformations expressed in SQL, it is possible to build powerful applications doing common data wrangling tasks such as:

  • Applying schema to data
  • Filtering and masking data
  • Changing data structures (for example, flattening nested data)
  • Changing the serialization format
  • Enriching streams of data
  • Unifying multiple streams of data

Imagine you’ve got data on which you want to build some analytics. It’s coming from multiple sources—maybe different instances of the same system. You need to unify those sources into a single one, reformat the data along the way and apply a schema to the data so that it can be easily analyzed downstream. Apache Kafka and KSQL are a great choice for this.

Why Kafka? Because we can use it for integrating the source systems with those downstream, in near real time and with the ability to add and swap out sources and targets without disturbing the rest of the components. In addition, we can reprocess data as required and share it with other systems—enabling them to take advantage of the data cleansing and data wrangling that we’ve performed.

Overview of what we're going to build with Kafka Connect, KSQL, and Google Cloud Platform

 

In this article, we’ll see how to pull in data from REST sources, cleanse it and perform data wrangling with KSQL, then stream it out to both Google Cloud Storage (GCS) as well as Google BigQuery for analysis and visualization in Google Data Studio. We’re using Confluent Cloud to host our Kafka brokers, but it will work on a local cluster instead if you want to.

You can find all of the code used in this article on GitHub, including Docker Compose files.

Getting the data in

Credit: This section uses Environment Agency rainfall data from the real-time data API (beta).

Streaming data in from REST sources into Kafka using Kafka Connect

 

The data that I’m working with here is from the public API provided by the U.K. government’s Environment Agency. It contains readings taken from various weather stations around the country regarding information like rainfall, river levels and so on. Each station has its own REST API endpoint.

We’re going to pull in data from four of these, using Kafka Connect to poll the sources. The data structures from each endpoint vary slightly, giving us a nice real-world example of data cleansing problems.

I’m using the Kafka Connect REST source connector, which is a community-contributed connector written by Lenny Löfberg. Using this connector I can specify a REST endpoint and a polling interval, and the resulting payload is written to a Kafka topic.

To start with, create the connectors (one per station):

{
    "name": "source_rest_flood-monitoring-L2404",
    "config": {
        "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
        "rest.source.url": "http://environment.data.gov.uk/flood-monitoring/id/stations/L2404",
        [...]
    }
}

With this running, I have a Kafka topic per weather station and data in each:

$ ccloud consume --from-beginning --topic flood-monitoring-059793|jq '.'                                                                                                                                 {
  "@context": "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld",
  "meta": {
    "publisher": "Environment Agency",
    [...]
    ]
  },
  "items": {
    [...]
    "label": "Rainfall station",
    "lat": 53.966652,
    "long": -1.115089,
    "measures": {
      [...]
      "latestReading": {
        "dateTime": "2018-08-22T05:30:00Z",
        "value": 0
        [...]

You’ll note from the above sample of JSON that it’s nested data. For this station (059793) there is just one measure. However, for another station (3680) there are multiple measure readings, and the measure is supplied as an array:

[...]
  "items": {
    "@id": "http://environment.data.gov.uk/flood-monitoring/id/stations/3680",
    "label": "Rainfall station",
    [...
    "measures": [
      {
        [...]
        "label": "rainfall-tipping_bucket_raingauge-t-15_min-mm",
        "latestReading": {
          "dateTime": "2018-08-30T04:00:00Z",
          "value": 0
          [...]
        },
      },
      {
        [...]
        "label": "temperature-dry_bulb-i-15_min-deg_C",
        "latestReading": {
          "dateTime": "2018-08-30T04:00:00Z",
          "value": 8
          [...]
        },
      }
    ],
[...]

This kind of varying structure in the data is a common problem in data integration and ETL. We’ll see in this article how KSQL can help with addressing this kind of challenge.

Now, we want to take these four Kafka topics and build a stream processing application that will populate a single unified output topic of transformed data from across the four topics. This will process data that already exists in the Kafka topic, along with every new message that arrives.

Declaring the schema

The data coming in is JSON—but with no declared schema as such. As a framework, Kafka Connect can automagically register a schema for inbound data that it serializes as Apache Avro™, but the REST connector here is basically just pulling string data from the REST endpoint, and that string data happens to be JSON. So, the first thing we’re going to do with KSQL is declare a schema for our source data from each topic. Note that the schema varies slightly to take into account the data from one of the stations that includes an array.

Applying a schema to a string of JSON, using KSQL
CREATE STREAM flood_monitoring_059793 \
    (meta STRUCT<publisher         VARCHAR, \
                 comment           VARCHAR>, \
     items STRUCT<eaRegionName     VARCHAR, \
                  label            VARCHAR, \
                  stationReference VARCHAR, \
                  lat              DOUBLE, \
                  long             DOUBLE, \
                  measures STRUCT<label         VARCHAR, \
                                  latestReading STRUCT<dateTime VARCHAR, \
                                                       value DOUBLE>,\
                                  parameterName VARCHAR, \
                                  unitName VARCHAR>> \
    ) WITH (KAFKA_TOPIC='flood-monitoring-059793',VALUE_FORMAT='JSON');

[...]

CREATE STREAM flood_monitoring_3680 \
    (meta STRUCT<publisher VARCHAR, \
                 comment VARCHAR>, \
     items STRUCT<eaRegionName VARCHAR, \
                  label VARCHAR, \
                  stationReference VARCHAR, \
                  lat DOUBLE, \
                  long DOUBLE, \
                  measures ARRAY<STRUCT<label VARCHAR, \
                        latestReading STRUCT<\
                            dateTime VARCHAR, \
                            value DOUBLE>,\
                        parameterName VARCHAR, \
                        unitName VARCHAR>>> \
    ) WITH (KAFKA_TOPIC='flood-monitoring-3680',VALUE_FORMAT='JSON');

With the Kafka topics registered and schemas defined, we can list them out:

ksql> show streams;

 Stream Name             | Kafka Topic                 | Format
----------------------------------------------------------------
 FLOOD_MONITORING_3680   | flood-monitoring-3680       | JSON
 FLOOD_MONITORING_L2404  | flood-monitoring-L2404      | JSON
 FLOOD_MONITORING_059793 | flood-monitoring-059793     | JSON
 FLOOD_MONITORING_L2481  | flood-monitoring-L2481      | JSON
----------------------------------------------------------------

Before we even do anything else to the data, we could use KSQL’s ability to reserialize and convert the raw JSON data into Avro. The advantage here is that any application downstream—whether it’s another KSQL process, Kafka Connect or a Kafka consumer—can work with the data directly from the topic and obtain the schema for it from the Confluent Schema Registry. To do this, use the CREATE STREAM…AS SELECT statement with the VALUE_FORMAT specified as part of the WITH clause:

CREATE STREAM FLOOD_MONITORING_3680_AVRO \
    WITH (VALUE_FORMAT='AVRO') AS \
    SELECT * FROM FLOOD_MONITORING_3680;

You can also define the partitioning and replication factor at this stage, too, if you wanted to change that.

For the rest of this exercise we’ll stick to the original JSON topics and apply the Avro serialization later on.

Working with nested data

Because the data in the source topic is nested JSON, we declare the parent column’s data type as STRUCT. To access the data with KSQL, use the -> operator:

select items->stationreference, \
       items->earegionname, \
       items->measures->parameterName, \
       items->measures->latestreading->datetime,\
       items->measures->latestreading->value, \
       items->measures->unitname \
from   flood_monitoring_L2481 limit 1;
L2481 | North East | Water Level | 2018-08-22T13:00:00Z | 5.447 | mAOD

For the readings that are part of an array (declared as an ARRAY<STRUCT>), use square brackets to designate the index:

ksql> select items->stationreference, \
        items->earegionname, \
        items->measures[0]->parameterName, \
        items->measures[0]->latestreading->datetime,\
        items->measures[0]->latestreading->value, \
        items->measures[0]->unitname \
 from   flood_monitoring_3680  limit 1;
3680 | Midland | Rainfall | 2018-08-30T04:00:00Z | 0.0 | mm

Unifying data from multiple streams

Taking a sample record from each topic and reading type gives us this table when manually collated:

Station reference

Station region

Measurement type

Timestamp

Measurement

Unit

3680

Midland

Rainfall

2018-08-30T04:00:00Z

0.0

mm

3680

Midland

Temperature

2018-08-30T04:00:00Z

8.0

deg C

059793

North East

Rainfall

2018-08-22T05:30:00Z

0.0

mm

L2481

North East

Water Level

2018-08-22T13:00:00Z

5.447

mAOD

L2404

North East

Water Level

2018-08-22T18:45:00Z

5.23

mAOD

Looking at the data, we can apply a data model that looks something like this:

+-Environment Readings
  +-Station reference
  +-Station region
  +-Type of measurement
  +-Measurement timestamp
  +-Measurement
  +-Measurement units

The type of reading (e.g., temperature, rainfall and river level) varies, as do the units associated with it and the station. We could normalize this out into stations, reading types and so on—but for ease of reporting we’ll actually denormalize it into a single flat structure. This means bringing in data from the multiple streams, including manually exploding the array within the 3680 topic (which contains both rainfall and temperature data):

Unifying multiple streams in KSQL (similar to UNION in RDBMS)

 

To do this, we can use KSQL’s INSERT INTO statement. This streams the results of a SELECT statement into an existing target STREAM. We’ll create the initial STREAM using CREATE STREAM…AS SELECT. We’ll also take the opportunity to serialize the data to Avro.

CREATE STREAM ENVIRONMENT_DATA WITH \
        (VALUE_FORMAT='AVRO') AS \
SELECT  items->stationreference AS stationreference, \
        items->earegionname AS earegionname, \
        items->label AS label, \
        items->lat AS lat, items->long AS long, \
        items->measures->latestreading->datetime AS reading_ts, \
        items->measures->parameterName AS parameterName, \
        items->measures->latestreading->value AS reading_value, \
        items->measures->unitname AS unitname \
 FROM   flood_monitoring_L2404 ;

INSERT INTO ENVIRONMENT_DATA \
SELECT  items->stationreference AS stationreference, \
        items->earegionname AS earegionname, \
        items->label AS label, \
        items->lat AS lat, items->long AS long, \
        items->measures->latestreading->datetime AS reading_ts, \
        items->measures->parameterName AS parameterName, \
        items->measures->latestreading->value AS value, \
        items->measures->unitname AS unitname \
 FROM   flood_monitoring_L2481 ;

-- (INSERT INTO repeated for the remaining source topics)

Now when we inspect the STREAMS, we can see the new one created and populated by the above statements:

ksql> show streams;

 Stream Name                | Kafka Topic                 | Format
-------------------------------------------------------------------
 FLOOD_MONITORING_3680      | flood-monitoring-3680       | JSON
 FLOOD_MONITORING_L2404     | flood-monitoring-L2404      | JSON
 FLOOD_MONITORING_059793    | flood-monitoring-059793     | JSON
 FLOOD_MONITORING_L2481     | flood-monitoring-L2481      | JSON
 ENVIRONMENT_DATA           | ENVIRONMENT_DATA            | AVRO
-------------------------------------------------------------------

Note that the “format” as shown in the column in the above output is AVRO. Using DESCRIBE EXTENDED you can check that messages are being processed by reviewing the Local runtime statistics:

ksql> DESCRIBE EXTENDED ENVIRONMENT_DATA;
[...]
Local runtime statistics
------------------------
messages-per-sec:         0   total-messages:      2311     last-message: 8/30/18 2:38:48 PM UTC
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic ENVIRONMENT_DATA)
ksql>

The unified topic is ENVIRONMENT_DATA and has data from all source topics within it:

ksql> SELECT * FROM ENVIRONMENT_DATA ;
1534992115367 | null | L2404 | North East | Foss Barrier | 53.952443 | -1.078056 | 2018-08-22T18:45:00Z | Water Level | 5.23 | mAOD
[...]
1535615911999 | null | L2481 | North East | York James Street TS | 53.960145 | -1.06865 | 2018-08-30T05:30:00Z | Water Level | 5.428 | mAOD
[...]
1535135263726 | null | 059793 | North East | Rainfall station | 53.966652 | -1.115089 | 2018-08-24T17:00:00Z | Rainfall | 0.0 | mm
[...]
1535638518251 | null | 3680 | Midland | Rainfall station | 52.73152 | -0.995167 | 2018-08-30T04:00:00Z | Rainfall | 0.0 | mm
[...]
1535638518251 | null | 3680 | Midland | Rainfall station | 52.73152 | -0.995167 | 2018-08-30T04:00:00Z | Temperature | 8.0 | deg C

Re-keying data in KSQL

Based on the above data model, the unique key for data is a composite of the station, reading type and timestamp. We’re going to handle the timestamp separately—for now, let’s see how to use KSQL to set the message key used by Kafka.

The message key is important as it defines the partition on which messages are stored in Kafka and is used in any KSQL joins. At the moment there’s no key set, so data for the same station and reading type could be scattered across partitions. For a few rows of data this may not matter, but as volumes increase it becomes more important to consider. It’s also pertinent to the strict ordering guarantee that Kafka provides, which only applies within a partition.

Using kafkacat we can inspect the partition assignments. I’m using a topic that I’ve created just for this purpose, with the serialization set to JSON (kafkacat doesn’t currently support Avro). By filtering for a given station we can see the partitions that messages are assigned to as well as the message key:

$ kafkacat -b kafka-broker:9092 -t ENVIRONMENT_DATA_JSON -f 'Partition: %p\tOffset: %o\tKey (%K bytes): %k\tValue (%S bytes): %s\n'|grep L2481

Partition: 0    Offset: 344 Key (-1 bytes):  Value (260 bytes): {"STATIONREFERENCE":"L2481"[...]
[...]
Partition: 1    Offset: 595 Key (-1 bytes):  Value (260 bytes): {"STATIONREFERENCE":"L2481"[...]
[...]
Partition: 2    Offset: 48  Key (-1 bytes):  Value (260 bytes): {"STATIONREFERENCE":"L2481"[...]
Partition: 2    Offset: 49  Key (-1 bytes):  Value (260 bytes): {"STATIONREFERENCE":"L2481"[...]
[...]

Note that the messages span several partitions and have a null key.

Next, let’s repartition our unified data stream using the PARTITION BY clause:

CREATE STREAM ENVIRONMENT_DATA_REKEYED AS \
    SELECT STATIONREFERENCE+PARAMETERNAME AS COMPOSITE_KEY, * FROM ENVIRONMENT_DATA \
    PARTITION BY COMPOSITE_KEY;

Checking the data with kafkacat again, we see the following:

kafkacat -b kafka-broker:9092 -t ENVIRONMENT_DATA_REKEYED -f 'Partition: %p\tOffset: %o\tKey (%K bytes): %k\tValue (%S bytes): %s\n'|grep L2481
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic ENVIRONMENT_DATA_REKEYED2 [3] at offset 0
% Reached end of topic ENVIRONMENT_DATA_REKEYED2 [1] at offset 0
Partition: 2    Offset: 0       Key (16 bytes): L2481Water Level        Value (241 bytes): {"COMPOSITE_KEY":"L2481Water Level","STATIONREFERENCE":"L2481"[...]
Partition: 2    Offset: 1       Key (16 bytes): L2481Water Level        Value (241 bytes): {"COMPOSITE_KEY":"L2481Water Level","STATIONREFERENCE":"L2481"[...]
[...]
Partition: 2    Offset: 734     Key (16 bytes): L2481Water Level        Value (241 bytes): {"COMPOSITE_KEY":"L2481Water Level","STATIONREFERENCE":"L2481"[...]

All of the messages for the given key reside on a single partition, and each message has a key and value.

Managing message timestamps in KSQL

As well as messages having a key (and value), they also have a timestamp in their metadata. This can be set explicitly by the application producing the messages to Kafka, or in the absence of that messages will take the time at which it arrives at the Kafka broker. The timestamp of the messages we’re working with have the timestamp of the time at which they were ingested by Kafka Connect. However, the actual timestamp to use in processing the data for analysis is the items.measures.latestReading.dateTime value within the message. This matters particularly when using the data for aggregations, time-based partitioning and so on. Using the TIMESTAMPTOSTRING function we can examine the two timestamps discussed above:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), items->measures->latestReading->dateTime FROM FLOOD_MONITORING_L2404 LIMIT 1;
2018-08-23 01:11:53 | 2018-08-22T18:45:00Z

In this example, the data arrived in Kafka at 01:11 on Aug. 23, 2018, but the reading occurred at 18:45 on the Aug. 22, 2018. If we did any arithmetic on the data as it stands (for example, calculating the maximum reading value on Aug. 22), we’d get an incorrect answer. This is because KSQL uses the message timestamp (accessible through the virtual system column ROWTIME) in its time processing (such as windowed aggregates).

To rectify this, we can use KSQL. Just as we used the WITH clause previously to set the serialization format to Avro, we can use a similar pattern to override the timestamp that will be used for the messages in the target stream being created:

CREATE STREAM ENVIRONMENT_DATA_WITH_TS \
            WITH (TIMESTAMP='READING_TS', \
                  TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX') AS \
SELECT * \
  FROM ENVIRONMENT_DATA ;

This is currently blocked by #1439. The workaround is a two-step conversion:

CREATE STREAM ENVIRONMENT_DATA_WITH_TS_STG AS \
SELECT STRINGTOTIMESTAMP(READING_TS, 'yyyy-MM-dd''T''HH:mm:ssX') AS READING_TS_EPOCH, * \
FROM ENVIRONMENT_DATA ;

CREATE STREAM ENVIRONMENT_DATA_WITH_TS \
            WITH (TIMESTAMP='READING_TS_EPOCH') AS \
SELECT * \
  FROM ENVIRONMENT_DATA_WITH_TS_STG;

You can validate the conversion by comparing the ROWTIME of the newly created stream with the source READING_TS:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), READING_TS \
      FROM ENVIRONMENT_DATA_WITH_TS LIMIT 1;
2018-08-22 18:45:00 | 2018-08-22T18:45:00Z

Column derivations with KSQL

Beyond filtering, KSQL can be used to create derivations based on the incoming data. Let’s take the example of dates. As well as the raw timestamp of each reading that we receive, it could be that for ease of use downstream we want to also add columns for just year, month and so on. The TIMESTAMPTOSTRING function and DateTime format strings allow you to easily accomplish these tasks:

CREATE STREAM ENVIRONMENT_DATA_LOCAL_WITH_TS_AND_DATE_COLS AS \
SELECT *, \
       TIMESTAMPTOSTRING(ROWTIME,'QQQ') as READING_QTR, \
       TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd') as READING_YMD, \
       TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM') as READING_YM \
FROM   ENVIRONMENT_DATA_LOCAL_WITH_TS;

The stream now includes the new columns:

ksql> DESCRIBE ENVIRONMENT_DATA_LOCAL_WITH_TS_AND_DATE_COLS;

Name                 : ENVIRONMENT_DATA_LOCAL_WITH_TS_AND_DATE_COLS
 Field            | Type
----------------------------------------------
 ROWTIME          | BIGINT           (system)
 ROWKEY           | VARCHAR(STRING)  (system)
 READING_TS_EPOCH | BIGINT
 STATIONREFERENCE | VARCHAR(STRING)
 EAREGIONNAME     | VARCHAR(STRING)
 LABEL            | VARCHAR(STRING)
 LAT              | DOUBLE
 LONG             | DOUBLE
 READING_TS       | VARCHAR(STRING)
 PARAMETERNAME    | VARCHAR(STRING)
 READING_VALUE    | DOUBLE
 UNITNAME         | VARCHAR(STRING)
 READING_QTR      | VARCHAR(STRING)
 READING_YMD      | VARCHAR(STRING)
 READING_YM       | VARCHAR(STRING)
----------------------------------------------

Each message includes the derived data:

ksql> SELECT READING_TS, \
             READING_QTR, \
             READING_YMD, \
             READING_YM \
      FROM   ENVIRONMENT_DATA_LOCAL_WITH_TS_AND_DATE_COLS \
             LIMIT 5;
2018-08-26T18:15:00Z | Q3 | 2018-08-26 | 2018-08
2018-08-24T18:15:00Z | Q3 | 2018-08-24 | 2018-08
2018-08-29T09:15:00Z | Q3 | 2018-08-29 | 2018-08
2018-08-23T18:15:00Z | Q3 | 2018-08-23 | 2018-08
2018-08-25T05:30:00Z | Q3 | 2018-08-25 | 2018-08
Limit Reached
Query terminated
Note: In this example we’re building step by step a series of transformations in a daisy-chain style. In practice you may refactor them into fewer steps, but I’m keeping them separate here to make the explanations clearer.

Filtering data with KSQL

Let’s see how we can filter the data using KSQL. Each CREATE STREAM…AS SELECT statement creates a Kafka topic populated continually with the results of the transformation. We can use the same approach to filter the stream of data. For example, using the KSQL function GEO_DISTANCEwe can filter the stream of data to just messages within the given distance of a point.

CREATE STREAM ENVIRONMENT_DATA_LOCAL_WITH_TS AS \
SELECT * FROM ENVIRONMENT_DATA_WITH_TS \
WHERE  GEO_DISTANCE(LAT,LONG,53.919066, -1.815725,'KM') < 100;

Masking data with KSQL

As well as filtering entire rows based on a predicate as shown above, KSQL can also be used to “filter” columns from a message. Imagine you have a field in your source data that you don’t want to persist downstream—with KSQL you simply create a derived stream and omit the column(s) in question from the projection:

ksql> DESCRIBE ENVIRONMENT_DATA;

Name                 : ENVIRONMENT_DATA
 Field            | Type
----------------------------------------------
 ROWTIME          | BIGINT           (system)
 ROWKEY           | VARCHAR(STRING)  (system)
 STATIONREFERENCE | VARCHAR(STRING)
 EAREGIONNAME     | VARCHAR(STRING)
 LABEL            | VARCHAR(STRING)
 LAT              | DOUBLE
 LONG             | DOUBLE
 READING_TS       | VARCHAR(STRING)
 PARAMETERNAME    | VARCHAR(STRING)
 READING_VALUE    | DOUBLE
 UNITNAME         | VARCHAR(STRING)
----------------------------------------------

ksql> CREATE STREAM ENVIRONMENT_DATA_MINIMAL AS \
        SELECT STATIONREFERENCE, READING_TS, READING_VALUE \
        FROM ENVIRONMENT_DATA;

ksql> DESCRIBE ENVIRONMENT_DATA_MINIMAL;

Name                 : ENVIRONMENT_DATA_MINIMAL
 Field            | Type
----------------------------------------------
 ROWTIME          | BIGINT           (system)
 ROWKEY           | VARCHAR(STRING)  (system)
 STATIONREFERENCE | VARCHAR(STRING)
 READING_TS       | VARCHAR(STRING)
 READING_VALUE    | DOUBLE
----------------------------------------------

As well as simply dropping a column, KSQL ships with functions to mask data:

ksql> SELECT STATIONREFERENCE, EAREGIONNAME \
      FROM ENVIRONMENT_DATA;
L2404 | North East

ksql> SELECT STATIONREFERENCE, EAREGIONNAME, MASK_RIGHT(EAREGIONNAME,4) AS REGION_NAME_MASKED \
      FROM ENVIRONMENT_DATA2;
L2404 | North East | North Xxxx

There are several MASK-based functions, and if you have your own special sauce you’d like to use here, KSQL does support user-defined functions (UDFs) as of Confluent Platform 5.0.

Recap

So far, we’ve ingested data from several sources with similar but varying data models. Using KSQL, we’ve accomplished data wrangling by:

  • Flattening nested data structures
  • Reserializing JSON data to Avro
  • Unifying the multiple streams into one
  • Setting the message partitioning key
  • Setting the message timestamp metadata to the correct logical value
  • Creating derived columns in the transformation
  • Filtering and masking the data
Data wrangling with KSQL and Kafka Connect

The results of these transformations is continually populated Kafka topics. As new messages arrive on the source, continuously running KSQL statements process and write them to the target Kafka topic.

Streaming onwards…

The great thing about Kafka is its ability to build systems in which functionality is compartmentalized. Ingest is handled by one process (in this case, Kafka Connect), and transformation is handled by a series of KSQL statements. Each can be modified and switched out for another without impacting the pipeline we’re building. Keeping them separate makes it easier to perform important activities such as testing, troubleshooting and analyzing performance metrics. It also means that we can extend data pipelines easily.

We may have a single use case in mind when initially building it, and one way to do this would be building a single application that pulls data from REST endpoints before cleansing, wrangling and writing it out to the original target. But now if we want to add other targets, we have to modify that application, which becomes more complex and risky. Instead, by breaking up the processes and building them all around Kafka, adding another target for the data is as simple as consuming the transformed data from a Kafka topic.

So, let’s take our transformed data and do something with it! We can use it to drive analytic requirements, but we’ll also see how it can drive applications themselves, too.

Overview of what we're going to build with Kafka Connect, KSQL, and Google Cloud Platform

For our analytics, we’re going to land the data to BigQuery, Google’s cloud data warehouse tool. We’ll use another Kafka Connect community connector, one written by WePay to stream data from Kafka topics to BigQuery. You’ll need to set up your Google Cloud Platform (GCP) credentials in a file accessible to the Connect worker(s), and also make sure that the BigQuery project and dataset exist first. Here, I’m using ones called devx-testing and environment_data, respectively:

{
  "name": "sink_gbq_environment-data",
  "config": {
    "connector.class":"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "ENVIRONMENT_DATA",
    "autoCreateTables":"true",
    "autoUpdateSchemas":"true",
    "project":"devx-testing",
    "datasets":".*=environment_data",
    "keyfile":"/root/creds/gcp_creds.json"
    [...]

Once deployed, we can see data arriving in BigQuery using the GCP console:

Google BigQuery with streamed through Kafka and transformed with KSQL

We can also use bq:

$ bq ls environment_data
         tableId           Type    Labels   Time Partitioning
 ------------------------ ------- -------- -------------------
  ENVIRONMENT_DATA         TABLE            DAY

$ bq query 'select * from environment_data.ENVIRONMENT_DATA'
Waiting on bqjob_r5ce1258159e7bf44_000001658f8cfedb_1 ... (0s) Current status: DONE
+------------------+--------------+------------------------+------+------------+----------------------+-----------+-----------+----------------------+---------------+-------+----------+
| STATIONREFERENCE | EAREGIONNAME |       EAAREANAME       | TOWN | RIVERNAME  |        LABEL         |    LAT    |   LONG    |       DATETIME       | PARAMETERNAME | VALUE | UNITNAME |
+------------------+--------------+------------------------+------+------------+----------------------+-----------+-----------+----------------------+---------------+-------+----------+
| L2404            | North East   | North East - Yorkshire | York | River Ouse | Foss Barrier         | 53.952443 | -1.078056 | 2018-08-08T16:30:00Z | Water Level   |  5.01 | mAOD     |
| L2404            | North East   | North East - Yorkshire | York | River Ouse | Foss Barrier         | 53.952443 | -1.078056 | 2018-08-08T18:15:00Z | Water Level   | 5.003 | mAOD     |
[...]

There are many ways to work with data in BigQuery: the direct SQL interface, the GUI console—or through numerous analytics visualization tools, including Looker, Tableau, Qlik, Redash, etc. Here, I’ve used Google’s own Data Studio. Connecting to BigQuery is simple, and once the dataset is in Data Studio, it’s a matter of moments to throw some useful visualizations together:

Google Data Studio showing data from BigQuery streamed through Kafka and transformed with KSQL Google Data Studio showing data from BigQuery streamed through Kafka and transformed with KSQL

 

We’ve discussed streaming data to Google BigQuery, but did you know that you can also stream the same transformed data to GCS for archival purposes or even batch access from other applications (although arguably this would be done from consuming the Kafka topic directly)?

{
  "name": "sink_gcs_environment-data",
  "config": {
    "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
    "topics": "ENVIRONMENT_DATA",
    "gcs.bucket.name": "rmoff-environment-data",
    "gcs.part.size": "5242880",
    "flush.size": "16",
    "gcs.credentials.path": "/root/creds/gcp_creds.json",
[...]

With this connector running, we now have data streaming to both BigQuery and GCS:

$ gsutil ls gs://rmoff-environment-data/topics/
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/

$ gsutil ls gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/ENVIRONMENT_DATA+0+0000000000.json
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/ENVIRONMENT_DATA+0+0000000016.json

Conclusion

KSQL and Apache Kafka are a powerful way to build integration between systems, with transformation applied to the data in flight, and the resulting data available to multiple consuming applications downstream. By working with streaming data, organizations can take advantage of the transform-once-use-many paradigm, since the data is available instantly for real-time applications to use. Applications with less immediate requirements (as is often the case with analytics) can still use the same data, reducing complexity, minimizing the duplication of code and leading to a more flexible and powerful architecture. 

Using KSQL, streaming processing can be expressed using SQL alone so you don’t need to write any Java code—opening up stream processing to a much wider audience of developers.

If you’re interested in what KSQL can do

You can find all of the code used in this article on GitHub.

Subscribe to the Confluent Blog

Subscribe

More Articles Like This

Using Apache Kafka to Drive Cutting-Edge Machine Learning
Kai Waehner

Using Apache Kafka to Drive Cutting-Edge Machine Learning

Kai Waehner . .

Machine learning and the Apache Kafka® ecosystem are a great combination for training and deploying analytic models at scale. I had previously discussed potential use cases and architectures for machine ...

Kafka Connect Deep Dive – Converters and Serialization Explained
Robin Moffatt

Kafka Connect Deep Dive – Converters and Serialization Explained

Robin Moffatt . .

Kafka Connect is part of Apache Kafka®, providing streaming integration between data stores and Kafka. For data engineers, it just requires JSON configuration files to use. There are connectors for ...

Kafka broker controller shutdown process
Jun Rao

Apache Kafka Supports 200K Partitions Per Cluster

Jun Rao . .

In Kafka, a topic can have multiple partitions to which records are distributed. Partitions are the unit of parallelism. In general, more partitions leads to higher throughput. However, there are ...

Leave a Reply

Your email address will not be published. Required fields are marked *

Try Confluent Platform

Download Now

We use cookies to understand how you use our site and to improve your experience. Click here to learn more or change your cookie settings. By continuing to browse, you agree to our use of cookies.