We are excited to announce ksqlDB 0.26. This time we have focused on improving your experience with aggregate functions, JOINs, and schemas. We have also improved the way we handle dereferencing when working with structs. Read below to find out more about the changes, and check out the changelog for a full list of updates.
Our customers make use of our built-in aggregate functions to transform columns into new values. In the ksqlDB 0.26 release, we extend MIN and MAX aggregates, two of our most heavily used functions, to natively support DATE, TIME, and TIMESTAMP types.
Users can now perform MIN and MAX aggregations without the need for extra casting:
CREATE STREAM orders (id STRING, ts TIMESTAMP) WITH (kafka_topic='orders_topic', value_format='json', partitions=1);
INSERT INTO orders (id, ts) VALUES ('order_125346', 1651765570);
INSERT INTO orders (id, ts) VALUES ('order_125346', 1635724801);
SELECT id, MAX(ts) AS value FROM orders GROUP BY id EMIT CHANGES;
+------------------------------+------------------------------+
|ID                            |VALUE                         |
+------------------------------+------------------------------+
|order_125346                  |1651765570                    |
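MIN benefits in the same way. For example, using the same orders stream, the following returns the earliest timestamp per key without any casting:

```sql
-- MIN over a time-typed column, no CAST required (ksqlDB 0.26+)
SELECT id, MIN(ts) AS value FROM orders GROUP BY id EMIT CHANGES;
```

Given the two rows inserted above, this emits 1635724801 for order_125346.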
This release adds support for Stream-Stream and Table-Table RIGHT OUTER JOIN. RIGHT OUTER JOIN returns all records from the right side of the join and the matching records from the left side. Where a left-side match is missing, the corresponding columns contain null values. See the example below.
Create two tables:
CREATE TABLE movies (
  title VARCHAR PRIMARY KEY,
  id INT,
  release_year INT
) WITH (
  KAFKA_TOPIC='movies',
  PARTITIONS=1,
  VALUE_FORMAT='JSON'
);

CREATE TABLE lead_actor (
  title VARCHAR PRIMARY KEY,
  actor_name VARCHAR
) WITH (
  KAFKA_TOPIC='lead_actors',
  PARTITIONS=1,
  VALUE_FORMAT='JSON'
);
Insert records:
INSERT INTO movies (id, title, release_year) VALUES (256, 'Citizen Kane', 1941);
INSERT INTO movies (id, title, release_year) VALUES (294, 'Die Hard', 1998);
INSERT INTO movies (id, title, release_year) VALUES (128, 'The Big Lebowski', 1998);
INSERT INTO lead_actor (title, actor_name) VALUES ('Citizen Kane', 'Orson Welles');
INSERT INTO lead_actor (title, actor_name) VALUES ('The Big Lebowski', 'Jeff Bridges');
Note that we did not insert the actor record for Die Hard to illustrate the behavior of RIGHT OUTER JOIN. Let’s run a query now:
SELECT m.id, m.title, m.release_year, l.actor_name FROM lead_actor l RIGHT JOIN movies m ON l.title = m.title EMIT CHANGES LIMIT 3;
+------------+--------------------+----------------+----------------+
|ID          |M_TITLE             |RELEASE_YEAR    |ACTOR_NAME      |
+------------+--------------------+----------------+----------------+
|256         |Citizen Kane        |1941            |Orson Welles    |
|294         |Die Hard            |1998            |null            |
|128         |The Big Lebowski    |1998            |Jeff Bridges    |
Limit Reached
Query terminated
You can find more details about the semantics of Stream-Stream and Table-Table RIGHT JOINs in the joining collections documentation page.
Confluent Schema Registry lets you manage schemas across an enterprise using JSON Schema, Avro, or Protobuf formats. In this release, ksqlDB's ability to use Protobuf schemas has been enhanced.
A single Protobuf schema definition can define multiple schemas which can be selected individually or reference each other. Consider a Protobuf schema (schemaID = 1) with two message types defined:
syntax = "proto3";

message ProductKey {
  int32 P_ID = 1;
}

message ProductInfo {
  string P_NAME = 1;
  double P_PRICE = 2;
}
When creating a stream or table with this schema, you can control which message type is used by referencing the message names directly via the KEY_SCHEMA_FULL_NAME and VALUE_SCHEMA_FULL_NAME properties:
ksql> CREATE STREAM products WITH (
  KAFKA_TOPIC='products',
  FORMAT='protobuf',
  KEY_SCHEMA_ID='1',
  VALUE_SCHEMA_ID='1',
  KEY_SCHEMA_FULL_NAME='ProductKey',
  VALUE_SCHEMA_FULL_NAME='ProductInfo'
);
 Message
----------------
 Stream created
----------------
ksql> DESCRIBE products;
Name : PRODUCTS

 Field   | Type
----------------------------------
 P_ID    | INTEGER          (key)
 P_NAME  | VARCHAR(STRING)
 P_PRICE | DOUBLE
----------------------------------
ksqlDB infers the key and value schema for this stream based on the explicitly-identified Protobuf message types. Accordingly, INSERTs are also validated against the specified message types.
ksql> INSERT INTO products(p_id, p_name, p_price) VALUES(1, 'rice', 2.99);
ksql> SELECT * FROM products;
+---------------------------+----------------------------+---------------------------+
|P_ID                       |P_NAME                      |P_PRICE                    |
+---------------------------+----------------------------+---------------------------+
|1                          |rice                        |2.99                       |
Query Completed
Following up on the null-handling improvement in 0.25.1, this release fixes a bug in dereferencing null structs. As a concrete example, consider the following query:
SELECT IFNULL(requirements->education->degree, 'n/a') AS required_edu_degree FROM stream EMIT CHANGES;
Previously, if requirements or education was null, dereferencing could throw an exception, which caused the whole expression, including the IFNULL wrapper, to return null. With the fix, the dereference requirements->education->degree evaluates to null, and the IFNULL function can then act on it appropriately and return 'n/a'.
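For reference, the query above assumes a stream with a nested struct schema along these lines (the stream, topic, and field names here are illustrative, not taken from the release notes):

```sql
-- Hypothetical schema for the dereferencing example; names are illustrative.
CREATE STREAM job_postings (
  id STRING,
  requirements STRUCT<
    education STRUCT<degree STRING>
  >
) WITH (KAFKA_TOPIC='job_postings', VALUE_FORMAT='JSON', PARTITIONS=1);

-- A record where the requirements struct is absent (null):
INSERT INTO job_postings (id) VALUES ('posting_1');
```

With the fix, IFNULL(requirements->education->degree, 'n/a') returns 'n/a' for this row instead of null.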
Again, thank you for using ksqlDB. Please do not hesitate to contact us with more feedback or comments! For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent, and join the community to ask questions and find new resources.