
Announcing ksqlDB 0.10.0

We’re excited to announce the release of ksqlDB 0.10.0, available now in the standalone distribution and on Confluent Cloud! This version includes a first-class Java client, improved Apache Kafka® key support, and a slew of new built-in functions. We’ll highlight some of the major changes, but see the changelog for a detailed list of all improvements and bug fixes.

Java client

One of the best things about ksqlDB is that it allows you to use language-neutral SQL to author stream processing programs. But what about when you need to connect your processed data back to your application? In the past, ksqlDB’s answer to this was its REST API. Although this is useful, there was room for something better. What everyone really wants is a first-class client to interact with ksqlDB from their favorite programming language.

In this release, we’re excited to announce the Java client, the first in a series of new clients that we will be rolling out. The client builds on a huge amount of work that we’ve been doing over the past six months to rework how our networked server API behaves. Today, the Java client supports pull and push queries as well as inserting new rows of data into existing ksqlDB streams. In an upcoming release, the client will also support persistent queries and admin operations such as listing streams, tables, and topics.

Let’s see it in action.

Push and pull query support

ksqlDB supports querying your data in two different ways: push and pull. Push queries allow your application to issue a query and subscribe to the results as they change in real time. Pull queries allow your client to fetch the current state of a materialized view at a point in time.

The client supports both consuming query result rows in an event streaming fashion via Reactive Streams and polling for result rows synchronously.

To consume result rows in a streaming fashion, use the streamQuery() method and subscribe a Reactive Streams Subscriber to the result.

client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;")
    .thenAccept(streamedQueryResult -> {
      System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());

      RowSubscriber subscriber = new RowSubscriber();
      streamedQueryResult.subscribe(subscriber);
    }).exceptionally(e -> {
      System.out.println("Request failed: " + e);
      return null;
    });

In the code snippet above, RowSubscriber is an example implementation of a Reactive Streams Subscriber for consuming query result rows.
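The post does not show RowSubscriber itself, so here is a minimal sketch of what such a subscriber could look like. To keep it runnable without the ksqlDB client on the classpath, it assumes the JDK's java.util.concurrent.Flow interfaces, which mirror Reactive Streams, and it uses SubmissionPublisher as a stand-in for the query result. The class name RowSubscriberSketch, the demo() helper, and the String rows are hypothetical. A real subscriber would implement the Reactive Streams Subscriber interface for the client's Row type instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RowSubscriber, built on the JDK's Flow API
// (an assumption; the ksqlDB client itself uses Reactive Streams).
class RowSubscriberSketch<T> implements Flow.Subscriber<T> {
  private final List<T> received = new CopyOnWriteArrayList<>();
  private final CountDownLatch done = new CountDownLatch(1);
  private Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1); // ask for the first row
  }

  @Override
  public void onNext(T row) {
    received.add(row);
    subscription.request(1); // ask for the next row only after handling this one
  }

  @Override
  public void onError(Throwable t) {
    System.out.println("Received an error: " + t);
    done.countDown();
  }

  @Override
  public void onComplete() {
    done.countDown();
  }

  // Publishes three fake rows and returns what the subscriber collected.
  static List<String> demo() {
    RowSubscriberSketch<String> subscriber = new RowSubscriberSketch<>();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      publisher.submit("row-1");
      publisher.submit("row-2");
      publisher.submit("row-3");
    } // close() signals onComplete after the pending rows are delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for rows");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for rows", e);
    }
    return List.copyOf(subscriber.received);
  }

  public static void main(String[] args) {
    System.out.println("Received rows: " + demo());
  }
}
```

Requesting one row at a time from onNext() is the simplest form of backpressure: the publisher never sends more than the subscriber has asked for. A subscriber that can handle bursts could request larger batches instead.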

The streamQuery() method may also be used to synchronously poll for results one at a time:

StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();

for (int i = 0; i < 10; i++) {
  // Block until a new row is available
  Row row = streamedQueryResult.poll();
  if (row != null) {
    System.out.println("Received a row!");
    System.out.println("Row: " + row.values());
  } else {
    System.out.println("Query has ended.");
  }
}

Rather than blocking for one result row at a time, you can use the executeQuery() method to block until all result rows have arrived. This method is useful for pull queries and for push queries with limits.

String pullQuery = "SELECT * FROM MY_MATERIALIZED_TABLE WHERE KEY_FIELD='some_key';";
BatchedQueryResult batchedQueryResult = client.executeQuery(pullQuery);

// Wait for query result
List<Row> resultRows = batchedQueryResult.get();

System.out.println("Received results. Num rows: " + resultRows.size());
for (Row row : resultRows) {
  System.out.println("Row: " + row.values());
}
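Calling get() with no arguments waits indefinitely, which may not be what you want in a request-handling thread. The sketch below shows one way to bound the wait. It assumes the batched result can be treated as a CompletableFuture of a list of rows, which the get() call above suggests but this post does not state. The class BoundedWaitSketch and the method rowsOrEmpty are hypothetical names, and plain strings stand in for rows.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits a bounded time for a batched result.
class BoundedWaitSketch {

  // Returns all rows if they arrive in time, or an empty list on timeout.
  static <T> List<T> rowsOrEmpty(CompletableFuture<List<T>> result, long timeout, TimeUnit unit) {
    try {
      return result.get(timeout, unit);
    } catch (TimeoutException e) {
      result.cancel(true); // give up on the pending result
      return List.of();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      return List.of();
    } catch (ExecutionException e) {
      throw new IllegalStateException("Query failed", e.getCause());
    }
  }

  public static void main(String[] args) {
    // A result that is already complete, standing in for a fast pull query.
    CompletableFuture<List<String>> ready =
        CompletableFuture.completedFuture(List.of("row-1", "row-2"));
    System.out.println("Rows: " + rowsOrEmpty(ready, 1, TimeUnit.SECONDS));

    // A result that never completes, standing in for a stalled query.
    CompletableFuture<List<String>> stalled = new CompletableFuture<>();
    System.out.println("Rows: " + rowsOrEmpty(stalled, 50, TimeUnit.MILLISECONDS));
  }
}
```

Returning an empty list on timeout is only one possible policy. Rethrowing the TimeoutException may be a better fit when the caller needs to distinguish "no rows" from "no answer yet".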

Inserting new rows

Similar to using a basic Kafka client to produce data or using the INSERT INTO ... VALUES statement, the client also supports inserting rows into existing streams. Here’s an example of using the client to insert a new row into an existing stream ORDERS with the schema (ORDER_ID BIGINT, PRODUCT_ID VARCHAR, USER_ID VARCHAR):

KsqlObject row = new KsqlObject()
    .put("ROWKEY", "k1")
    .put("ORDER_ID", 12345678L)
    .put("PRODUCT_ID", "UAC-222-19234")
    .put("USER_ID", "User_321");

client.insertInto("ORDERS", row).get();

Get started with the Java client today!

Enhanced key support

ksqlDB tightly integrates with Kafka to make it easy to work with your existing events. But in the past, ksqlDB had strict limitations on the key namespace of Kafka’s records. This wasn’t ideal because people don’t simply use keys for partitioning; they also store important data in them.

Our goal is to make it simple for you to process all of your Kafka data with ksqlDB. To enable that, we’re pleased to announce a sweeping set of new features. Note that some of the changes are not backward compatible with previous versions, so you may need to update some of your statements.

Any key name

ksqlDB 0.8.0 saw the introduction of non-string key support. This release builds on that progress by removing the vestigial restriction that all key columns must be named ROWKEY: key columns can now have any name, just like any other column. This is important because it allows you to work with all of your Kafka data in one logical, column-oriented model.

In addition to being able to explicitly provide your own name for key columns when declaring a stream or table, the name of the key column in derived streams and tables is now itself derived from the query. For example, a query with a GROUP BY on column_1 will have a key column matching the type and name of the grouping column. More details can be found in KLIP-24.

Removal of WITH KEY syntax

Previous versions allowed you to provide the name of a value column that was a copy of the ROWKEY key column. This information acted as an optimization hint and allowed queries to use more friendly column names. The introduction of the “any key name” feature renders this syntax redundant. Version 0.10.0 drops support for this syntax.

Primary keys and keyless streams

Previously, ksqlDB would add an implicit key column called ROWKEY to any stream or table that did not explicitly declare a key column, leaving users to wonder where the column came from. ksqlDB 0.10.0 now requires every table to declare a primary key column, and a stream that does not declare a key column is created without one. More details can be found in KLIP-29.

New built-in functions

We’ve introduced a plethora of new built-in functions to make it easier to write your stream processing applications. Here is a quick tour of the new additions:

Special shoutout to mateuszmrozewski for contributing the INSTR UDF and hpgrahsl for contributing the ARRAY_JOIN UDF.

Get started today

For the full list of fixes and improvements, see the changelog.

If you haven’t already, join our #ksqldb Confluent Community Slack channel, follow us on Twitter, and get started with ksqlDB today!

  • Steven Zhang is a senior software engineer in the Stream Processing and Analytics organization and is shaping the groundwork for Confluent’s upcoming Flink integration.
