Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Announcing ksqlDB 0.24.0

Written By

We are excited to announce ksqlDB 0.24! It comes with a slew of improvements and new features. Access to Apache Kafka® record headers will enable a whole host of new use cases involving analytics and processing existing Kafka topics. JSON functions make working with unstructured data much easier. Finally, the LIMIT clause allows you to restrict the number of records emitted from a pull query, allowing you to control how much data you handle client side. Check out the changelog for a complete list of updates.

Access record header data

A Kafka record consists of a key, value, metadata, and optional headers. ksqlDB allows users to access record keys, values, and various metadata. Starting in 0.24, users can also access headers by declaring a column to be populated by the record’s headers. Headers typically contain metadata about records, which ksqlDB could use for routing or processing.

With the HEADERS syntax, users can access either the full header or a particular key in the header using SQL, as shown in the example below.

CREATE STREAM orders (
name STRING,
price DECIMAL(4,2),
headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
	kafka_topic='orders';
	value_format='json',
	partitions=6;
);
SELECT * FROM orders EMIT CHANGES;

+--------+--------+---------------------------------------+ | NAME | PRICE | HEADERS | +--------+--------+---------------------------------------+ | chips | 2.30 | [{‘currency’,’VVNE’}] | +--------+--------+---------------------------------------+ | coffee | 3.05 | [{‘currency’,’Q0FE’},{‘id’,’MjQ2..’}] | +--------+--------+---------------------------------------+ | burger | 8.72 | [] | +--------+--------+---------------------------------------+

Access a specific key in a topic’s headers:

CREATE STREAM uploads (
name STRING,
photo BYTES,
image_format BYTES HEADER(‘format’)
) WITH (
	kafka_topic=’uploads’;
	value_format=’json’,
	partitions=6;
);

SELECT name, photo, from_bytes(image_format, ‘hex’) as format FROM uploads EMIT CHANGES; +--------+---------+--------+ | NAME | PHOTO | FORMAT | +--------+---------+--------+ | stars | JKwg.. | jpeg | +--------+---------+--------+ | horse | Eqwq.. | png | +--------+---------+--------+ | ocean | MxKs.. | NULL | +--------+---------+--------+

JSON Functions

ksqlDB 0.24 comes with new functions aiding in processing JSON-formatted strings. JSON is the de-facto format for web applications and many other scenarios such as IoT. As a result, many of our customers send JSON messages to Kafka.

With the newly added functions, ksqlDB users can process JSON structures encoded in string fields and serialize any ksqlDB data type to a JSON string.

IS_JSON_STRING checks if a given string contains valid JSON. This function is helpful to ensure that downstream results will be parsed successfully. Consider a stream of messages imported from an external system. As there is no control over the quality of the imported messages, it might be a good idea to filter out invalid records to prevent duplicated error messages downstream:

CREATE STREAM raw_messages (message STRING) WITH (kafka_topic='messages', VALUE_FORMAT='JSON');
CREATE STREAM json_messages 
    AS SELECT message as json_message 
    FROM raw_messages 
    WHERE IS_JSON_STRING(message);

JSON_CONCAT merges two or more JSON structures. Consider a similar example where a stream contains multiple columns containing JSON objects that are only useful together in the application context. With ksqlDB we can merge them together:

CREATE STREAM raw_messages (message1 STRING, message2 STRING) WITH (kafka_topic='messages', VALUE_FORMAT='JSON');
CREATE STREAM json_messages 
    AS SELECT message1, message2, JSON_CONCAT(message1, message2) 
    FROM raw_messages 
    WHERE IS_JSON_STRING(message1) AND IS_JSON_STRING(message2);

JSON_RECORDS and JSON_KEYS extract key-value pairs and keys accordingly from JSON-formatted strings:

SELECT JSON_RECORDS(json_message) as keys FROM json_messages;

+-------------------------------------------------------------------+ |KEYS | +-------------------------------------------------------------------+ |{type="deprecated", amount=123.5, id=1} | |{type="standard", amount=10, id=null} |
|{type="extra", amount=5, id=3} |

SELECT JSON_KEYS(json_message) as keys FROM json_messages;

+------------------------------------------------------------------------------------+ |KEYS | +------------------------------------------------------------------------------------+ |[type, amount, id] | |[type, amount, id] | |[type, amount, id]

