Building event streaming applications has never been simpler than with ksqlDB. But what is it? ksqlDB is an event streaming database for building stream processing applications. Unlike Kafka Streams, ksqlDB programs are written declaratively in SQL. Its succinct syntax, JSON support, and documentation make it easy to get started, and four ksqlDB techniques in particular stand out.
This blog post uses a dataset of simple IoT data, which lends itself very nicely to the techniques being highlighted and to stream processing in general. While we won’t be digging into streaming IoT applications at scale, you can check out some of Kai Waehner’s blog posts and videos to further explore that space.
ksqlDB can access any field in JSON event data regardless of whether the JSON is homogeneous or heterogeneous. Homogeneous JSON events always have the same structure:
{ "fullName": "Holly Waters", "contractId": "1234567", "modemMACId": "00-12-ab-34-cd-5e" }
{ "fullName": "Pennie Lane", "contractId": "7654321", "modemMACId": "00-27-ab-90-cd-7e" }
Heterogeneous JSON events, by contrast, do not share a single structure; each event can carry a different set of fields depending on the payload type:
{ "contractId": "1234567", "modemMACId": "00-12-ab-34-cd-5e", "deviceData": { "networkThreat": "detected" } }
{ "contractId": 1234567, "modemMACId": "00-12-ab-34-cd-5e", "deviceData": { "sinr": 13, "frequency": "16 GHz", "beam": "127-a" } }
When accessing strongly typed JSON data in ksqlDB, you specify the SQL type for each field (complex types are supported).
Let’s use the following JSON object as the event that we are attempting to model in ksqlDB. We’ll assume that the value types are static; for example, the fullName key always maps to the SQL type VARCHAR. You can use the STRUCT data type to model nested JSON, such as the serviceAddress field.
{
  "fullName": "Holly Waters",
  "contractId": "1234567",
  "modemMACId": "00-12-ab-34-cd-5e",
  "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"mbpsDown\": \"8.74\", \"mbpsUp\": \"1.01\", \"beam\": 127}",
  "serviceAddress": {
    "street": "11906 Albion St",
    "city": "Denver",
    "country": "USA"
  }
}
The ksqlDB representation of the JSON object above resembles the following:
CREATE STREAM subscriber_traffic (
  fullName VARCHAR,
  contractId VARCHAR,
  modemMACId VARCHAR,
  deviceData VARCHAR,
  serviceAddress STRUCT<street VARCHAR, city VARCHAR, country VARCHAR>
) WITH (
  KAFKA_TOPIC='subscriber_traffic',
  VALUE_FORMAT='json'
);
Accessing data in a struct is fairly straightforward: use the -> operator to reach nested fields, as shown below:
SELECT SERVICEADDRESS->CITY, SERVICEADDRESS->COUNTRY FROM subscriber_traffic EMIT CHANGES;
This returns the below result:
+----------+----------+
|CITY      |COUNTRY   |
+----------+----------+
|Denver    |USA       |
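The -> operator can also be used inside a persistent query. As a minimal sketch, assuming you want a flat view of the address for downstream consumers, the statement below writes the nested fields out as top-level columns. The stream name subscriber_addresses is hypothetical and not part of the original example.

```sql
-- Hypothetical derived stream: flattens serviceAddress into top-level columns.
-- The name subscriber_addresses is an assumption made for illustration only.
CREATE STREAM subscriber_addresses AS
  SELECT
    contractId,
    serviceAddress->street  AS street,
    serviceAddress->city    AS city,
    serviceAddress->country AS country
  FROM subscriber_traffic
  EMIT CHANGES;
```

Consumers of the derived stream can then read street, city, and country directly, without knowing about the nested structure.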
To learn more, check out the Apache Kafka® tutorials on working with nested JSON or flattening nested JSON.
Have you ever seen engineers stuff a string representation of JSON into a field? Something similar to the deviceData field in these events…
{
  "fullName": "Holly Waters",
  "contractId": "1234567",
  "modemMACId": "00-12-ab-34-cd-5e",
  "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"mbpsDown\": \"8.74\", \"mbpsUp\": \"1.01\", \"beam\": 127}",
  "serviceAddress": { "street": "11906 Albion St", "city": "Denver", "country": "USA" }
}
{
  "fullName": "Pennie Lane",
  "contractId": "7654321",
  "modemMACId": "00-27-ab-90-cd-7e",
  "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"sinr\": 13, \"beam\":\"127-a\"}",
  "serviceAddress": { "street": "51 Bayaud Crt", "city": "DC", "country": "USA" }
}
If you have, then this technique may bring happy tears to your eyes. You’ll notice the events have a gnarly string of JSON that is not consistently structured—here’s what it looks like unescaped and pretty printed:
"deviceData": { "lastIpVisited": "8.8.8.8", "mbpsDown": "8.74", "mbpsUp": "1.01", "beam": 127 }
"deviceData": { "lastIpVisited": "8.8.8.8", "sinr": 13, "beam": "127-a" }
You can use ksqlDB’s EXTRACTJSONFIELD to retrieve nested field values from a string of JSON. It returns the field’s value if the key exists in that message; otherwise, it returns NULL, which means that you can specify all the permutations of fields that may exist without throwing an error when they don’t. Here’s an example of extracting the mbpsUp field, which only exists in one of the two messages:
SELECT EXTRACTJSONFIELD(DEVICEDATA, '$.mbpsUp') AS mbpsUP, CONTRACTID FROM subscriber_traffic EMIT CHANGES;

+----------+----------+
|MBPSUP    |CONTRACTID|
+----------+----------+
|1.01      |1234567   |
|null      |7654321   |
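Because a missing key simply yields NULL, several candidate paths can be listed in one query. The following is a sketch against the two sample events above; the column aliases are illustrative choices, not names from the original post.

```sql
-- Each JSON path is evaluated independently per event;
-- a path that is absent from an event's deviceData yields NULL for that column.
SELECT
  CONTRACTID,
  EXTRACTJSONFIELD(DEVICEDATA, '$.lastIpVisited') AS last_ip_visited,
  EXTRACTJSONFIELD(DEVICEDATA, '$.mbpsDown')      AS mbps_down,
  EXTRACTJSONFIELD(DEVICEDATA, '$.sinr')          AS sinr
FROM subscriber_traffic
EMIT CHANGES;
```

For the sample data, mbps_down should be NULL for contract 7654321 and sinr should be NULL for contract 1234567, since those keys are absent from the respective events.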
Here’s another example in which both messages have the field of interest present but of different data types:
SELECT EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS beam, CONTRACTID FROM subscriber_traffic EMIT CHANGES;

+----------+----------+
|BEAM      |CONTRACTID|
+----------+----------+
|127-a     |7654321   |
|127       |1234567   |
If you want to further process fields to set their type, you can use the CAST function. Let’s say you cast the field BEAM to an INT. The statement won’t return an error when BEAM 127-a is processed. Rather, 127-a becomes NULL.
SELECT CAST(EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS INT), CONTRACTID FROM subscriber_traffic EMIT CHANGES;

+----------+----------+
|KSQL_COL_0|CONTRACTID|
+----------+----------+
|null      |7654321   |
|127       |1234567   |
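Since a failed cast produces NULL rather than an error, it can be useful to find the events whose value was present but not castable. The query below is one possible sketch of that check, relying only on the NULL behavior described above; the alias raw_beam is illustrative.

```sql
-- Select events where the beam key exists but its value cannot be cast to INT.
SELECT
  CONTRACTID,
  EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS raw_beam
FROM subscriber_traffic
WHERE EXTRACTJSONFIELD(DEVICEDATA, '$.beam') IS NOT NULL
  AND CAST(EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS INT) IS NULL
EMIT CHANGES;
```

With the two sample events, only contract 7654321 (beam 127-a) should match.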
There’s a great tutorial about working with heterogeneous data if you’d like to dig into this in more detail.
If you work with sensitive data, like personally identifiable information (PII), you need to be careful with how that data can be accessed. One way to approach this is to obfuscate the PII data, which you can do with ksqlDB. It has a powerful feature that allows you to write the events from one topic to a new Kafka topic with any field masked, using the MASK family of functions.
Data on the original stream, before masking:
+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Pennie Lane      |51 Bayaud Crt    |
|1234567          |Holly Waters     |11906 Albion St  |
Persistent real-time masking query:
CREATE STREAM network_pii_obfuscated
  WITH (KAFKA_TOPIC='network_pii_obfuscated', VALUE_FORMAT='JSON') AS
  SELECT
    CONTRACTID,
    MASK(FULLNAME) AS FULLNAME,
    MASK(SERVICEADDRESS->STREET) AS STREET
  FROM subscriber_traffic;
Data on new stream:
+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |
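The MASK family also includes variants that leave part of a value readable, such as MASK_KEEP_RIGHT. As a hedged sketch, assuming you wanted to expose only the last three characters of the contract ID (a requirement invented here for illustration), a query could look like this:

```sql
-- Illustrative only: keep the last 3 characters of CONTRACTID and mask the rest.
-- The default mask characters are used, so digits are replaced with 'n'.
SELECT
  MASK_KEEP_RIGHT(CONTRACTID, 3) AS CONTRACTID,
  MASK(FULLNAME) AS FULLNAME
FROM subscriber_traffic
EMIT CHANGES;
```

With the default mask characters, a contract ID such as 1234567 would be expected to come out as nnnn567.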
Here, we created a new stream network_pii_obfuscated, which has the sensitive fields masked. However, the original stream (subscriber_traffic) still has the sensitive fields unmasked. You may decide that you want to not only obfuscate data but also restrict who can access what based on their role. This is where Role-Based Access Control (RBAC) in ksqlDB comes in.
What if I need to revert the masked data field back to its original state?
The original stream will always contain the unmasked data, but the masked stream’s data cannot be reverted back to its original state. If you need to be able to revert masked data, you should look into data tokenization. With data tokenization, a meaningful field within an event is substituted with something inconsequential but unique. Thus, the data can be reverted to its original state. Read this blog post to find out how you can perform tokenization with a tool such as the Privitar Kafka Connector.
Use cases can evolve with time, and as of ksqlDB 0.12, there is support for dynamically updating active stream processing queries with the CREATE OR REPLACE syntax. Let’s say you were masking data in a stream and realized that you didn’t need to mask the entirety of the data within a field, like we did with STREET above. In this case, maybe it is okay to keep the street name, though any numbers must remain masked. Originally, the stream written by the obfuscation query looked like this:
+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |
Now we run a CREATE OR REPLACE query to update the persistent masking query to only mask numbers in the STREET field using the extended syntax of the MASK function. In this way, we specify which character types to mask:
CREATE OR REPLACE STREAM network_pii_obfuscated AS
  SELECT
    CONTRACTID,
    MASK(FULLNAME) AS FULLNAME,
    MASK(SERVICEADDRESS->STREET, NULL, NULL, 'n', NULL) AS STREET
  FROM subscriber_traffic;
Notice that the events processed prior to the REPLACE query did not change, but any new events that subsequently arrive will be masked per the updated definition:
+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |
|0246810          |Xxxxx-Xxxxxx     |nnnn Fancy Ln    |
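To confirm that the replacement took effect, you can inspect the running queries and the stream definition from the ksqlDB CLI. This is a sketch that assumes a recent ksqlDB release; older releases place the EXTENDED keyword before the stream name.

```sql
-- List persistent queries; the query writing to network_pii_obfuscated
-- should show the updated MASK(...) expression in its statement text.
SHOW QUERIES;

-- Show the stream's schema along with the queries that write to it.
DESCRIBE network_pii_obfuscated EXTENDED;
```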
You’ve read about using ksqlDB to work with structured JSON, extract fields from heterogeneous JSON strings, mask data, and update active stream processing queries. Each of these four techniques has robust documentation and examples, plus straightforward syntax. This article highlighted just a small subset of what you can do with ksqlDB, and there is much more. Check out Confluent Developer and Kafka Tutorials for other ways you can leverage the power of stream processing to your advantage.