Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Introducing Versioned State Store in Kafka Streams

Written By

Since the introduction of stream processing, there have been three certainties in life: death, taxes, and out-of-order data. As a stream processing library built for Apache Kafka, Kafka Streams processes data in offset order. When out-of-order data is present, offset order differs from timestamp order and care must be taken to ensure that processing results respect timestamp order where appropriate. The introduction of Versioned State Stores to Kafka Streams in the Apache Kafka 3.5 release is a huge milestone in this direction.

In this blog post, I'll address the what, why, and how of Versioned Stores in Kafka Streams, including what they are, why you might like to use them, how to get started, and a couple of things to watch out for when upgrading.

Example: Stream-table joins

As a motivating example, consider a simple restaurant use case where a stream of customer orders is joined with a table of menu prices to compute bills for customers. In order to perform the join, Kafka Streams materializes the table of prices as a key-value state store with menu items as keys and their corresponding prices as values. When a new order arrives in the stream, a lookup is performed in the state store to find the price to join with. When prices are updated, the new price replaces the old price in the state store, and subsequent orders are joined against the new price.

So far so good. Now let's add timestamps into the picture — specifically, records with out-of-order timestamps. [dun dun dun]

At time 0 the price of curry is $8, and Anna's order of two curries at time 1 is properly charged as $16. At time 4 the price of curry is updated to $10, and Bob's order at time 5 is properly charged as $10. But Candace's order at time 3 now presents a problem. The state store that materializes the prices table has already been updated to reflect that the price of curry is $10. Even though the price update occurred at time 4 after Candace's order was placed, the previous price information is no longer present in the state store and therefore unavailable to the join. As a next-best option, Kafka Streams joins the stream-side record with table record in the state store anyway, and Candace is charged $30 instead of $24. 

The difference may only be $6 but we risk losing Candace as a customer forever and receiving a one-star review. For other business logic applications using Kafka Streams, the consequences could be even more catastrophic. To patch this potentially problematic behavior, we need the state store which materializes the prices table to keep both prices for the key curry — that the price was originally $8, and then updated to $10 at time 4.

Versioned State Stores and timestamped lookups

Versioned key-value state stores do just that: store multiple record versions per key. Each record version has an associated value and timestamp. In addition to the usual get(key) interface of key-value stores which returns the latest value for the given key, versioned key-value stores additionally support a method for timestamped lookups — get(key, asOfTimestamp) — which returns the active record version for the specified timestamp.

When there are multiple record versions per key, each record version has an associated validFrom timestamp and a validTo timestamp. The validFrom timestamp is the record timestamp, while the validTo timestamp is the timestamp of the next record version for the key, or undefined/infinity if no such next record exists. Together these define the validity interval of the record version and allow us to unambiguously decide which record is active at any particular timestamp. This is the record returned by get(key, asOfTimestamp).

Because computer memory is finite (sadly!), each versioned store has an associated history retention which specifies how far back in time, relative to the current observed stream time, timestamped lookups may be performed. 

In the example diagram above, history retention is 30 time units and stream time has advanced to time 63. It's valid to perform timestamped lookups at time 60, 50, and even 33, as these timestamps are all at least stream time - history retention = 63 - 30 = 33. Note that the timestamped lookup for time 33 returns a record version with timestamp (t=17) outside the history retention bound, which is not unexpected. History retention provides a guarantee on how old the timestamp for a timestamped lookup may be, and the versioned store will ensure that older record versions are kept long enough to fulfill this guarantee.

In contrast, the timestamped lookup for time 30 returns null because time 30 is older than stream time — history retention, and is therefore no longer covered by the history retention guarantee. In fact, if a lookup is requested with an ineligible timestamp bound, then versioned stores will predictably return null unless the latest record version for the key (i.e., the one returned from get(key)) satisfies the timestamp bound, in which case that same record will be returned by the timestamped lookup as well.

History retention also serves as a grace period for updates to the store. If a write request is made for a timestamp older than the history retention bound, then the write will be rejected.

Stream-table joins revisited

Returning to the restaurant application from earlier, here's how things change if the state store that materializes the prices table is versioned.