JSON_ARRAY_LENGTH computes the length of the array encoded as JSON string:

SELECT json_message as phones, JSON_ARRAY_LENGTH(json_message) as phones_length FROM json_messages;

+----------------------------------------------------------------------+-------------+ |PHONES |PHONES_LENGTH| +----------------------------------------------------------------------+-------------+ |["(984) 459-0666", "(253) 803-6544"] |2 | |["(232) 891-6803"] |1 | |["(342) 603-4952", "(891) 987-2476", "(792) 932-4901"] |4 |

Finally, TO_JSON_STRING converts any ksqlDB data type back to JSON string:

INSERT INTO raw_messages (message) 
       VALUES (TO_JSON_STRING(STRUCT(
          c := ARRAY[5, 10, 15],
          d := MAP(
            'x' := STRUCT(e := 'v1', f := true),
            'y' := STRUCT(e := 'v2', f := false)
       ))));

INSERT INTO raw_messages (message) VALUES (TO_JSON_STRING(STRUCT( c := ARRAY[2, 4, 8], d := MAP( 'x' := STRUCT(e := 'v3', f := true), 'y' := STRUCT(e := 'v4', f := true) ))));

INSERT INTO raw_messages (message) VALUES (TO_JSON_STRING(STRUCT( c := ARRAY[3, 6, 9], d := MAP( 'x' := STRUCT(e := 'v5', f := false), 'y' := STRUCT(e := 'v6', f := false) ))));

SELECT * FROM raw_messages;

+------------------------------------------------------------------------------------+ |MESSAGE | +------------------------------------------------------------------------------------+ |{"C":[5,10,15],"D":{"x":{"E":"v1","F":true},"y":{"E":"v2","F":false}}} | |{"C":[2,4,8],"D":{"x":{"E":"v3","F":true},"y":{"E":"v4","F":true}}} | |{"C":[3,6,9],"D":{"x":{"E":"v5","F":false},"y":{"E":"v6","F":false}}} |

Pull query LIMIT clause

Pull queries now support the LIMIT clause that you might already be familiar with. Users can now restrict the number of rows returned by executing a pull query over a STREAM or a TABLE by using the LIMIT clause. The syntax of pull queries is:

SELECT select_expr [, ...] 
  FROM table/stream        
  [ WHERE where_condition ]
  [ AND window_bounds ]    
  [ LIMIT num_records ]; 

Execute a pull query with LIMIT clause over a STREAM:

CREATE STREAM STUDENTS (ID STRING KEY, SCORE INT) 
  WITH (kafka_topic='students_topic', value_format='JSON', partitions=4);

SELECT * FROM STUDENTS LIMIT 3;

+---------------+---------------+ |ID |SCORE | +---------------+---------------+ |ayala |97 | |spock |48 | |janice |87 | Limit Reached

Similarly, users can execute a pull query with a LIMIT clause over a TABLE as well.
You can learn more about pull queries here.

Create table and stream using schema from schema ID

Starting from 0.24, users can specify a key_schema_id or value_schema_id in stream and table-creation commands. The schema specified by the ID will be looked up in the schema registry and used to create the logical schema as well as serialize and deserialize data. You can learn more in the documentation and the KLIP.

Examples:

Create a stream that uses an explicit key schema and value schema in Schema Registry. ksqlDB will infer the key and value columns according to the respective Avro schemas.

CREATE STREAM pageviews WITH (
    KAFKA_TOPIC='pageviews-input',
    FORMAT='AVRO',
    KEY_SCHEMA_ID=1,
    VALUE_SCHEMA_ID=2
  );

Create a persistent query that writes to a table and applies an explicit value schema. Record values will be written to the output topic according to this schema.

CREATE TABLE pageviews_count
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=3,
    KAFKA_TOPIC='pageviews-count-output'
  ) AS
  SELECT
    pageId,
    count(*)
  FROM pageviews
  GROUP BY pageID

Get started with ksqlDB

We’re excited to bring these features out and improve the product. For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent, and join the community to ask questions and find new resources.

Get Started

  • Tom Nguyen is an engineer on the ksqlDB team at Confluent. He joined in 2021 after previously building event systems in e-commerce, AI, and banking.

Did you like this blog post? Share it now