Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now
I am very excited to tell you about the forthcoming 0.8.2 release of Apache Kafka. Kafka is a fault-tolerant, low-latency, high-throughput distributed messaging system used in data pipelines at several companies. Kafka became a top-level Apache project in 2012 and was originally created at LinkedIn, where it forms a critical part of LinkedIn’s infrastructure and transmits data to all systems and applications. The project is currently under active development from a diverse group of contributors.
Since there are many new features in 0.8.2, we released 0.8.2-beta. The final release will be done when 0.8.2 is stable.
Here is a quick overview of the notable work in this release.
The JVM clients that Kafka ships haven’t changed much since Kafka was originally built. Over time, we have realized some of the limitations and problems that came both from the design of these clients and their APIs. We are in the process of rewriting both the producer and consumer Java clients. The producer is complete and will be included in 0.8.2.
At a high level, the primary difference in this producer is that it removes the distinction between the “sync” and “async” producer. Effectively, all requests are sent asynchronously but always return a future response object that returns the offset as well as any error that may have occurred when the request is complete.
The batching that is done only in the async producer today is done whenever possible now. This means that the sync producer, under load, can get performance as good as the async producer. This works similarly to group commit in databases but with respect to the actual network transmission – any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load to save server resources by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (similar to Nagle’s algorithms in TCP). This new producer does all network communication asynchronously and in parallel to all servers so the performance penalty for acks=-1 and waiting on replication should be much reduced.
The new Java clients will be protocol compatible with any 0.8.x Kafka server. The API docs for the new producer can be found here.
This is our third try releasing a working delete topic feature. You would think deleting the data would be easy and not deleting it would be the hard part. But each of our previous attempts had flaws that appeared with usage. This time significant testing has been done and we are hopeful that your data will really be gone.
This is a pretty major feature and a huge win that deserves its own blog post by the feature’s authors, Joel Koshy and Tejas Patil. I will attempt to explain it briefly here.
One of the expensive operations a messaging system has to perform is keeping track of what messages are consumed. Kafka does this using “offsets”, a marker it stores for the consumer’s position in the log of messages. Previously this marker was stored by the JVM client in ZooKeeper. The problem with this usage is that for consumers that want to update their position frequently this can become a bottleneck. Zookeeper writes are expensive, and can’t be scaled horizontally. In 0.8.2, we added native offset storage functionality and a new API for making use of this. This makes offset storage horizontally scalable.
Internally the implementation of the offset storage is just a compacted Kafka topic (__consumer_offsets) keyed on the consumer’s group, topic, and partition. The offset commit request writes the offset to the compacted Kafka topic using the highest level of durability guarantee that Kafka provides (acks=-1) so that offsets are never lost in the presence of uncorrelated failures. Kafka maintains an in-memory view of the latest offset per <consumer group, topic, partition> triplet, so offset fetch requests can be served quickly without requiring a full scan of the compacted offsets topic. With this feature, consumers can checkpoint offsets very often, possibly per message.
In Kafka, the leader replica for a partition does the reading and writing of messages. For an efficient load distribution, it is important to ensure that the leader replicas are evenly distributed amongst the brokers in a cluster.
When a topic is created, Kafka’s replica placement strategy places the preferred replica (first replica in the list of replicas) for every partition evenly across the cluster. This ensures that as long as the leader lives on the preferred replica for a majority of partitions, it will lead to an even distribution of leaders in a cluster. When brokers are bounced or failures occur, leaders automatically failover to a different live replica. Over time, this may lead to an uneven distribution of leaders in a cluster with some brokers serving more data compared to others.
In Kafka 0.8.1, we added an experimental feature that automatically detects such leader imbalance and periodically triggers leader re-election to maintain an even distribution of leaders by moving leaders back to the preferred replica if alive. Since then, the feature has been tested and is ready for broader usage. Previously, this was purely an admin operation that could be triggered via the kafka-preferred-replica-election.sh command that ships with Kafka. This feature greatly improves the operability of Kafka.
In Kafka 0.8.0, we added a configurable controlled shutdown feature (controlled.shutdown.enable) that reduces partition unavailability when brokers are bounced for upgrades or routine maintenance. The way this works is that if a Kafka broker receives a request to shutdown and detects that controlled shutdown is enabled, it moves the leaders from itself to other alive brokers in the cluster, before shutting down. If controlled shutdown is disabled, the broker shuts down immediately and, from that time until the time the controller elects a new leader for all partitions whose leaders lived on the dead broker, those partitions are unavailable. Depending on the size of the cluster and the number of topic partitions, this time could be significant. For planned broker restarts, it is desirable to move the leaders proactively so partitions are always writable even when individual brokers are temporarily unavailable. Previously, this feature didn’t work well with automatic leader rebalancing and also with single replica partitions, but those problems are fixed in 0.8.2, so controlled shutdown should be enabled by default at all times on a production Kafka cluster.
We added a few features that improve the durability guarantees provided by Kafka.
Before 0.8.2, a single application could create an unbounded number of connections to a Kafka broker. This causes a broker to run out of open file handles and effectively renders the broker unusable it is restarted. In 0.8.2, we added a server side config (max.connections.per.ip) to control the number of socket connections per client IP.
We plan to release 0.8.2 towards the end of this year. Meanwhile, you can find 0.8.2-beta here.
Even after all this work, there’s still a lot to be done. In our forthcoming releases, we plan to focus on usability and operability. This work includes:
Now is a great time to get involved. You can start by running through the Kafka quick start, signing up for the mailing list, and grabbing some newbie JIRAs.
If you enjoy working on Kafka and would like to do so full time, we are hiring at Confluent!
Tableflow can seamlessly make your Kafka operational data available to your AWS analytics ecosystem with minimal effort, leveraging the capabilities of Confluent Tableflow and Amazon SageMaker Lakehouse.
Building a headless data architecture requires us to identify the work we’re already doing deep inside our data analytics plane, and shift it to the left. Learn the specifics in this blog.