When a stream is joined with a versioned table, Kafka Streams will perform timestamped lookups into the table store using stream record timestamps. When Candace's out-of-order order arrives, the price record to join with is found via get(key=curry, asOfTimestamp=3), which returns the record version with value $8, and Candace is appropriately only charged for $24 instead of $30.

Latest-by-timestamp semantics

In addition to providing support for timestamped lookups, versioned state stores also allow applications to use latest-by-timestamp semantics rather than latest-by-offset semantics. This results in improved timestamp handling for table aggregations and table-table joins in the presence of out-of-order data.

Table aggregations

When aggregating over a versioned table, the aggregation result will reflect the latest-by-timestamp records, rather than the latest-by-offset records. To see what this means, suppose our restaurant has opened a poll for customers to vote for a new menu item, and uses Kafka Streams to determine the number of votes each proposed menu item has received in real time.

The aggregate processor has a state store that tracks the aggregation result for each key. If a new customer casts a vote for the mint pizza, then the aggregate processor will look up the current number of votes for mint pizza from the state store, increment the result, and write the new total back to the state store.

Additionally, the upstream table being aggregated must also be materialized with a state store. If a customer who has already voted changes their vote from the mint pizza to the berry sandwich, then the aggregate processor needs to not only increment the vote tally for berry sandwich but also needs to decrement the tally for mint pizza. In order to efficiently determine that this customer's previous vote was for the mint pizza, that's why the votes table must be materialized as well.

So everything looks great so far, but what if we add an out-of-order record? 

Anna voted for ice cream noodles at time 3 before she changed her vote to the berry sandwich at time 4, but her ice cream noodles vote was received later at the system. (Maybe she was voting from multiple devices at once, or there was a sporadic network delay.) In the world without versioned stores, Kafka Streams processes this out-of-order update in the same way as any other update: the vote tally for ice cream noodles will be incremented, and the tally for the berry sandwich will be decremented. (The new aggregation result reflects the highest timestamp seen so far, t=4 in this case.) Because the ice cream noodles record has the larger Kafka topic offset, this is the record included in the aggregation result, regardless of the fact that there was an earlier record for the same key with a later timestamp.

If the votes table were materialized with a versioned store instead, then the table aggregate processor switches from latest-by-offset semantics to latest-by-timestamp semantics. Rather than including Anna's out-of-order ice cream noodles vote in the aggregated vote counts, the aggregate processor identifies that the record is out-of-order and does not update the aggregation result. As a result, the record included in the final aggregation result is the record with the latest timestamp (per key) regardless of whether input records are out-of-order.

Table-Table Joins

The analogous effect applies to table-table joins: when joining versioned tables, the latest join result will reflect the latest-by-timestamp records from each of the source tables, per key, rather than the latest-by-offset records. Again, this distinction only comes into play when there is out-of-order data.

Continuing the example from above, our restaurant suspects regional voting preferences in the poll for a new menu, and is set on having real-time insights as voting progresses. To aid in analysis, we perform a table-table join to join votes with customer regions to create an enriched view of votes with location data.

For table-table joins, new records arriving at either table may trigger a new join result, and therefore both source tables are materialized for efficient lookups as part of executing the join. Similar to the table aggregation above, out-of-order updates for an existing key do not trigger new join results. This ensures that the latest join result is the join of the latest-by-timestamp records from each table, even if there is out-of-order data.

get(key) 

Consistent with versioned tables switching from latest-by-offset to latest-by-timestamp semantics, the get(key) method for retrieving the latest value for the given key also returns the latest-by-timestamp record for versioned stores, in contrast to the latest-by-offset record returned from unversioned stores.

So what counts as a "versioned table," anyway?

We've explored how versioned tables result in the use of timestamped lookups in stream-table joins, and latest-by-timestamp semantics for table aggregations and table-table joins. A missing detail yet to be discussed is what qualifies as a "versioned table." From the perspective of a join, aggregate, or any other processor for that matter, how do we decide whether an input table is versioned, and should get the updated semantics?

If a source table is materialized with a versioned store, that should of course count as a versioned table. If a source table is materialized with an unversioned store, that is an unversioned table. If a table is not materialized at all, even implicitly, then that's also unversioned.

