I’m proud to announce the release of Apache Kafka 2.8.0 on behalf of the Apache Kafka® community. The 2.8.0 release contains many new features and improvements. This blog post highlights some of the more prominent ones. Be sure to see the release notes for the full list of changes. You can also watch the release video for a summary of what’s new:
This release offers an early access version of KIP-500, which allows you to run Kafka brokers without Apache ZooKeeper, instead depending on an internal Raft implementation. This highly anticipated architectural improvement enables support for more partitions per cluster, simpler operation, and tighter security.
We are excited to announce that 2.8 introduces an early-access look at Kafka without ZooKeeper! The implementation is not yet feature complete and should not be used in production, but it is possible to start new clusters without ZooKeeper and go through basic produce and consume use cases.
At a high level, KIP-500 works by moving topic metadata and configurations out of ZooKeeper and into a new internal topic named
@metadata. This topic is managed by an internal Raft quorum of “controllers” and is replicated to all brokers in the cluster. The leader of the Raft quorum serves the same role as the controller in clusters today. A node in the KIP-500 world can serve as a controller, a broker, or both, depending on the new
process.roles configuration. See the README for quickstart instructions and additional details.
This release has been a massive effort by the community over the past year, and this will continue over the course of this year. We expect significant improvements when it comes to feature completeness and hardening by the mid-year and end of year releases. Here is a quick look at the most significant KIPs that have been
The Kafka AdminClient has historically used the broker’s Metadata API to get information about the cluster. However, the Metadata API is primarily focused on supporting the consumer and producer client, which follow different patterns than the AdminClient. KIP-700 decouples the AdminClient from the Metadata API by adding a new API to directly query the brokers for information about the cluster. This change enables the addition of new admin features in the future without disruption to the producer and consumer.
Kafka brokers support TLS client authentication (also known as mutual TLS authentication) to secure the client-broker system. Historically, you could only configure this authentication mechanism broker-wide, despite the fact that KIP-103 introduced the ability to independently configure different network listeners on the broker. KIP-684 allows you to enable mutual TLS on SASL_SSL listeners, improving their ability to secure their environments.
Log4j uses a hierarchical model for configuring loggers within an application. Each logger’s name is delimited by periods (.), which are treated as levels in the logger hierarchy. Individual loggers and intermediate hierarchy levels can both be configured (for example, to enable debug logging). If an individual logger is not explicitly configured, it inherits the configuration of its nearest ancestor, all the way up to the root logger, which is the common ancestor of all loggers in the system. Historically, the Kafka broker’s APIs for viewing log levels did not respect this hierarchy, instead reporting only the root logger’s configuration for any unconfigured individual logger. KIP-676 corrects this behavior by instead resolving the logger configurations the same way that the logging framework does.
Kafka brokers offer debug-level request/response logs. Previously, they were semi-structured logs produced by the request/response classes’
toString override. KIP-673 adjusts these logs to be JSON structured so that they can more easily be parsed and used by logging toolchains.
Creating a new connection adds overhead to the broker. KIP-306 mitigated the issue of connection storms due to unauthorized connections. However, connection storms may also come from authorized clients. To make it easier for you to ensure the stability of the brokers, KIP-612 adds the ability to set a limit on the rate at which the broker accepts new connections, both overall and per IP address.
Previously, topics in Kafka were identified solely by their name. KIP-516 introduces topic IDs to uniquely identify topics. Topic IDs are unique throughout their lifetime, even beyond deletion of the corresponding topic. They are also a more efficient representation on the wire and in memory compared to topic names.
Starting in 2.8.0, existing and new topics will be given topic IDs. Clients can now receive topic IDs through metadata responses. Work for this KIP is still ongoing, and future releases will include support for more efficient deletes, as well as adding topic IDs to other requests.
Kafka Connect exposes a REST API allowing callers to view the configuration of running connectors. This is very useful for administrators, as it allows them to determine the workload of connectors. In Connect, connectors process the configuration before actually beginning execution and in some cases specialize and map this configuration to each individual task that will perform the actual work of transferring data to or from Kafka. Administrators have historically been able to view the nominal configuration, but not the actual resolved configurations used by the running tasks. KIP-661 adds a new API endpoint and method to allow callers to retrieve the actual runtime configuration of a connector’s tasks. This can be used for debugging but also for understanding the impact of failures (for example, a task crashing).
Kafka Streams exposes a state machine to help you to reason about the state of their applications in logs and metrics, as well as to trigger user-defined behavior on state transitions. This state machine contains an
ERROR state that has historically meant that all threads have died. Until KIP-663, there was no way to replace dead threads, so
ERROR was a terminal state. However, with the addition of recent resilience improvements (KIP-663 and KIP-671 below), having no running threads no longer clearly indicates an error, nor is it terminal. Regardless, applications can still experience fatal errors, and you still need to know when this happens. KIP-696 updates the state machine to grant
ERROR a more specific meaning, namely that it is a terminal state indicating that the application has experienced a fatal error.
Kafka Streams offers the
StreamJoined config object to set various configuration options for join operations. KIP-689 adds the ability to control the settings of the changelog topics that make the join state durable. The default configuration is still appropriate for most applications, but advanced operators need the ability to tune the configurations of the internal topics that support stream processing.
Kafka Streams offers the
TopologyTestDriver runtime, which supports testing entire Streams applications in a fast, single-threaded, deterministic environment without running any extra components (like brokers or ZooKeeper). The constructor of
TopologyTestDriver was designed to mirror the constructor of
KafkaStreams (the main runtime): It takes the application itself (a topology) as one argument and the configuration as a second argument.
TopologyTestDriver enforced the same required configs as
KafkaStreams, including a broker connection string and an application ID, even though these configurations are meaningless for
TopologyTestDriver. Starting in 2.8.0, these boilerplate configurations are no longer required, and KIP-680 simplifies the common case by adding new constructor overloads that do not require a configuration argument at all. The main constructor is still available, so your current tests will continue to work, and you can still use the main constructor if you need to specify extra configuration options.
Kafka Streams encapsulates complex logic, including both user- and system-defined code, I/O operations, multi-threading, etc., all of which offer any number of opportunities to encounter an unexpected exception. Before, Kafka Streams adopted the safe and simple approach of throwing the exceptions up to the top level, which would ultimately kill the relevant execution thread. For visibility, Streams exposed the native Java thread’s ability to register an
UncaughtExceptionHandler. In practice, many use cases require more than just visibility when a thread dies.
KIP-671 adds a new handler (
StreamsUncaughtExceptionHandler), which offers the same level of visibility while also providing a mechanism to replace the dead thread (if you desire more resilience) or shut down the system (either all threads in the current instance or all instances in the cluster), in case you prefer to fail fast. The handler allows the selection of different actions, depending on the actual exception. Kafka Tutorials has a tutorial covering this new capability.
Kafka Streams applications are structured as a cluster of instances, each with some number of execution threads. The number of threads is configured at startup. Under heavy load, you may wish to experiment with increasing or decreasing the number of threads on an instance in order to better utilize system resources or reduce bottlenecks. Previously, this involved stopping, reconfiguring, and restarting each instance. KIP-663 adds new methods to the
KafkaStreams interface, which allows you to individually add and remove processing threads without disrupting the other threads running on the same instance.
One of the operations that Kafka Streams provides is the ability to window an input record stream for aggregation. For example, when computing the number of updates for each key per hour, the window size is one hour. The window size is defined as part of the stream processing logic in the Streams DSL, and Kafka Streams automatically configures the serializer and deserializer necessary to store and retrieve these windows from local storage and Kafka topics. Because the window size itself is fixed and known for a particular operation, the serializer and deserializer contain a space optimization, storing only the window’s start timestamp (as the end can be computed by adding the window size to the start time).
Occasionally, you need to directly load serialized records, for example, when debugging an application or verifying an intermediate processing phase. To support these use cases, KIP-659 gives callers a way to directly configure the deserializer (
TimeWindowedDeserailizer) with the window size, in much the same way that Streams configures its own internal deserializer for the same data.
KIP-572 was partially implemented in Apache Kafka 2.7.0 and completed in 2.8.0. This KIP adds a new retry behavior to fill an important resilience gap in running Kafka Streams applications. Many of Streams’ functions rely on remote calls, for example, to Kafka brokers. As with any network call, these operations are subject to arbitrary errors and delays.
The Kafka client libraries that Streams relies on have their own resilience settings, which can help to smooth out minor network disruptions, but setting the clients to be too resilient means that any client API call may block for a long time, which affects the overall stability of the application. On the other hand, setting these client timeouts too short would lead to applications crashing during minor network outages. KIP-572 adds a higher-level retry loop. Now, when Streams encounters a timeout exception while processing a task, it will attempt to make progress on other tasks before retrying the failed one.
Apache Kafka 2.8.0 has a lot of great fixes and improvements in addition to the KIPs listed here.
For next steps:
This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 151 authors and reviewers:
17hao, abc863377, Adem Efe Gencer, akumar, Alexander Iskuskov, Alexandre Dupriez, Almog Gavra, Alok Nikhil, Anastasia Vela, Andrew Choi, Andrey Bozhko, Andrey Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, Anna Povzner, Anna Sophie Blee-Goldman, APaMio, Arjun Satish, ArunParthiban-ST, Attila Sasvari, Benoit Maggi, bertber, Bill Bejeck, Bob Barrett, Boyang Chen, Brajesh Kumar, Brian Byrne, Bruno Cadonna, Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG, Colin Patrick McCabe, Cyrus Vafadari, David Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah, Dima Reznik, Dongjoon Hyun, Dongxu Wang, Edoardo Comar, Emre Hasegeli, Ewen Cheslack-Postava, feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare, Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama, high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Israel Ekpo, Ivan Ponomarev, Ivan Yurchenko, jackyoh, Jakob Homan, James Cheng, James Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John Roesler, Jorge Esteban Quilcate Otoya, José Armando García Sancio, Jose Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Jun Rao, Justine Olshan, Kengo Seki, Konstantine Karantasis, Kowshik Prakasam, leah, Leah Thomas, Lee Dongjin, Levani Kokhreidze, Lev Zemlyanov, Liju John, limengmonty, Lincong Li, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias Merdes, Michael Bingham, Michael G. Noll, Mickael Maison, Montyleo, mowczare, Nigel Liang, Nikhil Bhatia, Nikolay Izhikov, Ning Zhang, Nitesh Mor, notifygd, Okada Haruki, Oliver Dineen, panguncle, parafiend, Patrick Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman Verma, Ramesh Krishnan M, Rameshkrishnan Muthusamy, Randall Hauch, Richard Fussenegger, Rohan Desai, Rohit Deshpande, Ron Dagostino, Ryanne Dolan, Samuel Cantero, Sanjana Kaundinya, Sanket Fajage, Satish Duggana, Scott Hendricks, Scott Sugar, Shao Yang Hong, shenwenbing, ssugar, Stanislav Kozlovski, Stanislav Vodetskyi, Taisiia Goltseva, tang7526, Thorsten Hake, Tom Bentley, vamossagar12, Victoria Xia, Viktor Somogyi-Vass, voffcheg109, Walker Carlson, wenbingshen, William Hammond, wycccccc, xakassi, Xavier Léauté, Yilong Chang, and zhangyue19921010.
This post was originally published by John Roesler on The Apache Software Foundation blog.