Software engineering memes are in vogue, and nothing is more fashionable than joking about how complicated distributed systems can be. Despite the ribbing, many people adopt them. Why? Distributed systems give us two things their single-node counterparts cannot: scale and fault tolerance.
ksqlDB, the event streaming database, is built with a client/server architecture. You can run it with a single server, or you can cluster many servers together. Part 1 and part 2 in this series explained how a single server executes stateless and stateful operations. This post is about how these work when ksqlDB is deployed with many servers, and more importantly how it linearly scales the work it is performing—even in the presence of faults.
If you like, you can follow along by executing the example code yourself. ksqlDB’s quickstart makes it easy to get up and running.
When you scale ksqlDB, you add more servers to parallelize the work that it is performing—making it process data faster. But before we discuss how a distributed ksqlDB cluster works, let’s briefly review a single-node setup. Suppose you have a stream of monitoring data:
CREATE STREAM readings (
sensor VARCHAR KEY,
area VARCHAR,
reading INT
) WITH (
kafka_topic = 'readings',
partitions = 8,
value_format = 'json'
);
Its events look like this:
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 20);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'motor', 29);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 23);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'engine', 28);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'engine', 21);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'motor', 64);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'motor', 62);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'wheel', 68);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'engine', 61);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'motor', 64);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'motor', 46);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'motor', 54);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'wheel', 45);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'engine', 53);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'motor', 51);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'motor', 16);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'wheel', 24);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'motor', 17);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'engine', 18);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'motor', 25);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-5', 'wheel', 90);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-5', 'wheel', 88);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-5', 'wheel', 91);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-5', 'engine', 86);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-5', 'engine', 88);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-6', 'motor', 67);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-6', 'engine', 66);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-6', 'wheel', 65);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-6', 'motor', 60);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-6', 'engine', 63);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-7', 'engine', 35);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-7', 'motor', 36);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-7', 'wheel', 30);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-7', 'motor', 31);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-7', 'wheel', 36);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-8', 'wheel', 94);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-8', 'wheel', 95);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-8', 'motor', 99);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-8', 'engine', 94);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-8', 'motor', 97);
You can transform this stream into another by submitting a persistent query. Each time you run a persistent query, ksqlDB’s server compiles the query’s textual representation into a physical execution plan in the form of a Kafka Streams topology. The topology runs as a daemon forever, reacting to new rows as soon as they become available. This means that all processing happens on ksqlDB’s server; no processing work happens on the Apache Kafka® brokers. It looks roughly like this:
Now, when you add more servers to your cluster, ksqlDB spreads out the workload as much as possible. Just as a busy bank with a long line of customers adds more tellers, ksqlDB divides the work and assigns it to the servers. Because each server performs only a fraction of the total work, it can operate more efficiently, instead of wasting resources context switching.
But what does it really mean for ksqlDB to divide up a persistent query’s work? In Kafka, the unit of parallelism is the partition, and ksqlDB takes after its companion. Every persistent query reads rows from at least one input stream or table. ksqlDB examines the total number of partitions of those inputs, divides it by the number of servers, and assigns each server its share.
When we add a second ksqlDB server to our example, each server now processes only four input partitions (because eight partitions divided by two servers is four). From here on out, each persistent query in an animation will have a letter in parentheses after it. The letter distinguishes which server the query is running on. For instance, in the next animation, a and b are the fictitious names of two ksqlDB servers.
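The division of partitions between servers can be sketched in a few lines of Python. This is an illustration only: in a real cluster the mapping is negotiated by Kafka's consumer group protocol, not computed by a function like this. The round-robin strategy and the name `assign_partitions` are assumptions made for the example.

```python
def assign_partitions(num_partitions, servers):
    """Spread partition numbers across servers round-robin (illustrative only)."""
    assignment = {server: [] for server in servers}
    for partition in range(num_partitions):
        owner = servers[partition % len(servers)]
        assignment[owner].append(partition)
    return assignment


# Eight input partitions, first on one server, then on servers a and b.
one_server = assign_partitions(8, ["a"])
two_servers = assign_partitions(8, ["a", "b"])

print(one_server)
print(two_servers)
```

With one server, `a` owns all eight partitions. With two, each server owns four.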
By adding a second server to the cluster, the processing power of ksqlDB has doubled. This phenomenon is called linear scalability: the processing power of the cluster grows in proportion to the number of servers in it (up to a limit, as we’ll see). All you need to do to make this happen is configure each ksqlDB server with the same ksql.service.id server configuration value. If multiple ksqlDB servers with the same service ID connect to the same Kafka cluster, they will form a ksqlDB cluster and share the workload.
The architecture scales, but how is the mapping of input partitions to servers chosen? To answer that question, we need to pierce through several layers of ksqlDB’s architecture, all the way down to its Kafka clients. In Kafka, consumers have two options to read data from partitions. They can either assign themselves partitions and process all the data, or they can join a consumer group. A consumer group is a collection of consumers that coalesce under a common name to divide the partition data.
Consumer groups are a powerful primitive to build on because they machete through a forest of thorny problems. We’ve already seen how they handle work sharing, evenly dividing up a body of data across a set of replica programs. They also perform dynamic load balancing—as servers join and leave the cluster, the mapping of partitions to servers is automatically updated. Finally, they are resilient to faults. If a server experiences a failure that makes it unable to continue, the consumer group will detect it, boot it out of the group, and incrementally adjust the mapping.
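To make the fault-handling case concrete, here is a minimal sketch of an incremental adjustment when a server leaves the group. It is not Kafka's actual rebalance protocol. The function `rebalance`, the server names, and the "least-loaded survivor" rule are all assumptions for illustration, and the sketch only models servers leaving, not joining.

```python
def rebalance(assignment, live_servers):
    """Move partitions of departed servers to survivors.

    Partitions owned by servers that are still alive stay where they are,
    so only the orphaned partitions change hands.
    """
    new_assignment = {s: list(assignment.get(s, [])) for s in live_servers}
    orphaned = sorted(
        partition
        for server, partitions in assignment.items()
        if server not in live_servers
        for partition in partitions
    )
    for partition in orphaned:
        # Hand each orphaned partition to the currently least-loaded survivor.
        target = min(live_servers, key=lambda s: len(new_assignment[s]))
        new_assignment[target].append(partition)
    return new_assignment


before = {"a": [0, 1, 2], "b": [3, 4, 5], "c": [6, 7]}
after = rebalance(before, ["a", "b"])  # server c has failed

print(after)  # {'a': [0, 1, 2, 6], 'b': [3, 4, 5, 7]}
```

Servers `a` and `b` keep the partitions they already had and split the two that belonged to `c`.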
Another positive consequence of building on top of Kafka is that it’s easy to determine the optimal ksqlDB cluster size for processing parallelism. By deploying the same number of ksqlDB servers as persistent query input partitions, you achieve the maximum degree of parallelism those partitions allow:
Notice that compared to the first animation, this one isn’t sped up. Each record still takes the same amount of time to process. The difference is that the records are being processed in parallel eight ways, instead of serially, so the total duration is roughly one-eighth as long.
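The arithmetic behind that speedup can be written down as a small model. It assumes records are spread evenly over the partitions and that each server works through its partitions one record at a time. The function name and the one-second processing time are made up for the example.

```python
import math


def total_duration(partitions, records_per_partition, servers, seconds_per_record):
    """Wall-clock time when each server processes its partitions serially."""
    # The busiest server determines how long the whole job takes.
    partitions_on_busiest_server = math.ceil(partitions / servers)
    return partitions_on_busiest_server * records_per_partition * seconds_per_record


serial = total_duration(8, 5, servers=1, seconds_per_record=1.0)
parallel = total_duration(8, 5, servers=8, seconds_per_record=1.0)

print(serial, parallel, serial / parallel)  # 40.0 5.0 8.0
```

The per-record cost is identical in both calls. Only the number of partitions handled by the busiest server changes.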
This design implies that any surplus ksqlDB servers will remain idle. For example, if you deploy 10 servers and there are only eight input partitions (for all running queries), two of the servers will remain idle.
I’ve exercised a small creative liberty so far and implied that each ksqlDB server can only execute one query at a time. That model is easier to understand at first, but in reality, each server runs all submitted queries. You can control how much parallelism (how many threads) a given query uses on each server with the ksql.streams.num.stream.threads query configuration parameter (which defaults to four). Suppose that you have five servers and you deploy a query with the stream threads parameter set to 6. Each server will run 6 threads, meaning that the cluster will run a total of 30 threads for the query. If the query has 25 input partitions, 5 of the threads will remain idle.
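As a quick check of the numbers, the idle capacity is just the gap between the number of threads and the number of input partitions. The helper `idle_threads` below is a hypothetical name for this back-of-the-envelope calculation, not part of ksqlDB.

```python
def idle_threads(servers, threads_per_server, input_partitions):
    """Threads left without a partition to process (never negative)."""
    total_threads = servers * threads_per_server
    return max(0, total_threads - input_partitions)


# Five servers with six stream threads each, reading 25 partitions.
print(idle_threads(5, 6, 25))  # 5

# Treating each of 10 servers as a single worker reading eight partitions.
print(idle_threads(10, 1, 8))  # 2
```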
Let’s now turn our attention to the second half of this blog post: fault tolerance. Fault tolerance is how ksqlDB continues to function correctly when something goes wrong. The way that ksqlDB recovers from a problem depends on what it was doing in the first place. For our purposes, there are two categories of operations that ksqlDB can run: stateless and stateful.
Stateless operations are programs that don’t need to remember anything in between the rows they process. For example, a program that uppercases a name column is stateless. It does the same thing for every row, so it doesn’t need to remember anything about the previous rows. Stateless operations are easy to recover from, both because they do not need memory and because of how ksqlDB is implemented. When a ksqlDB server running a stateless operation fails, the cluster only needs to tell the new server where the old server left off.
How does the cluster do that? In the last section, we saw how ksqlDB makes extensive use of Kafka’s consumer groups. As members of a consumer group process records, they periodically tell the Kafka brokers the last offset of each partition they processed. This is called committing, and consumer groups do it so that the brokers can instruct consumers where to resume in the event of a failure.
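The committing mechanism can be simulated without Kafka at all. In this sketch a `Broker` object stands in for the brokers and remembers one committed offset. `run_consumer`, `commit_every`, and `crash_after` are hypothetical names invented for the example. It models a single partition and a simple "process, then commit every few records" policy.

```python
class Broker:
    """Stands in for the Kafka brokers: holds records and the committed offset."""

    def __init__(self, records):
        self.records = records
        self.committed = 0  # offset of the next record to process


def run_consumer(broker, process, commit_every, crash_after=None):
    """Process records starting at the committed offset, committing periodically.

    If crash_after is set, stop abruptly after handling that many records
    without a final commit, which simulates a server failure.
    """
    handled = 0
    offset = broker.committed
    while offset < len(broker.records):
        if crash_after is not None and handled == crash_after:
            return  # crash: progress since the last commit is not recorded
        process(broker.records[offset])
        offset += 1
        handled += 1
        if handled % commit_every == 0:
            broker.committed = offset
    broker.committed = offset


broker = Broker(["r0", "r1", "r2", "r3", "r4", "r5", "r6"])
seen = []

run_consumer(broker, seen.append, commit_every=3, crash_after=5)  # old server fails
resume_point = broker.committed
run_consumer(broker, seen.append, commit_every=3)  # new server takes over

print(resume_point)  # 3
print(seen)
```

The new server resumes at offset 3, the last committed position. In this simplified model, records `r3` and `r4` are processed a second time because the old server failed before committing them.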
Stateful operations, on the other hand, do require memory in between the rows that they process. A good example of a stateful operation is the AVG() aggregation. Each time a new row is processed, the value of a numeric column is added to the running sum, and the sum is divided by the total number of rows seen so far. These running values have to be stored somewhere so that ksqlDB can recall them when the next row arrives.
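A running average shows what "memory between rows" means in practice. The class below is a minimal sketch, not ksqlDB's implementation. It keeps a sum and a count as its state and is fed the five sensor-1 readings from the example data.

```python
class RunningAverage:
    """Keeps just enough state (sum and count) to update an average per row."""

    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, value):
        self.total += value
        self.count += 1
        return self.total / self.count


average = RunningAverage()
averages = [average.add(reading) for reading in [20, 29, 23, 28, 21]]

print(averages)  # [20.0, 24.5, 24.0, 25.0, 24.2]
```

Without `total` and `count` carried over from earlier rows, the operation could not produce the next value.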
To understand how this works, we’ll need to explore the layer in between ksqlDB and the Kafka brokers: Kafka Streams. An example will guide the way.
Imagine that you created a materialized view (a type of stateful operation) that stores the average of the sensor readings. For every new row that arrives, ksqlDB does two things. First, it incrementally updates the materialized view (the thing it is remembering, in our analogy) to integrate the incoming row. Second, it emits a row to a changelog topic. The changelog is an audit trail of the materialized view.
Pause the animation and note how the materialized view (yellow box) corresponds to the changelog, hovering over the rows to see their contents. The current values in the materialized views are the latest values per key in the changelog. For example, notice how the first and second events in partition 0 of the changelog have an average of 20 and 24.5. The second event is a refinement of the first—the average changed from 20 to 24.5. That refinement causes the average for sensor-1 to be updated incrementally by factoring in only the new data.
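The relationship between the view and its changelog can be reproduced with plain Python data structures. This is a simplified sketch that follows the description above. A dictionary stands in for the materialized view and a list for one changelog partition. The names `view`, `state`, `changelog`, and `process` are illustrative.

```python
from collections import defaultdict

view = {}                            # materialized view: sensor -> current average
state = defaultdict(lambda: [0, 0])  # sensor -> [running sum, row count]
changelog = []                       # append-only list of (sensor, average)


def process(sensor, reading):
    """Update the view for one incoming row and record the change."""
    sum_and_count = state[sensor]
    sum_and_count[0] += reading
    sum_and_count[1] += 1
    average = sum_and_count[0] / sum_and_count[1]
    view[sensor] = average                # 1. incrementally update the view
    changelog.append((sensor, average))   # 2. emit a row to the changelog


rows = [("sensor-1", 20), ("sensor-1", 29), ("sensor-2", 64), ("sensor-2", 62)]
for sensor, reading in rows:
    process(sensor, reading)

# Building a dict from the changelog keeps only the last value per key.
latest_per_key = dict(changelog)

print(changelog)
print(view == latest_per_key)  # True
```

The first two changelog entries are `('sensor-1', 20.0)` and `('sensor-1', 24.5)`, and the view holds only the latter.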
How exactly does this work? Remember that when a persistent query runs on a ksqlDB server, it’s compiled into a Kafka Streams topology. Kafka Streams uses RocksDB, an embedded key/value store, to store materialized views on disk. RocksDB takes care of all the details of storing and indexing the current state (in this case, the running averages) on disk with high performance.
Now, what happens when the ksqlDB server stops functioning? RocksDB resides on the disk of the failed server, meaning its contents are lost. How can the ksqlDB cluster recover, shifting the old server’s workload to the new one without losing data?
The answer lies in the changelog topics. Every row in the changelog topic contains the value that the materialized view was updated to. When the new ksqlDB server takes over the old server’s workload, it first replays the changelog data directly into its own RocksDB store. When it reaches the end of the changelog, its local materialized view is up to date, and it can begin serving queries.
If there have been multiple failovers, a server taking over a stateful operation might have stale data in its RocksDB instance. In that case, it can simply replay the part of the changelog that is new, allowing it to rapidly recover to the current state.
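Both recovery paths, the full replay and the partial one, reduce to the same loop. In this sketch a dictionary stands in for the RocksDB store and a list for the changelog. The `replay` function and its `start_offset` parameter are assumptions for illustration, not Kafka Streams APIs.

```python
def replay(changelog, store=None, start_offset=0):
    """Apply changelog entries from start_offset onward to a key/value store.

    Returns the store and the offset to resume from next time.
    """
    store = {} if store is None else store
    for key, value in changelog[start_offset:]:
        store[key] = value
    return store, len(changelog)


changelog = [("sensor-1", 20.0), ("sensor-1", 24.5), ("sensor-2", 64.0)]

# A new server with an empty store replays everything.
fresh_store, offset = replay(changelog)

# Later, more changes arrive while this server is not the owner.
changelog.append(("sensor-2", 63.0))
changelog.append(("sensor-1", 24.0))

# A server with stale data replays only what it has not yet seen.
stale_store = dict(fresh_store)
caught_up, new_offset = replay(changelog, store=stale_store, start_offset=offset)

print(offset, new_offset)  # 3 5
print(caught_up)
```

The second call touches two entries, not five.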
This style of fault tolerance is sometimes called cold recovery because downtime is incurred while a server replays the changelog. Although it works, is it possible to get fault tolerance without any downtime?
For many applications, downtime is unacceptable. People depend on software getting the job done on time, and the consequences of making them wait are too high. To achieve fault tolerance without downtime, many designs have been invented to support high availability. There are lots of ways to make a system highly available, but ksqlDB does it with grace and simplicity.
The goal of a highly available design is to make failures invisible to users. So what does that mean for ksqlDB? Let’s recap what we’ve learned. First, we talked about why failover is fast for stateless operations—consumer groups help new servers pick up where old servers left off. Next, we talked about why failover can be slow for stateful operations—new servers need to replay changelog data before they can resume processing. Now, we’ll look at how ksqlDB can be pressed into service to fail over quickly for stateful operations.
One way to frame why ksqlDB is unavailable during a failover is that it replays changelog data lazily. In other words, it only does so when it absolutely has to. What if ksqlDB instead replayed changelog data more eagerly? That is the basis of how high availability works here.
You specify how many additional replicas you want for a stateful operation. The cluster then has that many additional servers replay the changelog data into their own local stores as soon as it is appended to the topic, without waiting for a failover event. When a failover does occur, one of the replicas is chosen to take the old server’s workload. Because it was eagerly replaying the changelog data, there is little to no work to do to complete the recovery of the RocksDB store. It can start processing data and serving queries nearly instantly. Here is what that looks like (slimmed down to four partitions and two servers for readability):
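The difference between lazy and eager replay shows up in how much work is left at the moment of failover. The simulation below is a rough sketch under made-up numbers. The `Replica` class, the catch-up interval of 10 records, and the record counts are all assumptions.

```python
class Replica:
    """A server-side copy of the state, fed from the changelog."""

    def __init__(self):
        self.store = {}
        self.offset = 0

    def catch_up(self, changelog):
        """Replay unseen changelog entries; return how many were applied."""
        replayed = 0
        for key, value in changelog[self.offset:]:
            self.store[key] = value
            replayed += 1
        self.offset = len(changelog)
        return replayed


changelog = []
standby = Replica()

for i in range(1000):
    changelog.append((f"sensor-{i % 8 + 1}", float(i)))
    if i % 10 == 9:
        standby.catch_up(changelog)  # eager: keeps up while the primary is healthy

# Three more changes land just before the primary fails.
for i in range(1000, 1003):
    changelog.append((f"sensor-{i % 8 + 1}", float(i)))

cold = Replica()
cold_work = cold.catch_up(changelog)        # lazy: starts from nothing
standby_work = standby.catch_up(changelog)  # eager: only the recent tail

print(cold_work, standby_work)  # 1003 3
```

Both replicas end up with the same store contents. The standby simply had far less left to replay.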
The changelogs are played into the replica servers, denoted r-N pq1(X). Put differently, r-2 pq1(a) is server (a) acting as a replica for partitions 2 and 3. r-2 pq1(b) is server (b) acting as a replica for partitions 0 and 1.
Here again, ksqlDB stands on the shoulders of Kafka Streams. In Kafka Streams, stateful operations can be replicated using standby tasks. ksqlDB abstracts task-level processing and neatly hides the details of how it works behind a few configuration parameters. You can turn on replicas for each query in ksqlDB by specifying these configuration flags on each server:
ksql.streams.num.standby.replicas=1
ksql.query.pull.enable.standby.reads=true
ksql.heartbeat.enable=true
ksql.lag.reporting.enable=true
Each time you submit a pull query, you can also specify the ksql.query.pull.max.allowed.offset.lag query property. What does it do? Notice how there remains a small delay between when the primary server’s materialized view is updated and when the change is reflected in the replica’s materialized view. ksqlDB lets applications read these views using pull queries, and a replica can serve them when the active server has failed. This configuration option allows you to control how stale a replica’s materialized view can be for any query, a rich topic to survey in a future blog post.
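Conceptually, a staleness bound like this is a comparison between a replica's lag and a threshold. The function below is a hypothetical illustration of that idea only. It does not reproduce ksqlDB's actual routing logic, and `can_serve` and its parameters are invented names.

```python
def can_serve(replica_offset, end_offset, max_allowed_lag):
    """Decide whether a replica is fresh enough to answer a query.

    Lag is measured as how many changelog entries the replica has not
    yet applied.
    """
    lag = end_offset - replica_offset
    return lag <= max_allowed_lag


# The changelog ends at offset 1003; one replica is 3 entries behind,
# another is 500 behind.
print(can_serve(1000, 1003, max_allowed_lag=100))  # True
print(can_serve(503, 1003, max_allowed_lag=100))   # False
```

A bound of zero would accept only replicas that are fully caught up, while a larger bound trades freshness for availability.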
As with all choices in software, high availability has its trade-offs. Enabling replication consumes more network and storage resources because replica servers need to aggressively keep up with their primary.
The distributed systems memes are funny because there’s an element of truth to them. Distributed systems are hard and, when designed without care, can be a nightmare to use. Time can move backward. Programs can grind to a halt. Data can evaporate. That is why it is such a big advantage having ksqlDB aggressively architected for Apache Kafka. New distributed systems inevitably need to take their lumps, uncovering and fixing dozens of terrifying bugs. ksqlDB, by contrast, reinvents little distributed systems machinery, instead relying on all of the sweat that has already gone into Kafka. More than a decade old and being used in production by the likes of Twitter, Netflix, and Tencent, Kafka is a mature, battle-tested project.
The rest of this series will look at how joins, time, and consistency work. Until then, there’s no substitute for trying ksqlDB yourself.