What if a source table is materialized with a versioned store, and there are intermediate table transformations prior to the processor in question? If the intermediate transformation is a mapValues, transformValues, or filter, we would not expect these stateless operations to change whether a table is considered versioned or not, and therefore the table that started as a versioned table is still a versioned table after the transformation.

This does not apply, however, if the result of the intermediate stateless transformation is explicitly materialized with an unversioned store. In this case, the table is unversioned from the perspective of downstream processors. This applies even if the materialization does not explicitly specify a store supplier for an unversioned store, since state stores are unversioned by default. For example, even if a materialization only specifies a state store name, without passing a store supplier, the resulting state store will be unversioned, and therefore the table being materialized is unversioned as well.

On the other hand, if the result of an intermediate transformation is explicitly materialized with a versioned store, then the table is versioned from the perspective of downstream processors.

If intermediate transformations involve stateful operations such as aggregations or joins, then the table is considered unversioned from the perspective of downstream processors, even if the source table prior to the intermediate stateful transformation is versioned. This is because certain stateful operations such as aggregations always materialize their result, and state stores are unversioned by default so the result of the table aggregation is an unversioned table by default. For simplicity, the result of any intermediate stateful transformation is unversioned, even if some such operations do not necessarily materialize their result.

If user application code requests that the result of the intermediate stateful transformation is materialized with a versioned store, then the table is versioned. That said, caution should be exercised when materializing the result of a stateful transformation as a versioned table since the version history produced by the corresponding processor may be incomplete.

Finally, converting a versioned table to a stream via KTable#toStream() and then back to a table via KStream#toTable() will not result in a versioned store, unless the table is once again materialized with a versioned store after the toStream() call. 

In summary, a table is considered versioned from the perspective of a downstream processor if the table is materialized upstream as a versioned store, and none of the following occur between the materialization as a versioned store and the downstream processor:

  • An intermediate materialization with an unversioned store supplier or no explicit store supplier

  • An intermediate stateful transformation

  • A call to KTable#toStream() followed by KStream#toTable()

Try it out

Versioned State Stores are available in Kafka Streams starting with Apache Kafka 3.5, and are opt-in only with this release. If an existing user upgrades to 3.5 without making any application code changes, then everything will run as before, continuing to use unversioned stores and the previous timestamp semantics. 

For DSL users who wish to opt in, getting started with versioned stores is as simple as passing the new versioned store supplier, Stores#persistentVersionedKeyValueStore(), via a Materialized instance anywhere existing interfaces accept Materialized instances — during table creation, or as part of a table transformation in order to materialize the transformation result.

streamsBuilder
    .table(
        topicName,
        Materialized.as(
            Stores.persistentVersionedKeyValueStore(
                STORE_NAME, 
                Duration.ofMillis(HISTORY_RETENTION)
            )
        )
    );

A history retention value must be provided when calling the persistentVersionedKeyValueStore() versioned store supplier.

For users of the Processor API, a new Stores#versionedKeyValueStoreBuilder() method is additionally provided so that the created state store is properly typed as a VersionedKeyValueStore.

streamsBuilder
    .addStateStore(
        Stores.versionedKeyValueStoreBuilder(
            Stores.persistentVersionedKeyValueStore(
                STORE_NAME, 
                Duration.ofMillis(HISTORY_RETENTION)),
            Serdes.Integer(),
            Serdes.String()
        )
    );
Note:
Kafka Streams 3.5 does not yet provide an in-memory implementation for versioned key-value stores.

Upgrading from an un-versioned store to a versioned store

The versioned store implementation provided by Stores#persistentVersionedKeyValueStore() shares the same changelog topic format as the unversioned store implementations provided by Stores#persistentKeyValueStore() and Stores#persistentTimestampedKeyValueStore(). The only difference between the changelog topics of versioned and unversioned key-value stores is that a versioned store changelog topic additionally has the topic config property min.compaction.lag.ms set to the store's history retention plus some buffer, in order to prevent compaction of older record history before history retention has elapsed.

This means it is possible to upgrade an unversioned store in an existing application to a versioned store with the following procedure:

  1. Stop the application.

  2. Delete all local state (for the store being updated) from all application instances.

  3. Update the changelog topic configuration by setting min.compaction.lag.ms to a value suitable for the desired history retention, e.g., history retention plus some buffer to account for broker wall clock time usage in topic cleanup.

  4. Update the application code to use a versioned store.

  5. Restart the app.

