Apache Kafka ships with Kafka Streams, a powerful yet lightweight client library for Java and Scala to implement highly scalable and elastic applications and microservices that process and analyze data stored in Kafka. A Kafka Streams application can perform stateless operations like maps and filters as well as stateful operations like windowed joins and aggregations on incoming data records.
For stateful operations, Kafka Streams uses local state stores that are made fault-tolerant by associated changelog topics stored in Kafka. For these state stores, Kafka Streams uses RocksDB as its default storage to maintain local state on a computing node (think: a container that runs one instance of your distributed application). RocksDB is a highly adaptable, embeddable, and persistent key-value store that was originally built by the engineering team at Facebook. Many companies use RocksDB in their infrastructure to serve data with high performance. Kafka Streams configures RocksDB to deliver a write-optimized state store.
This blog post will cover key concepts that show how Kafka Streams uses RocksDB to maintain its state and how you can tune RocksDB for Kafka Streams' state stores. We'll first explain the basics of Kafka Streams and how it uses state stores. Then, we'll provide an overview of RocksDB, including the two most commonly used compaction styles, level compaction and universal compaction. Once we understand the foundational principles, we'll deep dive into operational issues that you may encounter when you operate your Kafka Streams application with RocksDB state stores and, most importantly, how you can tune RocksDB to overcome those issues.
Kafka Streams defines its computational logic through a so-called topology. A topology consists of processors connected by streams. A processor executes its logic on a stream record by record.
Processors can be stateless or stateful. Stateless processors process records independently of any other data. For example, a processor that implements a map operation (such as masking all but the last four digits of a credit card number) transforms a record into another record without querying any other data. Stateful processors query and maintain state while processing records. For example, an aggregation operation (such as counting the number of input records received in the past five minutes) needs to retrieve the current aggregated value from the state store, update the current aggregated value with the input record, and finally write the new aggregated value to the state store as well as forward the new aggregated value to the downstream processors in the topology. Note that Kafka Streams does not consider processors stateful if their state is exclusively managed outside of Kafka Streams, that is, when user code within the processor directly calls an external database. While managing state outside of Kafka Streams is possible, we usually recommend managing state inside Kafka Streams to benefit from high performance and processing guarantees.
The topology in the figure above reads records from a Kafka topic and streams the records through a series of stateless and stateful processors. Each processor applies its logic on the input record and forwards an output record to the downstream processors. The last processor in the topology writes its output records to a Kafka topic.
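To make the stateless/stateful distinction concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams Processor API: the class and method names are hypothetical, a `HashMap` stands in for a state store, and the five-minute window is left out. The stateless step depends only on its input, while the stateful step reads, updates, and writes the stored count before forwarding it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: plain Java, not the Kafka Streams Processor API.
class ProcessorSketch {

    // Stateless: the output depends only on the input record.
    static String maskCardNumber(String cardNumber) {
        int keep = Math.min(4, cardNumber.length());
        int masked = cardNumber.length() - keep;
        return "*".repeat(masked) + cardNumber.substring(masked);
    }

    // Stateful: a HashMap stands in for a key-value state store.
    // Windowing (e.g., "the past five minutes") is omitted for brevity.
    static class CountProcessor {
        private final Map<String, Long> store = new HashMap<>();

        // Returns the new count, i.e., the value that would be forwarded downstream.
        long process(String key) {
            long current = store.getOrDefault(key, 0L); // 1. read the current aggregate
            long updated = current + 1;                 // 2. update it with the input record
            store.put(key, updated);                    // 3. write it back to the store
            return updated;                             // 4. forward it downstream
        }
    }
}
```

For example, `maskCardNumber("4111111111111111")` yields twelve asterisks followed by `1111`, and calling `process("user-1")` twice returns 1 and then 2.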
Once a topology is specified, Kafka Streams will execute the topology. Just as a process is an instance of a program that is executed by a computer, a task is an instance of a topology that is executed by Kafka Streams. Kafka Streams creates a task for each partition of the input topic, and each task processes records from its input partition. For example, if the input topic in the topology above has five partitions p0–p4, Kafka Streams will create five tasks t0–t4. Task t0 processes records from partition p0, task t1 processes records from partition p1, and so on. To parallelize the processing, Kafka Streams distributes the five tasks, t0–t4, over all Kafka Streams clients belonging to the same application via the Kafka rebalance protocol.
A stateful processor may use one or more state stores. Each task that contains a stateful processor has exclusive access to the state stores in the processor. That means a topology with two state stores and five input partitions will lead to five tasks, and each task will own two state stores resulting in 10 state stores in total for your Kafka Streams application.
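The task and store arithmetic above can be modeled in a few lines. This is a simplified sketch under stated assumptions: task names follow the `t0`, `t1`, ... convention used above, the topology has a single group of processors, and the round-robin assignment is a hypothetical stand-in for the rebalance protocol, which in Kafka Streams uses its own assignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of tasks and state stores. The round-robin assignment is a
// hypothetical stand-in: Kafka Streams assigns tasks via the rebalance protocol
// using its own, more sophisticated assignor.
class TaskModel {

    // One task per input partition: task ti processes partition pi.
    static List<String> createTasks(int partitions) {
        List<String> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            tasks.add("t" + p);
        }
        return tasks;
    }

    // Distribute tasks over clients round-robin.
    static Map<String, List<String>> assign(List<String> tasks, List<String> clients) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            assignment.get(clients.get(i % clients.size())).add(tasks.get(i));
        }
        return assignment;
    }

    // Each task owns its own instance of every state store, so the total is a product.
    static int totalStateStores(int partitions, int storesPerTask) {
        return partitions * storesPerTask;
    }
}
```

With five partitions and two clients, `client-1` gets t0, t2, and t4, `client-2` gets t1 and t3, and `totalStateStores(5, 2)` returns 10, matching the count above.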
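Now that we know how Kafka Streams instantiates state stores, let's look at their internals. State stores in Kafka Streams are layered in four ways: the outermost layer collects metrics about the operations on the store and serializes and deserializes the records written to and read from it; the next layer caches records to serve lookups and reduce downstream traffic; the changelogging layer sends each updated record to the state's changelog topic in Kafka; and the innermost layer updates and reads the local state store.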
Let’s assume a Kafka Streams application consists of three Kafka Streams clients. While the application executes, one of the Kafka Streams clients crashes. The tasks that the crashed client hosted are redistributed among the two remaining clients. The local states of the crashed client need to be restored on the remaining clients before they can resume processing. However, the remaining clients can’t directly access the local state of the crashed client. Luckily, the changelogging layer sent records to the changelog topic to enable the running clients to restore the local state. This restoration mechanism based on the changelog topic is applied whenever a Kafka Streams client needs to update a local state or needs to create a local state from scratch. In other words, the state’s changelog topic is the single source of truth of a state whereas a state store is a local disposable replica of a partition of the state’s changelog topic that allows you to update and query the state with low latency.
The restoration of a state store is byte based. During restoration, Kafka Streams writes the records from the changelog topic to the local state store without deserializing them. That means the records bypass all layers above the innermost layer during restoration.
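The following sketch models the changelogging and innermost layers and the byte-based restoration described above. It is a hypothetical simplification, not Kafka Streams code: an in-memory list stands in for the changelog topic, a `TreeMap` stands in for RocksDB, keys are kept as Strings for brevity, and the upper layers are collapsed into a single `LoggedStore` class that serializes values and appends them to the changelog.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of store layering and byte-based restoration (not Kafka Streams code).
class LayeredStoreSketch {

    // Innermost layer: stores raw bytes (a stand-in for RocksDB).
    static class InnerStore {
        private final Map<String, byte[]> data = new TreeMap<>();
        void put(String key, byte[] value) { data.put(key, value); }
        byte[] get(String key) { return data.get(key); }
    }

    // A changelog record holds already-serialized bytes (a stand-in for a Kafka record).
    static final class ChangelogRecord {
        final String key;
        final byte[] value;
        ChangelogRecord(String key, byte[] value) { this.key = key; this.value = value; }
    }

    // Upper layers collapsed into one: serialize, write locally, append to the changelog.
    static class LoggedStore {
        private final InnerStore inner;
        private final List<ChangelogRecord> changelog;
        LoggedStore(InnerStore inner, List<ChangelogRecord> changelog) {
            this.inner = inner;
            this.changelog = changelog;
        }
        void put(String key, String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8); // serialization
            inner.put(key, bytes);
            changelog.add(new ChangelogRecord(key, bytes));
        }
        String get(String key) {
            byte[] bytes = inner.get(key);
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Restoration replays changelog bytes straight into a fresh inner store,
    // without deserializing them and without writing to the changelog again.
    static InnerStore restore(List<ChangelogRecord> changelog) {
        InnerStore fresh = new InnerStore();
        for (ChangelogRecord r : changelog) {
            fresh.put(r.key, r.value);
        }
        return fresh;
    }
}
```

Because records are replayed in order, the last value written for a key is the one that ends up in the restored store.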
The innermost layer of a state store can be any built-in or user-defined state store that implements the state store interface exposed by Kafka Streams. The default state store used in Kafka Streams is RocksDB. Kafka Streams developers initially chose RocksDB because they wanted a write-optimized store. Since RocksDB is the default state store, Kafka Streams provides the means to configure and monitor RocksDB state stores used in a Kafka Streams application.
To configure RocksDB, we need to implement the interface RocksDBConfigSetter and pass the class to the Kafka Streams configuration rocksdb.config.setter. An example of a RocksDB configuration is shown below, where the compaction style of RocksDB is set to level compaction instead of universal compaction, which Kafka Streams uses by default.
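```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Switches RocksDB state stores from universal compaction (the Kafka Streams default) to level compaction.
public class MyRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        options.setCompactionStyle(CompactionStyle.LEVEL);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release: setConfig() creates no RocksDB objects.
    }
}
```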
Besides configuring RocksDB, Kafka Streams also exposes RocksDB-specific metrics to monitor the RocksDB state stores used in a Kafka Streams application. KIP-471 and KIP-607 introduced RocksDB-specific metrics. The former leverages the statistics that are collected by RocksDB and the latter the properties that are exposed by RocksDB. These metrics provide invaluable support for diagnosing and resolving possible issues with RocksDB state stores when operating a Kafka Streams application.
Now that we know the basics of Kafka Streams state stores, we’ll learn how RocksDB works internally and how we can tune it.
As you read earlier, the default state store in Kafka Streams is RocksDB. RocksDB is an embeddable, persistent key-value store. It is a C++ library, with Java bindings (RocksJava), that you can embed into your applications. RocksDB is designed for high performance on fast storage and server workloads. For example, you can configure RocksDB to provide extremely low query latency on terabytes of data.
Unlike many other databases, RocksDB is not a distributed system. It is not highly available and has no failover scheme. That doesn't mean you lose your state store data when you store it in RocksDB using Kafka Streams, because Kafka Streams makes RocksDB fault tolerant by replicating the state store data to a Kafka topic.
RocksDB is a storage engine library that implements a key-value interface where keys and values are arbitrary bytes. All data is organized in sorted order by key. RocksDB offers the following operations: Get(key), NewIterator(), Put(key, val), Merge(key, val), Delete(key), and SingleDelete(key). Of these operations, Kafka Streams specifically calls Get(key), NewIterator(), Put(key, val), and Delete(key).
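To illustrate the interface, here is an in-memory stand-in built on a `TreeMap`. It is not the RocksJava API; the class name is hypothetical, and only the four operations that Kafka Streams calls are modeled. Keys are compared as unsigned bytes, the same bytewise ordering that RocksDB's default comparator uses, so iteration returns entries in sorted key order.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for RocksDB's key-value interface; not the RocksJava API.
// Keys are ordered bytewise (unsigned lexicographic), like RocksDB's default comparator.
class SortedByteStore {
    private final NavigableMap<byte[], byte[]> data =
            new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    byte[] get(byte[] key) { return data.get(key); }             // Get(key)
    void put(byte[] key, byte[] value) { data.put(key, value); } // Put(key, val)
    void delete(byte[] key) { data.remove(key); }                 // Delete(key)

    // NewIterator(): walks entries in sorted key order.
    Iterator<Map.Entry<byte[], byte[]>> newIterator() {
        return data.entrySet().iterator();
    }
}
```

The unsigned comparison matters: under Java's signed `byte` ordering, a key starting with `0xFF` would sort before one starting with `0x01`.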
The operations can be organized into two common categories: reads (Get and NewIterator) and writes (Put, Merge, Delete, and SingleDelete).
RocksDB uses a log-structured merge (LSM) architecture to handle high read and write rates. To debug production applications that use RocksDB through Kafka Streams, it helps to understand how reads and writes occur. Let's deep dive into the RocksDB write path.
Note that when a write request comes in, it can go to both the memtable and the transaction log (steps 1a and 1b), or to the memtable only (step 1a). In RocksDB, the transaction log, also called the write-ahead log (WAL), is stored as a log file in storage. You will typically want to use a transaction log if you care about losing data when a database crashes unexpectedly.
A memtable is an in-memory structure where data is buffered. In RocksDB, the default implementation of the memtable is a skip list. However, RocksDB supports a pluggable API that allows an application to provide its own implementation of a memtable. When a memtable fills up, it is flushed to a static sorted table (SST) file on storage.
When there is a write request, it writes to an active memtable, also called a write buffer.
A write request can also write directly to a transaction log. The transaction log is truncated periodically, once the data it covers has been persisted in SST files. It's important to note that in Kafka Streams, the changelog topics replicate the local state stores in Kafka, and hence they behave like transaction logs. As a result, Kafka Streams configures RocksDB not to use its transaction log.
When the memtable is full, it becomes a read-only memtable. From there, new writes continue to accumulate in a new memtable.
As new writes accumulate in the new memtable, the read-only memtables are flushed into SST files on the storage system. The data in an SST file is lexicographically sorted by key to facilitate easy key lookups and sequential scans.
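The write path can be sketched as a toy model in which a `ConcurrentSkipListMap` (Java's skip-list map) plays the memtable. The entry-count limit is an assumption made for readability, since RocksDB limits write buffers by size in bytes, and the transaction log is omitted, as it is in Kafka Streams.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Toy model of the RocksDB write path (memtable size counted in entries, no transaction log).
class WritePathSketch {
    private final int memtableLimit;
    private ConcurrentSkipListMap<String, String> active = new ConcurrentSkipListMap<>();
    // Read-only memtables, newest first.
    final Deque<SortedMap<String, String>> readOnly = new ArrayDeque<>();
    // Flushed "SST files": immutable, key-sorted snapshots.
    final List<SortedMap<String, String>> sstFiles = new ArrayList<>();

    WritePathSketch(int memtableLimit) {
        this.memtableLimit = memtableLimit;
    }

    void put(String key, String value) {
        active.put(key, value);                     // write to the active memtable
        if (active.size() >= memtableLimit) {       // the memtable is full:
            readOnly.addFirst(Collections.unmodifiableSortedMap(active)); // it becomes read-only
            active = new ConcurrentSkipListMap<>(); // and a new memtable takes new writes
        }
    }

    // Flush read-only memtables, oldest first, into sorted SST files.
    void flush() {
        while (!readOnly.isEmpty()) {
            SortedMap<String, String> memtable = readOnly.removeLast();
            sstFiles.add(Collections.unmodifiableSortedMap(new TreeMap<String, String>(memtable)));
        }
    }
}
```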
SST files are organized in levels (L0, L1, and so on). Each level has an arbitrary number of SST files. Each SST file has metadata that describes its key range. When you look up a particular key, RocksDB checks the metadata to see whether the key may exist in a particular file. If it may, RocksDB reads the file; if it doesn't, or the key is not found there, RocksDB checks the next level. To avoid reading unnecessary files, RocksDB uses Bloom filters. A Bloom filter is a probabilistic data structure that tests whether an element is a member of a set: it can return false positives but never false negatives, so a negative answer means the key is definitely absent. In Kafka Streams' default RocksDB configuration, Bloom bits (a bit array) are stored in every SST file, and RocksDB can optionally keep Bloom bits for memtables as well. RocksDB checks the Bloom bits first to see if a key may exist in a particular memtable or SST file. If there is no Bloom match, RocksDB skips reading the file contents. This is how RocksDB saves on random reads on SST files.
Over time, RocksDB compacts SST files in the background. Compaction keeps the database in reasonable shape and size so that RocksDB's performance is maintained. We'll cover compaction in more detail later.
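The Bloom filter check described above can be sketched as follows. This is a generic Bloom filter using double hashing with two arbitrarily chosen hash functions (`String.hashCode` and FNV-1a), not RocksDB's filter implementation: a `false` result means the key is definitely absent, so the file can be skipped, while `true` only means the key may be present.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal Bloom filter using double hashing; not RocksDB's filter format or hashes.
class BloomFilterSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    BloomFilterSketch(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // FNV-1a, used as the second hash function.
    private static int fnv1a(byte[] data) {
        int h = 0x811C9DC5;
        for (byte b : data) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    // i-th bit position for a key: h1 + i * h2 (mod numBits).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = fnv1a(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    // false: the key is definitely absent, so the file can be skipped.
    // true: the key may be present (false positives are possible).
    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```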
Now that we have a high-level overview of how Bloom filters function in reducing the number of random reads in SST files, let’s deep dive into the RocksDB read path:
When we get a read request, RocksDB first looks at the active memtable to see if the key is there, because a recent write request may have stored it there. If we find the key in the active memtable, we don't need to look at the read-only memtables (step 2) or the SST files (step 3).
If the key is not in the active memtable, the request checks the read-only memtables from newest to oldest. Memtables may contain overlapping keys, because each one holds a batch of the most recent writes; checking them from newest to oldest ensures that the most recent value of a key is found first. If we find the key here, we return its value and don't need to look at the SST files (step 3).
If we do not find the key in any of the memtables, the read request checks the SST files on disk level by level, starting with L0, uses Bloom filters to skip files that cannot contain the key, and returns the first value it finds.
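The three steps of the read path can be summarized in a toy model. The field names are hypothetical, and the block cache, Bloom filters, and deletions are left out; what the sketch shows is the search order, which guarantees that the newest version of a key is returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of the RocksDB read path; each "SST file" is just a map.
class ReadPathSketch {
    final Map<String, String> active = new HashMap<>();
    // Read-only memtables, ordered newest to oldest.
    final List<Map<String, String>> readOnlyNewestFirst = new ArrayList<>();
    // SST files in search order: L0 files newest to oldest, then L1, L2, and so on.
    final List<Map<String, String>> sstFilesInSearchOrder = new ArrayList<>();

    Optional<String> get(String key) {
        // Step 1: the active memtable holds the most recent writes.
        if (active.containsKey(key)) {
            return Optional.of(active.get(key));
        }
        // Step 2: read-only memtables, newest first, so the latest version wins.
        for (Map<String, String> memtable : readOnlyNewestFirst) {
            if (memtable.containsKey(key)) {
                return Optional.of(memtable.get(key));
            }
        }
        // Step 3: SST files; RocksDB would consult Bloom filters to skip files here.
        for (Map<String, String> sst : sstFilesInSearchOrder) {
            if (sst.containsKey(key)) {
                return Optional.of(sst.get(key));
            }
        }
        return Optional.empty();
    }
}
```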
Another critical component to highlight in this architecture is the block cache, which Kafka Streams configures for RocksDB by default. It is an in-memory buffer that caches frequently read data blocks from SST files. Memtables buffer writes (and serve reads of recently written keys), whereas the block cache holds only data that has been read from SST files. The memtables and the block cache are the two main pieces of memory that RocksDB uses to make your database efficient and performant at scale.
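The block cache that Kafka Streams configures for RocksDB by default uses a least recently used (LRU) eviction policy. The following sketch shows that policy with Java's `LinkedHashMap` in access order; it is a simplified illustration that counts entries, whereas RocksDB's block cache stores data blocks and is sized in bytes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified LRU cache illustrating the eviction policy of an LRU block cache.
class LruBlockCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruBlockCache(int capacity) {
        super(16, 0.75f, true); // access order: iteration goes from least to most recently used
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity; // evict the least recently used entry
    }
}
```

For example, with a capacity of 2, inserting blocks `b1` and `b2`, reading `b1`, and then inserting `b3` evicts `b2`, because it is the least recently used entry.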
We went into detail on RocksDB's architecture and how writes and reads operate in RocksDB. One thing we haven't mentioned yet is that RocksDB is highly configurable. Let's highlight a few of RocksDB's main configuration options.
The log-structured merge architecture is append-only. For example, if you delete a key, the deletion is recorded as a marker, but the key is not immediately removed. Similarly, updates to a key are appended, and the previous value of the key is not updated in place. To keep the database size under control, RocksDB has to remove deleted keys together with their delete markers, as well as the outdated values of updated keys. Compaction is the process of combining a set of SST files into new SST files from which overwritten and deleted keys have been purged. Compaction is crucial for maintaining high performance in RocksDB. There are two basic types of compaction that have different characteristics and suit different workloads: level compaction and universal compaction.
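Compaction's core idea, that newer versions of a key win and deleted keys are dropped, fits in a short sketch. It treats each SST file as a map, uses `null` as a hypothetical deletion marker, and assumes the output goes to the bottommost level; in RocksDB, a deletion marker can only be dropped when no older version of the key may remain in a lower level.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Toy compaction: merges "SST files" (maps) into one sorted output file.
// A null value stands for a deletion marker; dropping markers assumes the bottommost level.
class CompactionSketch {
    static TreeMap<String, String> compact(List<Map<String, String>> filesNewestFirst) {
        TreeMap<String, String> merged = new TreeMap<>();
        // Apply files from oldest to newest so newer versions overwrite older ones.
        for (int i = filesNewestFirst.size() - 1; i >= 0; i--) {
            merged.putAll(filesNewestFirst.get(i));
        }
        // Purge deleted keys together with their deletion markers.
        merged.values().removeIf(Objects::isNull);
        return merged;
    }
}
```

For example, merging a newer file `{a=4, c=<deleted>}` with an older file `{a=1, b=2, c=3}` produces `{a=4, b=2}`.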
In level compaction, there are multiple levels (L1–Ln), and an arbitrary number of SST files exist in each level (the green boxes below). L0 is a special level that contains files just flushed from the memtables.
L1–Ln have target sizes, and compaction's goal is to keep each of these levels under its target size. Usually, the target size increases exponentially as you move down the levels.
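As a worked example of exponentially growing targets, the sketch below computes `base * multiplier^(n - 1)` for level n. The values used are illustrative assumptions; in RocksDB the corresponding options are `max_bytes_for_level_base` and `max_bytes_for_level_multiplier`, and options such as `level_compaction_dynamic_level_bytes` change how targets are derived.

```java
// Target size of level n (n >= 1): base * multiplier^(n - 1).
// Integer multiplier for simplicity; L0 is not size-targeted in this model.
class LevelTargets {
    static long targetBytes(int level, long baseBytes, int multiplier) {
        if (level < 1) {
            throw new IllegalArgumentException("L0 has no size target in this model");
        }
        long target = baseBytes;
        for (int i = 1; i < level; i++) {
            target *= multiplier;
        }
        return target;
    }
}
```

With a 256 MiB base and a multiplier of 10, the targets are 256 MiB for L1, 2.5 GiB for L2, and 25 GiB for L3.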
After L0 is compacted with L1, L1's target size is breached.
Now, we need to compact L1 with L2. Let's say that the second SST file on L1 has a newer version of a key where a=4. Let's also say that the first, second, and third SST files on L2 have older versions of that key, where a=3, a=2, and a=1. During level compaction, the second SST file on L1 and the first, second, and third SST files on L2 will be merged.