We are thrilled to announce ksqlDB 0.25! It comes with a slew of improvements and new features. In particular, we improved how UDAFs work with complex types like Structs and Maps, enhancing popular functions like latest_by_offset and collect_list. This release also contains multiple other improvements to the product; check out the highlights below and the changelog for a complete list of updates.
Aggregate functions provide powerful ways to transform a column into a new value. Previously, aggregate functions like latest_by_offset or collect_list worked only with primitive types like strings, booleans, and numeric columns. Those restrictions have been lifted, and now you can use collect_list, collect_set, earliest_by_offset, and latest_by_offset with Structs, Arrays, and Maps.
As an example, let us see collect_list in action with a Struct type.
CREATE STREAM assignments (department STRING, person STRUCT<name STRING, age INT>)
  WITH (kafka_topic='assignments', value_format='json', partitions=1);

INSERT INTO assignments (department, person) VALUES ('engineering', Struct(Name := 'Karen', Age := 55));
INSERT INTO assignments (department, person) VALUES ('engineering', Struct(Name := 'Juliet', Age := 39));
INSERT INTO assignments (department, person) VALUES ('sales', Struct(Name := 'Bellamy', Age := 27));
INSERT INTO assignments (department, person) VALUES ('sales', Struct(Name := 'Will', Age := 39));
SELECT * FROM ASSIGNMENTS;
+------------+-----------------------+
|DEPARTMENT  |PERSON                 |
+------------+-----------------------+
|engineering |{NAME=Karen, AGE=55}   |
|engineering |{NAME=Juliet, AGE=39}  |
|sales       |{NAME=Bellamy, AGE=27} |
|sales       |{NAME=Will, AGE=39}    |
We can group by department and then collect the list of staff in each department.
SELECT department, collect_list(person) AS staffByDepartment
FROM assignments
GROUP BY department
EMIT CHANGES;

+------------+----------------------------------------------+
|DEPARTMENT  |STAFFBYDEPARTMENT                             |
+------------+----------------------------------------------+
|engineering |[{NAME=Karen, AGE=55}, {NAME=Juliet, AGE=39}] |
|sales       |[{NAME=Bellamy, AGE=27}, {NAME=Will, AGE=39}] |
Behind the scenes, these changes required updating how User Defined Aggregate Functions (UDAFs) work. Specifically, one can now create UDAFs where the aggregate and return types vary depending on the SQL type of the input column. For those experienced with implementing UDFs, this should sound similar to the SchemaProvider interface. A UDAF's aggregate or map function, however, can depend on knowing the input type, so the solution here is different.
Users building UDAFs that operate on general data types will implement three new methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType(). Concrete examples are in the ksqlDB codebase here: latest_by_offset or collect_list.
You can find further documentation in the new “Dynamic UDAFs” section of the ksqlDB documentation page.
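To make this concrete, here is a minimal sketch of a dynamic UDAF that collects values of any type into a list. Treat it as illustrative rather than authoritative: the collect_all name is made up for this example, and details such as the exact package paths and the SqlArgument/SqlTypes helper methods should be verified against the linked latest_by_offset and collect_list implementations.

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import io.confluent.ksql.schema.ksql.SqlArgument;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@UdafDescription(name = "collect_all", description = "Collects column values of any type into a list")
public final class CollectAllUdaf {

  @UdafFactory(description = "collect values of any SQL type")
  public static <T> Udaf<T, List<T>, List<T>> createCollectAll() {
    return new Udaf<T, List<T>, List<T>>() {

      // Captured from the declared argument types; this is what lets the
      // aggregate and return types vary with the input column's SQL type.
      private SqlType inputType;

      @Override
      public void initializeTypeArguments(final List<SqlArgument> argTypeList) {
        // Assumption: getSqlTypeOrThrow() is the accessor; check the codebase.
        inputType = argTypeList.get(0).getSqlTypeOrThrow();
      }

      @Override
      public Optional<SqlType> getAggregateSqlType() {
        // The intermediate aggregate is an ARRAY of the input type.
        return Optional.of(SqlTypes.array(inputType));
      }

      @Override
      public Optional<SqlType> getReturnSqlType() {
        // The final result has the same shape as the aggregate here.
        return Optional.of(SqlTypes.array(inputType));
      }

      @Override
      public List<T> initialize() {
        return new ArrayList<>();
      }

      @Override
      public List<T> aggregate(final T current, final List<T> aggregate) {
        aggregate.add(current);
        return aggregate;
      }

      @Override
      public List<T> merge(final List<T> aggOne, final List<T> aggTwo) {
        final List<T> merged = new ArrayList<>(aggOne);
        merged.addAll(aggTwo);
        return merged;
      }

      @Override
      public List<T> map(final List<T> agg) {
        return agg;
      }
    };
  }
}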
This release contains a multitude of other improvements, including the following.
Previously, if a nested function threw an exception, the column would always be null, even when wrapped in a fallback like coalesce. As an example, one may want to try to parse a column of text into a date, falling back to appending '-01-01' (treating the text as just a year) if the full date string is invalid.
SELECT
  parse_date(date_string, 'yyyy-MM-dd') AS parse_as_date,
  parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd') AS parse_as_year,
  coalesce(
    parse_date(date_string, 'yyyy-MM-dd'),
    parse_date(concat(date_string, '-01-01'), 'yyyy-MM-dd')
  ) AS parse_as_either
FROM inputs
EMIT CHANGES;
Starting in this release, a function that fails returns null to the calling function. This allows the coalesce and ifnull functions to have more intuitive behavior.
We now allow users to specify custom request headers for requests issued by the Java client and when applying migrations with the ksql-migrations tool. For example, in advanced networking topologies, custom headers can be leveraged by a proxy layer to control the flow of requests to the ksqlDB cluster.
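As a sketch of how this might look from the Java client (assuming the new option is exposed on ClientOptions as a setRequestHeaders setter; the exact method name is an assumption here, so check the client documentation):

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import java.util.Map;

public final class CustomHeadersExample {
  public static void main(final String[] args) {
    final ClientOptions options = ClientOptions.create()
        .setHost("localhost")
        .setPort(8088)
        // Assumption: a setter along these lines attaches the headers to
        // every request; a proxy layer could then route on X-Custom-Route.
        .setRequestHeaders(Map.of("X-Custom-Route", "cluster-a"));

    final Client client = Client.create(options);
    // ... issue queries and statements as usual; each request carries
    // the configured headers ...
    client.close();
  }
}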
This release adds support for push query continuation tokens in the Java client. This allows push queries to also have at-least-once semantics (ALOS) in the client. Users can resume streaming results from a StreamedQueryResult from the last saved continuation token by using the method continueFromLastContinuationToken(). With ALOS in the client, push queries are more robust against network failures, since the client can pick up where it left off.
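In client code, resuming might look roughly like the following sketch. The query and error handling are illustrative; continueFromLastContinuationToken() is the method named above, and how tokens are enabled and persisted is covered in the client documentation.

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

public final class ResumePushQueryExample {
  public static void main(final String[] args) throws Exception {
    final Client client =
        Client.create(ClientOptions.create().setHost("localhost").setPort(8088));

    // Issue a push query against the stream created earlier in this post.
    final StreamedQueryResult result =
        client.streamQuery("SELECT * FROM assignments EMIT CHANGES;").get();

    // Consume rows synchronously; poll() blocks until the next row arrives.
    for (Row row = result.poll(); row != null; row = result.poll()) {
      System.out.println(row.values());
    }

    // If the connection drops mid-stream, resume from the last saved
    // continuation token instead of restarting from scratch.
    result.continueFromLastContinuationToken();

    client.close();
  }
}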
We’re excited to bring these features out and improve the product. For more details about the changes, please refer to the changelog. Get started with ksqlDB today, via the standalone distribution or with Confluent Cloud, and join the community to ask questions and find new resources.
An approach to combining Change Data Capture (CDC) messages from a relational database into transactional messages using Kafka Streams.
Change data capture (CDC) converts all the changes that occur inside your database into events and publishes them to an event stream. You can then use these events to power analytics, drive operational use cases, hydrate databases, and more. The pattern is enjoying wider adoption than ever before.