One of the most highly requested enhancements to ksqlDB is here! Apache Kafka® messages may contain data in message keys as well as message values. Until now, ksqlDB could only read limited kinds of data from the key position. The latest release, ksqlDB 0.15, adds support for many more types of data in message keys, including message keys with multiple columns. Users of Confluent Cloud ksqlDB already have access to these new features, as Confluent Cloud always runs the latest release of ksqlDB.
ksqlDB 0.15 supports message keys in additional serialization formats, with nested data types, and with multiple columns:
CREATE STREAM my_stream (my_key BIGINT KEY, v1 STRING, v2 INT)
WITH (KAFKA_TOPIC='my_topic', KEY_FORMAT='PROTOBUF', VALUE_FORMAT='JSON');
CREATE STREAM my_other_stream (
my_key STRUCT<f1 INT, f2 STRING> KEY, v1 STRING, v2 INT
) WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
CREATE STREAM my_other_stream (
k1 STRING KEY, k2 BOOLEAN KEY, v1 STRING, v2 INT
) WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
Additionally, ksqlDB now supports PARTITION BY and GROUP BY with multiple partitioning or grouping expressions, resulting in streams with multiple key columns and tables with multiple primary key columns, respectively.
CREATE STREAM my_repartitioned_stream AS
SELECT my_key->f1 AS k1, v1 AS k2, v2
FROM my_other_stream
PARTITION BY my_key->f1, v1
EMIT CHANGES;
CREATE TABLE my_aggregate AS
SELECT k1, k2, COUNT(*) AS cnt
FROM my_repartitioned_stream
GROUP BY k1, k2
EMIT CHANGES;
Let’s dive into each of these enhancements in turn.
All value serialization formats supported by ksqlDB may now be used as key formats as well. ksqlDB users are familiar with specifying a value format when creating streams and tables:
CREATE STREAM my_stream (...) WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='JSON');
To specify a key format, use the new KEY_FORMAT property:
CREATE STREAM my_stream (...) WITH (KAFKA_TOPIC='my_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
If your key and value formats are the same, the FORMAT property can be used in lieu of specifying the two separately:
CREATE STREAM my_stream (...) WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
If either the key format or value format is unspecified, then the default formats will be used. Default key and value formats are controlled by the ksql.persistence.default.format.key and ksql.persistence.default.format.value configs, respectively. In a ksqlDB server configuration file, this could be the following:
ksql.persistence.default.format.key=KAFKA
ksql.persistence.default.format.value=AVRO
Or, equivalently in a Docker Compose file:
ksqldb-server:
  image: confluentinc/ksqldb-server:0.15.0
  ...
  environment:
    KSQL_LISTENERS: http://0.0.0.0:8088
    KSQL_BOOTSTRAP_SERVERS: broker:9092
    ...
    KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_KEY: KAFKA
    KSQL_KSQL_PERSISTENCE_DEFAULT_FORMAT_VALUE: AVRO
By default, ksql.persistence.default.format.key is set to KAFKA, so statements that do not specify an explicit key format continue to function as in older ksqlDB versions. The ksql.persistence.default.format.value config has no default. Unless it is set, CREATE STREAM and CREATE TABLE statements that do not specify an explicit value format will be rejected.
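The fallback rules above can be summarized in a few lines of Python. This is only a sketch of the behavior described in this post, not ksqlDB's implementation: the function name `resolve_formats` and its parameters are hypothetical, and rejecting FORMAT in combination with KEY_FORMAT or VALUE_FORMAT is an assumption.

```python
from typing import Optional, Tuple

# Server-level defaults as described above: the key format defaults to KAFKA,
# while the value format has no default unless the operator configures one.
DEFAULT_KEY_FORMAT: Optional[str] = "KAFKA"
DEFAULT_VALUE_FORMAT: Optional[str] = None


def resolve_formats(
    fmt: Optional[str] = None,
    key_format: Optional[str] = None,
    value_format: Optional[str] = None,
    default_key: Optional[str] = DEFAULT_KEY_FORMAT,
    default_value: Optional[str] = DEFAULT_VALUE_FORMAT,
) -> Tuple[str, str]:
    """Return the (key_format, value_format) pair a statement would end up with.

    Hypothetical helper: it models the rules in this post, not ksqlDB's code.
    """
    # FORMAT is shorthand for setting both properties to the same value.
    if fmt is not None:
        # Assumption: combining FORMAT with the specific properties is an error.
        if key_format is not None or value_format is not None:
            raise ValueError("use either FORMAT or KEY_FORMAT/VALUE_FORMAT")
        return fmt, fmt

    # Otherwise fall back to the configured defaults for whatever is missing.
    key = key_format if key_format is not None else default_key
    value = value_format if value_format is not None else default_value

    if key is None:
        raise ValueError("no key format specified and no default configured")
    if value is None:
        raise ValueError("no value format specified and no default configured")
    return key, value


if __name__ == "__main__":
    print(resolve_formats(value_format="JSON"))
    print(resolve_formats(fmt="JSON"))
    print(resolve_formats(default_value="AVRO"))
```

With the stock defaults, a statement that names only a value format keeps the KAFKA key format, while a statement that names neither is rejected because no value format can be resolved.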
For additional details on these new key formats, including how to use schema inference to infer key schemas, see the section covering implications of the new key formats on schema inference and joins.
In addition to expanding the set of supported key serialization formats, ksqlDB 0.15 also adds support for additional data types for message keys. The ARRAY and STRUCT data types are now accepted as valid key column data types, as well as nested combinations of such:
CREATE STREAM my_stream (K STRUCT<array_field ARRAY<INTEGER>, other_field STRING> KEY, V BIGINT) WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
Note that nested types including ARRAY and STRUCT are not supported by certain serialization formats, such as KAFKA and DELIMITED.
ksqlDB does not support MAP type keys as maps may experience inconsistent serialization, which could lead to unexpected behavior if logically equivalent keys are not sent to the same topic partition and are processed by different processors as a result. For the same reason, nested types containing maps are also not supported as key column data types.
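To see why inconsistent serialization matters, here is a small Python sketch. It is not ksqlDB code: the `toy_partition` function is a hypothetical stand-in for a partitioner, and CRC32 is used only for illustration rather than the hash Kafka actually uses.

```python
import json
import zlib

# Two maps that are logically equal but were built in a different order.
key_a = {"region": "us", "tier": "gold"}
key_b = {"tier": "gold", "region": "us"}


def serialize(key: dict) -> bytes:
    """Serialize a map key to JSON bytes, preserving insertion order."""
    return json.dumps(key).encode("utf-8")


def toy_partition(key_bytes: bytes, num_partitions: int = 6) -> int:
    """Stand-in partitioner: hash the serialized bytes, then take the modulus.

    Assumption: CRC32 is for illustration only; it is not the hash that
    Kafka's default partitioner uses.
    """
    return zlib.crc32(key_bytes) % num_partitions


bytes_a = serialize(key_a)
bytes_b = serialize(key_b)

if __name__ == "__main__":
    print("maps equal:", key_a == key_b)
    print("bytes equal:", bytes_a == bytes_b)
    print("partitions:", toy_partition(bytes_a), toy_partition(bytes_b))
```

The two maps compare equal, yet their serialized bytes differ. Any partitioner that hashes the serialized key is therefore free to route them to different partitions.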
For all formats other than PROTOBUF, ksqlDB expects that if a schema is declared with a single key column, then that key column is unwrapped, which means the contents are not contained in an outer record or object. For example, a JSON integer is unwrapped while a JSON object containing a single integer field is wrapped.
As such, a JSON stream declared with the schema (K INTEGER KEY, V STRING) will fail to deserialize keys such as the following:
{ "K" : 123 }
In order to properly use such message keys with ksqlDB, instead declare the key schema as a STRUCT to indicate that the key field is wrapped:
CREATE STREAM my_stream (MY_KEY STRUCT<K INTEGER> KEY, V STRING) WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
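The difference between wrapped and unwrapped keys can be modeled outside of ksqlDB. The following Python sketch uses two hypothetical reader functions, one per schema declaration, to show why the same key bytes succeed under one declaration and fail under the other. It illustrates the idea only and does not reflect how ksqlDB's deserializers are written.

```python
import json
from typing import Any, Dict


def read_unwrapped_int_key(key_bytes: bytes) -> int:
    """Model a schema of (K INTEGER KEY): the key is a bare JSON number."""
    value = json.loads(key_bytes)
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"expected an unwrapped integer, got {value!r}")
    return value


def read_struct_key(key_bytes: bytes) -> Dict[str, Any]:
    """Model a schema of (MY_KEY STRUCT<K INTEGER> KEY): the key is a JSON object."""
    value = json.loads(key_bytes)
    if not isinstance(value, dict):
        raise ValueError(f"expected a wrapped key (JSON object), got {value!r}")
    return value


if __name__ == "__main__":
    unwrapped = b"123"
    wrapped = b'{"K": 123}'

    print(read_unwrapped_int_key(unwrapped))
    print(read_struct_key(wrapped))
    try:
        read_unwrapped_int_key(wrapped)
    except ValueError as err:
        print("failed as expected:", err)
```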
Wrapped keys with multiple fields may also be represented as multiple key columns. For example, if your message keys look like this:
{ "K1" : "foo", "K2" : 42 }
Then you have two options for how to represent your keys in ksqlDB. The following are both valid:
(MY_KEY STRUCT<K1 STRING, K2 INTEGER> KEY, [... value columns ...])
(K1 STRING KEY, K2 INTEGER KEY, [... value columns ...])
These two are equivalent, though their downstream usage looks different syntactically. Here’s an example of copying the first key column into the message value (without changing the message key), using the first key representation:
CREATE STREAM stream_copy AS SELECT MY_KEY, AS_VALUE(MY_KEY->K1) FROM my_stream EMIT CHANGES;
And here’s the equivalent query using the second key representation:
CREATE STREAM stream_copy AS SELECT K1, K2, AS_VALUE(K1) FROM my_stream EMIT CHANGES;
When ksqlDB performs schema inference to auto-populate key columns from the latest schema in Confluent Schema Registry (see Integration with Confluent Schema Registry below), ksqlDB uses the first of the two equivalent representations (with key column name ROWKEY). If you prefer the latter representation instead, you can specify the key schema explicitly rather than rely on schema inference.
Note that the KAFKA serialization format does not support multiple key columns, as the format supports neither the STRUCT type nor serializing multiple columns.
ksqlDB now supports PARTITION BY clauses with multiple partitioning expressions:
CREATE STREAM repartitioned_stream AS SELECT * FROM my_stream PARTITION BY <expression 1>, <expression 2> EMIT CHANGES;
This query results in two key columns, where the first corresponds to <expression 1> and the second corresponds to <expression 2>. The name of each key column is determined as follows:
Because the KAFKA serialization format does not support multiple columns, PARTITION BY multiple expressions is not supported for the KAFKA key format. If desired, you can specify a different key format as part of your query:
CREATE STREAM repartitioned_stream WITH (KEY_FORMAT='JSON') AS SELECT * FROM my_stream PARTITION BY <expression 1>, <expression 2> EMIT CHANGES;
With the introduction of support for multiple key columns in ksqlDB 0.15, queries with GROUP BY clauses that contain multiple grouping expressions now result in multiple primary key columns, rather than a single primary key column that is the string concatenation of the different expressions.
Concretely, consider the following query:
CREATE TABLE my_aggregate AS SELECT K1, K2, COUNT(*) AS CNT FROM my_stream GROUP BY K1, K2 EMIT CHANGES;
Prior to ksqlDB 0.15, the GROUP BY clause with multiple grouping expressions resulted in a single key column with a system generated name, KSQL_COL_0, and type STRING.
As of ksqlDB 0.15, the equivalent query instead results in a table with two primary key columns, with names K1 and K2, and data types corresponding to the original K1 and K2 columns. More generally, the number of resulting primary key columns matches the number of grouping expressions, and the data type of each primary key column is as specified by the grouping expression. The rules for determining the names of the resulting primary key columns are the same as those for PARTITION BY described above.
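The practical difference between the two behaviors is easiest to see side by side. The Python sketch below counts the same rows using an old-style concatenated string key and a new-style multi-column key. The helper names are hypothetical, the `|+|` separator is the one used in the workaround query in this post, and the code models the shape of the keys rather than ksqlDB internals.

```python
from collections import Counter
from typing import Tuple

# Separator taken from the workaround query shown in this post.
SEPARATOR = "|+|"

# Sample (K1, K2) rows; the values are made up for illustration.
rows = [("foo", 42), ("foo", 42), ("bar", 7)]


def old_style_key(k1: str, k2: int) -> str:
    """Pre-0.15 shape: one STRING key built by concatenating the expressions."""
    return str(k1) + SEPARATOR + str(k2)


def new_style_key(k1: str, k2: int) -> Tuple[str, int]:
    """0.15 shape: one key column per grouping expression, types preserved."""
    return (k1, k2)


old_counts = Counter(old_style_key(k1, k2) for k1, k2 in rows)
new_counts = Counter(new_style_key(k1, k2) for k1, k2 in rows)

if __name__ == "__main__":
    print(dict(old_counts))
    print(dict(new_counts))
```

Both groupings produce the same counts, but only the multi-column key keeps `K2` as an integer that downstream queries can filter on directly.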
Though the behavior of GROUP BY on multiple expressions has changed in ksqlDB 0.15, persistent queries issued prior to ksqlDB 0.15 will continue to run with the old behavior. In other words, in-place upgrades from ksqlDB 0.14 to 0.15 are supported. If you are a Confluent Cloud customer using ksqlDB, you can start using the new features right away as Confluent Cloud ksqlDB clusters have already been upgraded. Newly issued queries will have the new behavior.
Additionally, because the KAFKA serialization format does not support multiple columns, GROUP BY multiple expressions is no longer supported for the KAFKA key format. Existing persistent queries will continue to run undisrupted, but new queries of this type will be rejected.
If desired, you can specify a different key format as part of your query:
CREATE TABLE my_aggregate WITH (KEY_FORMAT='JSON') AS SELECT K1, K2, COUNT(*) AS CNT FROM my_stream GROUP BY K1, K2 EMIT CHANGES;
Or, you can replicate the old behavior by manually creating a single grouping expression from the individual expressions, and aliasing the resulting key column to have the old system generated name:
CREATE TABLE my_aggregate AS SELECT CAST(K1 AS STRING) + '|+|' + CAST(K2 AS STRING) AS KSQL_COL_0, COUNT(*) AS CNT FROM my_stream GROUP BY CAST(K1 AS STRING) + '|+|' + CAST(K2 AS STRING) EMIT CHANGES;
To issue pull queries against tables created as the result of an aggregation with multiple expressions in the GROUP BY clause, you can use a conjunction of equality expressions to cover the different primary key columns. Given the table below:
CREATE TABLE my_aggregate AS SELECT K1, K2, COUNT(*) AS CNT FROM my_stream GROUP BY K1, K2 EMIT CHANGES;
An example pull query is as follows:
SELECT * FROM my_aggregate WHERE K1='foo' AND K2='bar';
As of ksqlDB 0.15, a pull query must specify values for all primary key columns.
ksqlDB 0.15 does not add support for JOIN on sources with multiple key columns. If desired, you can work around this limitation by creating a single STRUCT-type key column for your sources instead of declaring multiple key columns.
For source streams and tables, this means declaring the key schema as:
(MY_KEY STRUCT<K1 STRING, K2 INTEGER> KEY, [... value columns ...])
Rather than:
(K1 STRING KEY, K2 INTEGER KEY, [... value columns ...])
For PARTITION BY multiple expressions, this means the following:
CREATE STREAM repartitioned_stream AS SELECT * FROM my_stream PARTITION BY STRUCT(K1 := <expression 1>, K2 := <expression 2>) EMIT CHANGES;
Rather than:
CREATE STREAM repartitioned_stream AS SELECT * FROM my_stream PARTITION BY <expression 1>, <expression 2> EMIT CHANGES;
Finally, for GROUP BY multiple expressions, this means:
CREATE TABLE my_aggregate AS SELECT STRUCT(K1 := <expression 1>, K2 := <expression 2>) AS K, COUNT(*) AS CNT FROM my_stream GROUP BY STRUCT(K1 := <expression 1>, K2 := <expression 2>) EMIT CHANGES;
Instead of:
CREATE TABLE my_aggregate AS SELECT <expression 1> AS K1, <expression 2> AS K2, COUNT(*) AS CNT FROM my_stream GROUP BY <expression 1>, <expression 2> EMIT CHANGES;
This section covers additional details of the newly supported key serialization formats, including how you can use schema inference with keys and how ksqlDB joins sources with different key formats.
The AVRO, PROTOBUF and JSON_SR ksqlDB serialization formats require your ksqlDB cluster to be integrated with Confluent Schema Registry. For users of Confluent Cloud, this integration happens automatically when you stand up a new ksqlDB cluster, as long as you have enabled Schema Registry in your environment.
Note that JSON and JSON_SR are distinct: the JSON format is not integrated with Schema Registry, while JSON_SR benefits from schema management and ksqlDB schema inference due to its integration with Schema Registry. See the documentation for more.
For self-managed ksqlDB users, integrating ksqlDB with Schema Registry is as simple as configuring the address of a Schema Registry cluster in the ksqlDB server properties via the ksql.schema.registry.url config. Once configured, ksqlDB can infer column names and types of messages in Kafka topics so that you no longer need to specify schemas explicitly in CREATE STREAM and CREATE TABLE statements. This is called schema inference.
Assuming that the following Avro schema is registered under the Schema Registry subject my_topic-key:
"int"
And the following Avro schema is registered under the subject my_topic-value:
{ "type": "record", "name": "MyValue", "fields": [ { "name": "F1", "type": ["null", "string"] }, { "name": "F2", "type": ["null", "double"] } ] }
Then executing the following ksqlDB statement:
CREATE STREAM my_avro_stream WITH (KAFKA_TOPIC='my_topic', FORMAT='AVRO');
Will result in the new stream having the inferred schema (ROWKEY INT KEY, F1 STRING, F2 DOUBLE).
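To make the mapping from the registered schemas to the inferred columns concrete, here is a toy Python sketch. It is a simplified illustration under stated assumptions, not ksqlDB's inference logic: the function names are hypothetical, only a handful of primitive Avro types are handled, and a single unwrapped key is always named ROWKEY, as in the example above.

```python
import json
from typing import List, Union

# Assumption: a minimal primitive-type mapping, just enough for this example.
AVRO_TO_KSQL = {
    "int": "INT",
    "long": "BIGINT",
    "string": "STRING",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
}


def ksql_type(avro_type: Union[str, List[str]]) -> str:
    """Translate a primitive Avro type, or an optional union of one, to a ksqlDB type."""
    # A union such as ["null", "string"] marks an optional field;
    # take the single non-null branch.
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union in this sketch: {avro_type!r}")
        return ksql_type(non_null[0])
    return AVRO_TO_KSQL[avro_type]


def infer_columns(key_schema_json: str, value_schema_json: str) -> str:
    """Build a column list from a primitive key schema and a record value schema."""
    key_schema = json.loads(key_schema_json)
    value_schema = json.loads(value_schema_json)
    columns = [f"ROWKEY {ksql_type(key_schema)} KEY"]
    columns += [f"{f['name']} {ksql_type(f['type'])}" for f in value_schema["fields"]]
    return "(" + ", ".join(columns) + ")"


KEY_SCHEMA = '"int"'
VALUE_SCHEMA = """
{ "type": "record", "name": "MyValue", "fields": [
    { "name": "F1", "type": ["null", "string"] },
    { "name": "F2", "type": ["null", "double"] } ] }
"""

if __name__ == "__main__":
    print(infer_columns(KEY_SCHEMA, VALUE_SCHEMA))
```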
If no key columns are desired, the KEY_FORMAT may be set to the special value NONE to indicate that no key schema inference should be attempted. The following statement will infer value column names and types from Schema Registry but not key columns. This is called partial schema inference.
CREATE STREAM my_avro_valued_stream WITH (KAFKA_TOPIC='my_topic', KEY_FORMAT='NONE', VALUE_FORMAT='AVRO');
Partial schema inference also works in the other direction. Here’s an example that infers the key schema but not the value schema, as the value’s JSON format does not support schema inference:
CREATE STREAM my_json_valued_stream (V1 BIGINT, V2 ARRAY<STRING>) WITH (KAFKA_TOPIC='my_topic', KEY_FORMAT='AVRO', VALUE_FORMAT='JSON');
Assuming the same schema registered under the subject my_topic-key as above, the newly created stream will have the schema (ROWKEY INT KEY, V1 BIGINT, V2 ARRAY<STRING>).
When joining two sources (i.e., streams or tables) of data, the two sources must be co-partitioned. This requires that the keys of the two sources be serialized in the same way, which in turn requires that they have the same data types and formats. Just as ksqlDB automatically repartitions a stream when a join is on an expression that is not the stream’s key, ksqlDB also automatically repartitions sources to enable joins where the key formats of the two sources do not match.
For example, if source A has key format KAFKA and source B has key format AVRO, and you issue a statement that joins the two sources, then ksqlDB automatically repartitions one of them to enable the join. This is true even if both A and B are tables, although ksqlDB otherwise does not support repartitioning tables: a change of key format is the only case in which a table may be repartitioned. The reason is that Apache Kafka only guarantees order within each topic partition, so unless there is a one-to-one correspondence between the new key (post-repartition) and the old key (pre-repartition), repartitioning a table could result in table updates being reordered.
This is not a concern when the only difference between the old and new keys is the serialization format; it is safe to repartition in this case, and ksqlDB will do so automatically as a convenience for users.
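The reason mismatched key formats break co-partitioning is that the same logical key produces different bytes in each format. The Python sketch below illustrates this with an integer key. It assumes the KAFKA format writes an INT key as a 4-byte big-endian integer, and it uses JSON rather than AVRO for the second format to stay dependency-free. The function names are hypothetical.

```python
import json
import struct


def serialize_kafka_int(key: int) -> bytes:
    """Model the KAFKA format for an INT key.

    Assumption: a 4-byte big-endian signed integer.
    """
    return struct.pack(">i", key)


def serialize_json_int(key: int) -> bytes:
    """Model the JSON format for an unwrapped INT key: the number as UTF-8 text."""
    return json.dumps(key).encode("utf-8")


key = 42
kafka_bytes = serialize_kafka_int(key)
json_bytes = serialize_json_int(key)

if __name__ == "__main__":
    print("KAFKA bytes:", kafka_bytes)
    print("JSON bytes: ", json_bytes)
    print("same bytes: ", kafka_bytes == json_bytes)
```

Because partition assignment depends on the serialized key bytes, rows with the key 42 in the two sources are not guaranteed to land in matching partitions until one side is rewritten in the other's key format. The logical key itself is unchanged by that rewrite, which is why the one-to-one correspondence needed for tables still holds.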
The first phase of expanded support for message keys in ksqlDB is complete, but there’s still plenty more to come:
In addition to the incremental improvements above, the team is also exploring a number of larger changes to ksqlDB semantics:
Ready to check ksqlDB out? Head over to ksqldb.io to get started, where you can follow the quick start, read the docs, and learn more! For more on what’s new in ksqlDB 0.15, see the changelog or blog post.