
Announcing ksqlDB 0.26

We are excited to bring you another ksqlDB release: ksqlDB 0.26. This time we have focused on improving your experience with aggregate functions, JOINs, and schemas. We have also improved how ksqlDB handles dereferencing when working with structs. Read on for more detail about these changes, and check out the changelog for a full list of updates.

Aggregate functions

Our customers use ksqlDB's built-in aggregate functions to compute new values from their columns. In ksqlDB 0.26, we have extended MIN and MAX, two of our most heavily used aggregate functions, to natively support the DATE, TIME, and TIMESTAMP types.

Users can now perform MIN and MAX aggregations on these types without any extra casting:

CREATE STREAM assignments (department STRING, Person Struct<Name String, Age Integer>) WITH (kafka_topic='assignments', value_format='json', partitions=1);

CREATE STREAM orders (id STRING, ts TIME) WITH (kafka_topic='orders_topic', value_format='json', partitions=1);

INSERT INTO orders (id, ts) VALUES ('order_125346', 1651765570);
INSERT INTO orders (id, ts) VALUES ('order_125346', 1635724801);

SELECT id, MAX(ts) AS value FROM orders GROUP BY id EMIT CHANGES;

+--------------------+--------------------+
|ID                  |VALUE               |
+--------------------+--------------------+
|order_125346        |1651765570          |
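Conceptually, MIN and MAX over a temporal column compare values chronologically. The sketch below is a plain-Python model of what the grouped aggregate holds for each key once every row has been seen. It is not ksqlDB's implementation and ignores the streaming, changelog side of the query. The helper name `group_min_max`, its choice to skip null inputs, and the sample values are assumptions made for illustration.

```python
from datetime import date, time
from typing import Dict, Hashable, Iterable, Optional, Tuple, Union

# DATE and TIMESTAMP map naturally to date/datetime (datetime subclasses date); TIME maps to time.
Temporal = Union[date, time]


def group_min_max(
    rows: Iterable[Tuple[Hashable, Optional[Temporal]]],
) -> Dict[Hashable, Tuple[Temporal, Temporal]]:
    """Return {key: (earliest, latest)} per key, comparing values chronologically."""
    result: Dict[Hashable, Tuple[Temporal, Temporal]] = {}
    for key, value in rows:
        if value is None:
            continue  # assumption of this sketch: null inputs do not affect MIN/MAX
        if key not in result:
            result[key] = (value, value)
        else:
            earliest, latest = result[key]
            result[key] = (min(earliest, value), max(latest, value))
    return result


# Hypothetical TIME values; 'order_777' is an illustrative extra key.
orders = [
    ("order_125346", time(9, 30)),
    ("order_125346", time(17, 5, 10)),
    ("order_777", time(8, 0)),
]
for order_id, (earliest, latest) in group_min_max(orders).items():
    print(order_id, earliest, latest)
# order_125346 09:30:00 17:05:10
# order_777 08:00:00 08:00:00
```

Because Python's `date`, `time`, and `datetime` objects are ordered chronologically, the built-in `min` and `max` work on them directly, without first converting to numbers or strings.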

Improvements to joins

This release adds support for RIGHT OUTER JOIN between two streams (Stream-Stream) and between two tables (Table-Table). A RIGHT OUTER JOIN returns all records from the right side of the join, together with the matching records from the left side. If there is no matching record on the left side, the left-side columns contain null values. See the example below.

Create two tables:

CREATE TABLE movies (
  title VARCHAR PRIMARY KEY,
  id INT,
  release_year INT
) WITH (
  KAFKA_TOPIC='movies',
  PARTITIONS=1,
  VALUE_FORMAT='JSON'
);

CREATE TABLE lead_actor (
  title VARCHAR PRIMARY KEY,
  actor_name VARCHAR
) WITH (
  KAFKA_TOPIC='lead_actors',
  PARTITIONS=1,
  VALUE_FORMAT='JSON'
);

Insert records:

INSERT INTO movies (id, title, release_year) VALUES (256, 'Citizen Kane', 1941);
INSERT INTO movies (id, title, release_year) VALUES (294, 'Die Hard', 1988);
INSERT INTO movies (id, title, release_year) VALUES (128, 'The Big Lebowski', 1998);

INSERT INTO lead_actor (title, actor_name) VALUES ('Citizen Kane', 'Orson Welles');
INSERT INTO lead_actor (title, actor_name) VALUES ('The Big Lebowski', 'Jeff Bridges');

Note that we deliberately did not insert a lead actor record for Die Hard, to illustrate the behavior of RIGHT OUTER JOIN. Let’s run a query now:

SELECT m.id, m.title, m.release_year, l.actor_name
FROM lead_actor l
RIGHT JOIN movies m
ON l.title = m.title
EMIT CHANGES
LIMIT 3;

+--------------------------+--------------------------+-------------------------------+-----------------------------------+
|ID                        |M_TITLE                   |RELEASE_YEAR                   |ACTOR_NAME                         |
+--------------------------+--------------------------+-------------------------------+-----------------------------------+
|256                       |Citizen Kane              |1941                           |Orson Welles                       |
|294                       |Die Hard                  |1988                           |null                               |
|128                       |The Big Lebowski          |1998                           |Jeff Bridges                       |
Limit Reached
Query terminated

You can find more details about the semantics of Stream-Stream and Table-Table RIGHT JOINs in the joining collections documentation page.
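In the Table-Table case, the join result can be thought of as a lookup on the primary key. The plain-Python sketch below models a snapshot of that result: every row from the right-hand table survives, and left-hand columns fall back to None (SQL null) when there is no match. It ignores the continuous, changelog nature of a ksqlDB table join. The function name `right_outer_join` and the dict-of-dicts table representation are assumptions for illustration only, and `release_year` is left out for brevity.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Any]


def right_outer_join(
    left: Dict[str, Row],
    right: Dict[str, Row],
    left_columns: Iterable[str],
) -> Dict[str, Row]:
    """Model of a table-table RIGHT OUTER JOIN on the primary key.

    Every key in `right` appears in the result. Left-side columns are taken from
    the matching `left` row, or set to None when no such row exists.
    """
    left_columns = list(left_columns)
    joined: Dict[str, Row] = {}
    for key, right_row in right.items():
        left_row: Optional[Row] = left.get(key)
        row: Row = {
            col: (left_row.get(col) if left_row is not None else None)
            for col in left_columns
        }
        row.update(right_row)  # right-side columns are always present
        joined[key] = row
    return joined


# Tables keyed by title, mirroring the example above (hypothetical in-memory form).
movies = {"Citizen Kane": {"id": 256}, "Die Hard": {"id": 294}, "The Big Lebowski": {"id": 128}}
lead_actor = {"Citizen Kane": {"actor_name": "Orson Welles"}, "The Big Lebowski": {"actor_name": "Jeff Bridges"}}

for title, row in right_outer_join(lead_actor, movies, ["actor_name"]).items():
    print(title, row)
# Citizen Kane {'actor_name': 'Orson Welles', 'id': 256}
# Die Hard {'actor_name': None, 'id': 294}
# The Big Lebowski {'actor_name': 'Jeff Bridges', 'id': 128}
```

The left-side column names are passed in explicitly, in the same way the declared schema supplies them, so a missing left row still yields a null `actor_name` rather than an absent column.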

Support explicit message types for Protobuf with multiple definitions

Confluent Schema Registry lets you manage schemas across an enterprise in JSON Schema, Avro, or Protobuf format. In this release, we have enhanced ksqlDB's support for Protobuf schemas.

A single Protobuf schema definition can define multiple message types, which can be selected individually or can reference each other. Consider a Protobuf schema (schema ID 1) that defines two message types:

syntax = "proto3";

message ProductKey {
  int32 P_ID = 1;
}

message ProductInfo {
  string P_NAME = 1;
  double P_PRICE = 2;
}

When creating a stream or table with this schema, you can control which message type is used for the key and which for the value by referencing the message names directly with the KEY_SCHEMA_FULL_NAME and VALUE_SCHEMA_FULL_NAME properties:

ksql> CREATE STREAM products WITH (
KAFKA_TOPIC='products', 
FORMAT='protobuf', 
KEY_SCHEMA_ID='1',
VALUE_SCHEMA_ID='1',
KEY_SCHEMA_FULL_NAME='ProductKey',
VALUE_SCHEMA_FULL_NAME='ProductInfo'
);

 Message        
----------------
 Stream created 
----------------

ksql> DESCRIBE products;

Name                 : PRODUCTS
 Field   | Type
----------------------------------
 P_ID    | INTEGER          (key)
 P_NAME  | VARCHAR(STRING)
 P_PRICE | DOUBLE
----------------------------------

ksqlDB infers the key and value schemas for this stream from the explicitly identified Protobuf message types. Accordingly, INSERTs are also validated against the specified message types.

ksql> INSERT INTO products(p_id, p_name, p_price) VALUES(1, 'rice', 2.99);

ksql> SELECT * FROM products;
+---------------------------+----------------------------+---------------------------+
|P_ID                       |P_NAME                      |P_PRICE                    |
+---------------------------+----------------------------+---------------------------+
|1                          |rice                        |2.99                       |
Query Completed
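Here is a plain-Python model of that select-then-validate behavior. The registered schema is represented as a dictionary from message-type name to fields (`SCHEMA_1`), with Protobuf types simplified to Python types (int32 to `int`, string to `str`, double to `float`). `select_message` plays the role of KEY_SCHEMA_FULL_NAME and VALUE_SCHEMA_FULL_NAME, and `validate_insert` checks an INSERT row against the chosen message types. All of these names are hypothetical, and the sketch does not reflect how ksqlDB or Schema Registry are actually implemented.

```python
from typing import Any, Dict, Tuple

Fields = Dict[str, type]

# Hypothetical stand-in for the registered Protobuf schema (schema ID 1):
# message type name -> {field name: simplified Python type}.
SCHEMA_1: Dict[str, Fields] = {
    "ProductKey": {"P_ID": int},
    "ProductInfo": {"P_NAME": str, "P_PRICE": float},
}


def select_message(schema: Dict[str, Fields], full_name: str) -> Fields:
    """Pick one message type out of a multi-message schema by its full name."""
    if full_name not in schema:
        raise ValueError(f"message type {full_name!r} is not defined in the schema")
    return schema[full_name]


def validate_insert(
    row: Dict[str, Any], key: Fields, value: Fields
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split an INSERT row into key and value parts, checking column names and types."""
    key_part: Dict[str, Any] = {}
    value_part: Dict[str, Any] = {}
    for name, val in row.items():
        column = name.upper()  # ksqlDB upper-cases unquoted identifiers: p_id -> P_ID
        if column in key:
            target, expected = key_part, key[column]
        elif column in value:
            target, expected = value_part, value[column]
        else:
            raise ValueError(f"column {column} is not in the key or value message type")
        if not isinstance(val, expected):
            raise TypeError(f"{column} expects {expected.__name__}, got {type(val).__name__}")
        target[column] = val
    return key_part, value_part


key_fields = select_message(SCHEMA_1, "ProductKey")
value_fields = select_message(SCHEMA_1, "ProductInfo")
print(validate_insert({"p_id": 1, "p_name": "rice", "p_price": 2.99}, key_fields, value_fields))
# ({'P_ID': 1}, {'P_NAME': 'rice', 'P_PRICE': 2.99})
```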

Handling null dereferencing inside functions

Following up on the null-handling improvement in 0.25.1, ksqlDB 0.26 includes a bug fix for dereferencing null structs. As a concrete example, consider the following query:

SELECT IFNULL(requirements->education->degree, 'n/a') as required_edu_degree
FROM stream
EMIT CHANGES;

Previously, if requirements or education was null, an exception could be thrown, causing the function to return a null value. With the fix, the dereference requirements->education->degree evaluates to null, so the IFNULL function can act on it appropriately and return 'n/a'.
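The fixed behavior can be modelled in a few lines of plain Python. In the sketch below, the hypothetical helpers `deref` and `ifnull` stand in for the `->` operator and the IFNULL function. A null anywhere along the path short-circuits the whole dereference to None instead of raising an error, and that is what lets IFNULL substitute 'n/a'. This illustrates the semantics described above, not ksqlDB's actual code.

```python
from typing import Any, Mapping, Optional


def deref(struct: Optional[Mapping[str, Any]], *path: str) -> Any:
    """Follow struct fields like a->b->c; return None if any level along the way is None."""
    current: Any = struct
    for field in path:
        if current is None:
            return None  # null-safe: stop here instead of raising on a null struct
        current = current.get(field)
    return current


def ifnull(value: Any, default: Any) -> Any:
    """Return `default` when `value` is None, otherwise `value` unchanged."""
    return default if value is None else value


# Hypothetical rows: a fully populated struct, a null `education`, and a null `requirements`.
rows = [
    {"requirements": {"education": {"degree": "BSc"}}},
    {"requirements": {"education": None}},
    {"requirements": None},
]
for row in rows:
    print(ifnull(deref(row["requirements"], "education", "degree"), "n/a"))
# BSc
# n/a
# n/a
```

A naive lookup such as `row["requirements"]["education"]["degree"]` would raise a `TypeError` on the second and third rows. That mirrors the pre-fix failure mode, where the error surfaced before IFNULL ever saw a value.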

Get started with ksqlDB

Again, thank you for using ksqlDB. Please do not hesitate to contact us with feedback or comments! For more details about the changes, please refer to the changelog. Get started with ksqlDB today via the standalone distribution or with Confluent, and join the community to ask questions and find new resources.


Tom Nguyen is an engineer on the ksqlDB team at Confluent. He joined in 2021 after previously building event systems in e-commerce, AI, and banking.