An analogous procedure may be used to downgrade from a versioned store to an unversioned store, or to update the history retention of an existing versioned store. In all cases, local application state should be deleted so that application instances may restore from the changelog. This is important as the way that the different stores format bytes within RocksDB are not the same.

As of Kafka Streams 3.5, versioned stores do not yet support interactive queries. For state stores with interactive query use cases, users may wish to wait until a future release to upgrade.

Performance, and a brief primer on implementation

The lack of interactive query support is a gap that will be closed shortly (KIP-960, KIP-968, KIP-969). Besides that, state store performance is the main consideration to be aware of when upgrading from unversioned to versioned stores. Storing multiple record versions per key is unsurprisingly less performant than storing only a single record per key. To understand how to think about versioned store performance, let's take a brief look at the underlying RocksDB-based implementation.

Similar to the implementation of window stores in Kafka Streams, time is broken up into fixed size segments. Records are stored in segments and entire segments of old records are dropped at a time for efficient expiry of old data. Unlike window stores, record versions for a versioned store are stored in segments based on their validTo timestamps, rather than (validFrom) record timestamps. This ensures that an old record version will not be expired until its validity range has entirely fallen out of the store's history retention bound. Within a segment, all record versions for the same key are stored together as a single RocksDB record. 

Because the latest record version for each key has an undefined/infinite validTo timestamp, the latest record version for each key is stored in a separate "latest value store" rather than a “regular” segment. 

When performing a get(key) lookup for the latest record version associated with a given key, the implementation only needs to check the latest value store. This is a single RocksDB lookup, the same as for get(key) on an unversioned store, so we expect performance to be comparable.

When performing a timestamped lookup, the versioned store implementation first checks the latest value store, and returns immediately if the record version found satisfies the timestamp bound in the query. If not, then the newest segment is checked, and then progressively older segments as needed. This means that the performance of a get(key, asOfTimestamp) query which finds an eligible record in the latest value store is the same as the performance of a get(key) query, but otherwise is worse as additional RocksDB lookups are required to check the segments. Querying for older timestamps likely results in worse performance than querying for newer timestamps, if more segments need to be checked for the query.

Inserting a new record version follows the same sequence. First the latest value store is checked for the key being inserted. If a record version is present and the new record version has a later timestamp, then we know the new record version belongs in the latest value store. For this case, two writes are required: one to update the latest value store and another to move the record version previously in the latest value store into the appropriate segment store. This is twice the number of writes compared to for an unversioned store, so performance differences are not unexpected. 

If the record version in the latest value store has a later timestamp than the record being inserted, then segments must be checked, which requires additional reads from RocksDB. Heuristically, in-order writes and newer out-of-order writes will incur less overhead than older out-of-order writes. 

Segment size also plays an important factor in versioned store performance. A smaller segment size means there will be more segments, which may require more reads from RocksDB to serve old timestamped lookups or out-of-order writes. On the other hand, a larger segment size means more record versions may be collected together into the same segment, which means a larger data size is read from and written to RocksDB when updating segments, and this may also hurt performance. The optimal segment size varies depending on workload characteristics, and may be tuned by users as an optional argument to Stores#persistentVersionedKeyValueStore() when creating a versioned store.

Versioned store performance characteristics are heavily workload dependent, so benchmarking with specific user workloads is advised.

Learn more

KIP-889 introduces the VersionedKeyValueStore interface and RocksDB-based versioned store implementation, while KIP-914 covers how versioned stores affect DSL processor semantics. Follow-up work is on the horizon, including an additional improvement to stream-table join semantics (KIP-923), and interactive query support for versioned stores (KIP-960, KIP-968, KIP-969).

If you have questions, are looking for help, or would like to share your projects with others in the Apache Kafka community, join Confluent’s community Slack and Forum.

Related Content

  • Victoria joined Confluent as an engineer in 2018 and has been working on ksqlDB and Kafka Streams ever since. Having built an initial interest from working on ksqlDB, she is excited to explore more and contribute larger projects to Kafka Streams.

Did you like this blog post? Share it now