
What’s New in Apache Kafka 2.8

Written By John Roesler

I’m proud to announce the release of Apache Kafka 2.8.0 on behalf of the Apache Kafka® community. The 2.8.0 release contains many new features and improvements. This blog post highlights some of the more prominent ones. Be sure to see the release notes for the full list of changes. You can also watch the release video for a summary of what’s new.

This release offers an early access version of KIP-500, which allows you to run Kafka brokers without Apache ZooKeeper, instead depending on an internal Raft implementation. This highly anticipated architectural improvement enables support for more partitions per cluster, simpler operation, and tighter security.

Kafka broker, producer, and consumer

KIP-500: Replace ZooKeeper with a self-managed quorum

We are excited to announce that 2.8 introduces an early-access look at Kafka without ZooKeeper! The implementation is not yet feature complete and should not be used in production, but it is possible to start new clusters without ZooKeeper and go through basic produce and consume use cases.

At a high level, KIP-500 works by moving topic metadata and configurations out of ZooKeeper and into a new internal topic named @metadata. This topic is managed by an internal Raft quorum of “controllers” and is replicated to all brokers in the cluster. The leader of the Raft quorum serves the same role as the controller in clusters today. A node in the KIP-500 world can serve as a controller, a broker, or both, depending on the new process.roles configuration. See the README for quickstart instructions and additional details.
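As a rough sketch of the new configuration surface, the following Java snippet simply assembles and prints the kind of settings a combined broker-and-controller node uses in the early-access KRaft mode. All values (node ID, quorum voters, listeners, log directory) are placeholders; in a real deployment these keys live in a server.properties file as described in the README quickstart.

```java
import java.util.Properties;

public class KRaftConfigSketch {
    public static void main(String[] args) {
        // Illustration only: real deployments put these keys in a server.properties file.
        Properties props = new Properties();
        props.put("process.roles", "broker,controller");            // this node acts as both
        props.put("node.id", "1");                                   // node identifier in KRaft mode
        props.put("controller.quorum.voters", "1@localhost:9093");   // Raft quorum members (id@host:port)
        props.put("listeners", "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093");
        props.put("log.dirs", "/tmp/kraft-combined-logs");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```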

This release has been a massive effort by the community over the past year, and that effort will continue over the course of this year. We expect significant improvements in feature completeness and hardening by the mid-year and end-of-year releases. Here is a quick look at the most significant KIPs that have been merged:

  • KIP-500: lays out the vision for an event-driven model for managing metadata with a replicated log managed with the Raft protocol
  • KIP-595: specifies the Raft protocol, which is used for the @metadata topic
  • KIP-631: specifies the event-driven controller model, including the new broker lifecycle and the metadata record schemas
  • KIP-590: specifies a new protocol to allow forwarding client requests from brokers to the controller

KIP-700: Add Describe Cluster API

The Kafka AdminClient has historically used the broker’s Metadata API to get information about the cluster. However, the Metadata API is primarily focused on supporting the consumer and producer clients, which follow different patterns from the AdminClient. KIP-700 decouples the AdminClient from the Metadata API by adding a new API to directly query the brokers for information about the cluster. This change enables the addition of new admin features in the future without disrupting the producer and consumer.
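For context, the AdminClient call you make does not change; the difference is the RPC it issues under the hood. A minimal sketch of describing a cluster from Java (the bootstrap address is a placeholder):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

import java.util.Properties;

public class DescribeClusterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // On 2.8 brokers, this is served by the new DescribeCluster API rather than Metadata.
            DescribeClusterResult cluster = admin.describeCluster();
            System.out.println("Cluster ID: " + cluster.clusterId().get());
            System.out.println("Controller: " + cluster.controller().get());
            cluster.nodes().get().forEach(node -> System.out.println("Node: " + node));
        }
    }
}
```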

KIP-684: Support mutual TLS authentication on SASL_SSL listeners

Kafka brokers support TLS client authentication (also known as mutual TLS authentication) to secure communication between clients and brokers. Historically, you could only configure this authentication mechanism broker-wide, despite the fact that KIP-103 introduced the ability to independently configure different network listeners on the broker. KIP-684 allows you to enable mutual TLS on individual SASL_SSL listeners, giving you finer-grained control over securing your environment.
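As a hedged illustration, the per-listener override uses the listener.name.&lt;listener&gt; prefix convention described in the KIP. The snippet below just assembles and prints the relevant broker settings, which would normally be placed in the broker’s server.properties:

```java
import java.util.Properties;

public class ListenerMutualTlsSketch {
    public static void main(String[] args) {
        // Illustration only: these keys normally go in the broker's server.properties.
        Properties brokerProps = new Properties();
        // Broker-wide default: no client certificate required.
        brokerProps.put("ssl.client.auth", "none");
        // Per-listener override from KIP-684: require client certificates (mutual TLS)
        // only on the SASL_SSL listener.
        brokerProps.put("listener.name.sasl_ssl.ssl.client.auth", "required");
        brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```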

KIP-676: Respect logging hierarchy

Log4j uses a hierarchical model for configuring loggers within an application. Each logger’s name is delimited by periods (.), which are treated as levels in the logger hierarchy. Individual loggers and intermediate hierarchy levels can both be configured (for example, to enable debug logging). If an individual logger is not explicitly configured, it inherits the configuration of its nearest ancestor, all the way up to the root logger, which is the common ancestor of all loggers in the system. Historically, the Kafka broker’s APIs for viewing log levels did not respect this hierarchy, instead reporting only the root logger’s configuration for any unconfigured individual logger. KIP-676 corrects this behavior by instead resolving the logger configurations the same way that the logging framework does.
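For context, broker log levels can be inspected through the Admin API’s BROKER_LOGGER config resource; the broker ID and bootstrap address below are placeholders. With KIP-676, an unconfigured logger in this listing reports the level inherited from its nearest configured ancestor rather than always showing the root logger’s level. A minimal sketch:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class BrokerLoggerLevels {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // "0" is a placeholder broker ID; BROKER_LOGGER exposes per-logger levels.
            ConfigResource loggers = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0");
            Config config = admin.describeConfigs(Collections.singleton(loggers)).all().get().get(loggers);
            // With KIP-676, unconfigured loggers show the level resolved through the hierarchy.
            config.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));
        }
    }
}
```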

KIP-673: Emit JSONs with new auto-generated schema

Kafka brokers offer debug-level request/response logs. Previously, they were semi-structured logs produced by the request/response classes’ toString override. KIP-673 adjusts these logs to be JSON structured so that they can more easily be parsed and used by logging toolchains.

KIP-612: Limit broker connection creation rate

Creating a new connection adds overhead to the broker. KIP-306 mitigated the issue of connection storms due to unauthorized connections. However, connection storms may also come from authorized clients. To make it easier for you to ensure the stability of the brokers, KIP-612 adds the ability to set a limit on the rate at which the broker accepts new connections, both overall and per IP address.
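As a sketch, the broker-wide limit can be adjusted dynamically through the AdminClient; the broker ID, bootstrap address, and rate value below are placeholders, and per-IP limits are configured separately.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ConnectionRateLimitExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Limit broker 0 (placeholder ID) to roughly 30 new connections per second.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            AlterConfigOp setRate = new AlterConfigOp(
                new ConfigEntry("max.connection.creation.rate", "30"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(broker, Collections.singletonList(setRate));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```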

KIP-516: Topic identifiers

Previously, topics in Kafka were identified solely by their name. KIP-516 introduces topic IDs to uniquely identify topics. Topic IDs are unique throughout their lifetime, even beyond deletion of the corresponding topic. They are also a more efficient representation on the wire and in memory compared to topic names.

Starting in 2.8.0, existing and new topics will be given topic IDs. Clients can now receive topic IDs through metadata responses. Work for this KIP is still ongoing, and future releases will include support for more efficient deletes, as well as the addition of topic IDs to other requests.

Kafka Connect

KIP-661: Expose task configurations in Connect REST API

Kafka Connect exposes a REST API that allows callers to view the configuration of running connectors. This is very useful for administrators, as it allows them to determine the workload of connectors. In Connect, connectors process their configuration before execution actually begins and, in some cases, specialize and map that configuration to each individual task that will perform the actual work of transferring data to or from Kafka. Administrators have historically been able to view the nominal configuration, but not the actual resolved configurations used by the running tasks. KIP-661 adds a new API endpoint and method to allow callers to retrieve the actual runtime configuration of a connector’s tasks. This can be used for debugging, but also for understanding the impact of failures (for example, a task crashing).
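A minimal sketch of calling the new endpoint from Java, assuming a Connect worker at a placeholder address and a hypothetical connector named my-connector:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TaskConfigsExample {
    public static void main(String[] args) throws Exception {
        // Placeholder Connect worker URL and connector name.
        String url = "http://localhost:8083/connectors/my-connector/tasks-config";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        // The response is a JSON map of task IDs to their resolved configurations.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```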

Kafka Streams

KIP-696: Update Streams FSM to clarify ERROR state meaning

Kafka Streams exposes a state machine to help you reason about the state of your applications in logs and metrics, as well as to trigger user-defined behavior on state transitions. This state machine contains an ERROR state that has historically meant that all threads have died. Until KIP-663, there was no way to replace dead threads, so ERROR was a terminal state. However, with the addition of recent resilience improvements (KIP-663 and KIP-671 below), having no running threads no longer clearly indicates an error, nor is it terminal. Regardless, applications can still experience fatal errors, and you still need to know when this happens. KIP-696 updates the state machine to grant ERROR a more specific meaning, namely that it is a terminal state indicating that the application has experienced a fatal error.
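A small sketch of reacting to the clarified ERROR state through the existing state listener hook; the alerting logic is purely illustrative:

```java
import org.apache.kafka.streams.KafkaStreams;

public class ErrorStateListenerSketch {
    // Register before calling streams.start().
    static void register(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            // With KIP-696, ERROR is a terminal state meaning the application hit a fatal
            // error, not merely that all stream threads happen to have stopped.
            if (newState == KafkaStreams.State.ERROR) {
                System.err.println("Streams application failed fatally; alerting operators...");
            }
        });
    }
}
```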

KIP-689: Extend StreamJoined to allow more store configs

Kafka Streams offers the StreamJoined config object to set various configuration options for join operations. KIP-689 adds the ability to control the settings of the changelog topics that make the join state durable. The default configuration is still appropriate for most applications, but advanced operators need the ability to tune the configurations of the internal topics that support stream processing.
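As a hedged sketch, a join can pass changelog topic overrides through StreamJoined; the store name and retention value here are illustrative only:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.StreamJoined;

import java.util.Map;

public class StreamJoinedConfigSketch {
    static StreamJoined<String, Long, Long> joinedConfig() {
        // Tune the changelog topics backing the join state stores; the retention
        // override below is purely illustrative.
        return StreamJoined.with(Serdes.String(), Serdes.Long(), Serdes.Long())
                           .withStoreName("orders-join")
                           .withLoggingEnabled(Map.of("retention.ms", "86400000"));
    }
}
```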

KIP-680: TopologyTestDriver should not require a properties argument

Kafka Streams offers the TopologyTestDriver runtime, which supports testing entire Streams applications in a fast, single-threaded, deterministic environment without running any extra components (like brokers or ZooKeeper). The constructor of TopologyTestDriver was designed to mirror the constructor of KafkaStreams (the main runtime): It takes the application itself (a topology) as one argument and the configuration as a second argument.

Historically, TopologyTestDriver enforced the same required configs as KafkaStreams, including a broker connection string and an application ID, even though these configurations are meaningless for TopologyTestDriver. Starting in 2.8.0, these boilerplate configurations are no longer required, and KIP-680 simplifies the common case by adding new constructor overloads that do not require a configuration argument at all. The main constructor is still available, so your current tests will continue to work, and you can still use it if you need to specify extra configuration options.
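A minimal sketch of a test using the new no-configuration constructor; topology construction is omitted, and the topic names are placeholders:

```java
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class TestDriverSketch {
    static void runTest(Topology topology) {
        // No bootstrap.servers or application.id boilerplate needed as of 2.8.0.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology)) {
            TestInputTopic<String, String> input =
                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output =
                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());
            input.pipeInput("key", "value");
            System.out.println(output.readKeyValue());
        }
    }
}
```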

KIP-671: Introduce Kafka Streams specific uncaught exception handler

Kafka Streams encapsulates complex logic, including both user- and system-defined code, I/O operations, multi-threading, etc., all of which offer any number of opportunities to encounter an unexpected exception. Previously, Kafka Streams adopted the safe and simple approach of throwing exceptions up to the top level, which would ultimately kill the relevant execution thread. For visibility, Streams exposed the native Java thread’s ability to register an UncaughtExceptionHandler. In practice, many use cases require more than just visibility when a thread dies.

KIP-671 adds a new handler (StreamsUncaughtExceptionHandler), which offers the same level of visibility while also providing a mechanism to replace the dead thread (if you desire more resilience) or shut down the system (either all threads in the current instance or all instances in the cluster), in case you prefer to fail fast. The handler allows the selection of different actions, depending on the actual exception. Kafka Tutorials has a tutorial covering this new capability.
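A small sketch of registering the new handler; the policy shown (treat one exception type as fatal and replace the thread for everything else) is purely illustrative:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ExceptionHandlerSketch {
    static void register(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            // Illustrative policy: treat IllegalStateException as fatal and shut down the
            // whole application; replace the failed thread for anything else.
            if (exception.getCause() instanceof IllegalStateException) {
                return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
            }
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}
```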

KIP-663: API to start and shut down StreamThreads

Kafka Streams applications are structured as a cluster of instances, each with some number of execution threads. The number of threads is configured at startup. Under heavy load, you may wish to experiment with increasing or decreasing the number of threads on an instance in order to better utilize system resources or reduce bottlenecks. Previously, this involved stopping, reconfiguring, and restarting each instance. KIP-663 adds new methods to the KafkaStreams interface, which allows you to individually add and remove processing threads without disrupting the other threads running on the same instance.
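A minimal sketch of the new methods; the scaling policy here is illustrative, and both calls return the affected thread’s name if the operation succeeds:

```java
import org.apache.kafka.streams.KafkaStreams;

import java.util.Optional;

public class ThreadScalingSketch {
    static void scale(KafkaStreams streams, boolean scaleUp) {
        if (scaleUp) {
            // Returns the name of the newly started thread, if one could be added.
            Optional<String> added = streams.addStreamThread();
            added.ifPresent(name -> System.out.println("Started " + name));
        } else {
            // Removes one thread; its work is rebalanced onto the remaining threads.
            Optional<String> removed = streams.removeStreamThread();
            removed.ifPresent(name -> System.out.println("Stopped " + name));
        }
    }
}
```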

KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

One of the operations that Kafka Streams provides is the ability to window an input record stream for aggregation. For example, when computing the number of updates for each key per hour, the window size is one hour. The window size is defined as part of the stream processing logic in the Streams DSL, and Kafka Streams automatically configures the serializer and deserializer necessary to store and retrieve these windows from local storage and Kafka topics. Because the window size itself is fixed and known for a particular operation, the serializer and deserializer contain a space optimization, storing only the window’s start timestamp (as the end can be computed by adding the window size to the start time).

Occasionally, you need to directly load serialized records, for example, when debugging an application or verifying an intermediate processing phase. To support these use cases, KIP-659 gives callers a way to directly configure the deserializer (TimeWindowedDeserializer) with the window size, in much the same way that Streams configures its own internal deserializer for the same data.
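A small sketch of constructing the deserializer with an assumed one-hour window size; a consumer or test inspecting the underlying topic could then use it as its key deserializer:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;

import java.time.Duration;

public class WindowedDeserializerSketch {
    // Build a deserializer for windowed String keys, telling it the one-hour window size
    // so it can reconstruct each window's end time from the stored start timestamp.
    static TimeWindowedDeserializer<String> hourlyWindowKeyDeserializer() {
        long windowSizeMs = Duration.ofHours(1).toMillis();
        return new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs);
    }
}
```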

KIP-572: Improve timeouts and retries in Kafka Streams

KIP-572 was partially implemented in Apache Kafka 2.7.0 and completed in 2.8.0. This KIP adds a new retry behavior to fill an important resilience gap in running Kafka Streams applications. Many of Streams’ functions rely on remote calls, for example, to Kafka brokers. As with any network call, these operations are subject to arbitrary errors and delays.

The Kafka client libraries that Streams relies on have their own resilience settings, which can help to smooth out minor network disruptions, but setting the clients to be too resilient means that any client API call may block for a long time, which affects the overall stability of the application. On the other hand, setting these client timeouts too short leads to applications crashing during minor network outages. KIP-572 adds a higher-level retry loop. Now, when Streams encounters a timeout exception while processing a task, it will attempt to make progress on other tasks before retrying the failed one.
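As a hedged sketch, the retry window is controlled by the task.timeout.ms Streams config introduced by this KIP; the application ID, bootstrap address, and timeout value below are placeholders:

```java
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class TaskTimeoutConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // KIP-572: how long Streams keeps retrying a task that hits timeout exceptions
        // before raising an error (illustrative value).
        props.put("task.timeout.ms", "300000");
        return props;
    }
}
```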

Conclusion

Apache Kafka 2.8.0 has a lot of great fixes and improvements in addition to the KIPs listed here.


This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 151 authors and reviewers:

17hao, abc863377, Adem Efe Gencer, akumar, Alexander Iskuskov, Alexandre Dupriez, Almog Gavra, Alok Nikhil, Anastasia Vela, Andrew Choi, Andrey Bozhko, Andrey Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, Anna Povzner, Anna Sophie Blee-Goldman, APaMio, Arjun Satish, ArunParthiban-ST, Attila Sasvari, Benoit Maggi, bertber, Bill Bejeck, Bob Barrett, Boyang Chen, Brajesh Kumar, Brian Byrne, Bruno Cadonna, Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG, Colin Patrick McCabe, Cyrus Vafadari, David Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah, Dima Reznik, Dongjoon Hyun, Dongxu Wang, Edoardo Comar, Emre Hasegeli, Ewen Cheslack-Postava, feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare, Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama, high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Israel Ekpo, Ivan Ponomarev, Ivan Yurchenko, jackyoh, Jakob Homan, James Cheng, James Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John Roesler, Jorge Esteban Quilcate Otoya, José Armando García Sancio, Jose Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Jun Rao, Justine Olshan, Kengo Seki, Konstantine Karantasis, Kowshik Prakasam, leah, Leah Thomas, Lee Dongjin, Levani Kokhreidze, Lev Zemlyanov, Liju John, limengmonty, Lincong Li, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias Merdes, Michael Bingham, Michael G. Noll, Mickael Maison, Montyleo, mowczare, Nigel Liang, Nikhil Bhatia, Nikolay Izhikov, Ning Zhang, Nitesh Mor, notifygd, Okada Haruki, Oliver Dineen, panguncle, parafiend, Patrick Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman Verma, Ramesh Krishnan M, Rameshkrishnan Muthusamy, Randall Hauch, Richard Fussenegger, Rohan Desai, Rohit Deshpande, Ron Dagostino, Ryanne Dolan, Samuel Cantero, Sanjana Kaundinya, Sanket Fajage, Satish Duggana, Scott Hendricks, Scott Sugar, Shao Yang Hong, shenwenbing, ssugar, Stanislav Kozlovski, Stanislav Vodetskyi, Taisiia Goltseva, tang7526, Thorsten Hake, Tom Bentley, vamossagar12, Victoria Xia, Viktor Somogyi-Vass, voffcheg109, Walker Carlson, wenbingshen, William Hammond, wycccccc, xakassi, Xavier Léauté, Yilong Chang, and zhangyue19921010.

This post was originally published by John Roesler on The Apache Software Foundation blog.

  • John Roesler is a software engineer at Confluent, focusing mainly on Kafka Streams. Prior to that, he helped build and operate the high-volume event streaming platform at Bazaarvoice.
