Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

What’s New in Apache Kafka 2.4

Written By

On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 2.4.0. This release includes a number of key new features and improvements that we will highlight in this blog post. For the full list, please see the release notes.

What’s new with the Kafka broker, producer, and consumer

KIP-392: Allow consumers to fetch from closest replica

Historically, consumers were only allowed to fetch from leaders. In multi-datacenter deployments, this often means that consumers are forced to incur expensive cross-datacenter network costs in order to fetch from the leader. With KIP-392, Kafka now supports reading from follower replicas. This gives the broker the ability to redirect consumers to nearby replicas in order to save costs.

See KIP-392 and this blog post for more details.

KIP-429: Kafka Consumer Incremental Rebalance Protocol

KIP-429 adds Incremental Cooperative Rebalancing to the consumer rebalance protocol in addition to the original eager rebalance protocol. Unlike the eager protocol, which always revokes all assigned partitions prior to a rebalance and then tries to reassign them altogether, the incremental protocol tries to minimize the partition migration between members of a consumer group by letting consumers retain their partitions during a rebalance event. As a result, end-to-end rebalance times triggered by scaling out/down operations as well as rolling bounces are shorter, benefitting heavy, stateful consumers, such as Kafka Streams applications.

See KIP-429 and this blog post for more details.

KIP-455: Create an Administrative API for Replica Reassignment

As a replacement for the existing ZooKeeper-based API, the new API supports incremental replica reassignments and cancellation of ongoing reassignments. This also addresses the current limitations in the ZooKeeper-based API like security enforcement and auditability. The new API is exposed via the AdminClient.

See KIP-455 for more details.

KIP-480: Sticky Partitioner

Currently, in the case where no partition and key are specified, a producer’s default partitioner partitions records in a round-robin fashion. This results in more batches that are smaller in size and leads to more requests and queuing as well as higher latency.

KIP-480 implements a new partitioner, which chooses the sticky partition that changes when the batch is full if no partition or key is present. Using the sticky partitioner helps improve message batching, decrease latency, and reduce the load for the broker. Some of the benchmarks which Justine Olshan discusses on the KIP show up to a 50% reduction in latency and 5–15% reduction in CPU utilization.

See KIP-480 and this blog post for more details.

KIP-482: The Kafka Protocol should Support Optional Tagged Fields

The Kafka remote procedure call (RPC) protocol has its own serialization format for binary data. The Kafka protocol currently does not support optional fields, nor does it support attaching an extra field to a message in a manner that is orthogonal to the versioning scheme.

In order to support these scenarios, KIP-482 adds optional tagged fields to the Kafka serialization format. Tagged fields are always optional. KIP-482 also implements more efficient serialization for variable-length objects.

See KIP-482 for more details.

KIP-504: Add new Java Authorizer Interface

This KIP defines a Java authorizer API that is consistent with other pluggable interfaces in the broker. Several limitations in the current Scala authorizer API that could not be fixed without breaking compatibility have been addressed in the new API. Additional request context is now provided to authorizers to support authorization based on the security protocol or listener.

The API also supports asynchronous ACL updates with batching. The new pluggable authorizer API only requires a dependency on the client’s JAR. A new out-of-the-box authorizer has been added, leveraging features supported by the new API. The additional context provided to the authorizer has been used to improve audit logging. Batched updates enhance the efficiency of ACL updates using the new authorizer when multiple ACLs are added for a resource. An asynchronous startup and updated APIs will enable Kafka to be used as the storage backend for ACLs once ZooKeeper is removed under KIP-500. In addition, authorizer implementations can now enable dynamic reconfiguration without broker restarts.

See KIP-504 for more details.

KIP-525: Return topic metadata and configs in CreateTopics response

Before, the CreateTopics API response only returned a success or failure status along with any errors. With KIP-525, the API response returns additional metadata, including the actual configuration of the topic that was created. This removes the need for additional requests to obtain topic configuration after creating the topic.

Furthermore, this KIP enables users to obtain default broker configs for topic creation using CreateTopics with validateOnly=true. This is useful for displaying default configs in management tools used to create topics.

See KIP-525 for more details.

KAFKA-7548: KafkaConsumer should not throw away already fetched data for paused partitions.

When a partition is paused by the user in the consumer, the partition is considered “unfetchable.” When the consumer has already fetched data for a partition and the partition is paused, then in the next consumer poll all data from “unfetchable” partitions will be discarded. In use cases where pausing and resuming partitions are common during regular operation of the consumer, this can result in discarding pre-fetched data when it’s not necessary.

Once the partition is resumed, new fetch requests will be generated and sent to the broker to get the same partition data again. Depending on the frequency of pausing and resuming of partitions, this can impact different aspects of consumer polling, including broker/consumer throughput, number of consumer fetch requests, and NIO-related garbage collection (GC) concerns for regularly dereferenced byte buffers of partition data. This issue is now resolved by retaining completed fetch data for partitions that are paused so that it may be returned in a future consumer poll once the partition is resumed by the user.

See KAFKA-7548 for more details.

What’s new in Kafka Connect

KIP-382: MirrorMaker 2.0

KIP-382 implements MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication engine based on the Connect framework. This tool includes several features designed for disaster recovery, including cross-cluster consumer checkpoints and offset syncs. Automatic topic renaming and cycle detection enable bidirectional active-active replication and other complex topologies.

A new RemoteClusterUtils class enables clients to interpret checkpoints, heartbeats, and “remote topics” from other clusters.

See KIP-382 for more details.

KIP-440: Extend Connect Converter to support headers

KIP-440 adds header support to Kafka Connect. This enables the use of Kafka Connect together with Kafka producers and consumers that rely on headers for serialization/deserialization.

See KIP-440 for more details.

KIP-507: Securing Internal Connect REST Endpoints

KIP-507 brings out-of-the-box authentication and authorization to an internal REST endpoint used by Connect workers to relay task configurations to the leader. If left unsecured, this endpoint could be used to write arbitrary task configurations to a Connect cluster.

However, after KIP-507, the endpoint automatically secures as long as the other attack surfaces of a Connect cluster (such as its public REST API and the underlying Kafka cluster used to host
internal topics and perform group coordination) are also secure.

See KIP-507 for more details.

KIP-481: SerDe Improvements for Connect Decimal type in JSON

KIP-481 adds to the JSON converter decimal.format for serializing Connect’s DECIMAL logical type values as number literals rather than base64 string literals. This new option defaults to base64 to maintain the previous behavior, but it can be changed to number to serialize decimal values as normal JSON numbers. The JSON converter automatically deserializes using either format, so make sure to upgrade consumer applications and sink connectors before changing source connector converters to use the number format.

See KIP-481 for more details.

What’s New in Kafka Streams

KIP-213: Support non-key joining in KTable

Previously, the Streams domain-specific language (DSL) only allowed table-table joins based on the primary key of the joining KTables. Now, for a KTable (left input) to join with another KTable (right input) based on a specified foreign key as part of its value fields, the join result is a new KTable keyed on the left KTable’s original key. This supports updates from both sides of the join.

See KIP-213 for more details.

KIP-307: Allow to define custom processor names with KStreams DSL

Prior to this release, while building a new topology through the Kafka Streams DSL, the processors were automatically named. A complex topology with dozens of operations can be hard to understand if the processor names are not relevant. This KIP allows users to set more meaningful processor names.

See KIP-307 for more details.

KIP-470: TopologyTestDriver test input and output usability improvements

The TopologyTestDriver allows you to test Kafka Streams logic. This is a lot faster than utilizing actual producers and consumers and makes it possible to simulate different timing scenarios. Kafka 2.4.0 introduces TestInputTopic and TestOutputTopic classes to simplify the test interface.

See KIP-470 and this blog post for more details.

Metrics, monitoring, and operational improvements

  • KIP-412 adds support to dynamically alter a broker’s log levels using the Admin API.
  • KIP-495 allows users to dynamically alter log levels in the Connect framework.
  • KIP-521 changes Connect to also send log messages to a file and rolls that file every day.
  • KIP-460 modifies the PreferredLeaderElection RPC to support unclean leader election in addition to preferred leader election.
  • KIP-464 allows you to leverage num.partitions and default.replication.factor from the AdminClient#createTopics API.
  • KIP-492 supports the security provider config, which can be used to configure custom security algorithms.
  • KIP-496 adds an API to delete consumer offsets and expose it via the AdminClient.
  • KIP-503 adds metrics to monitor the number of topics/replicas marked for deletion.
  • KIP-475 adds metrics to measure the number of tasks on a connector.
  • KIP-471 exposes a subset of RocksDB’s statistics in Kafka Streams metrics, which enables users to find bottlenecks and tune RocksDB accordingly.
  • KIP-484 adds new metrics for the group and transaction metadata loading duration.
  • KIP-444 adds a few new metrics at the Streams instance level such as static version/commit-id as well as dynamic state.

ZooKeeper upgrade to 3.5.x

ZooKeeper has been upgraded to 3.5.x. support for TLS encryption added in ZooKeeper 3.5.x. This enables us to configure TLS encryption between Kafka brokers and ZooKeeper.

Scala 2.13 support

Apache Kafka 2.4.0 now supports Scala 2.13 while also remaining compatible with Scala 2.12 and 2.11.

Conclusion

We want to take this opportunity to thank everyone who has contributed to this release!

To learn more about what’s new in Apache Kafka 2.4, be sure to check out the release notes and highlights video below.

This post was originally published by Manikumar Reddy on The Apache Software Foundation blog.

  • Manikumar Reddy is a committer for the Apache Kafka project and a software engineer on the Core Kafka team at Confluent. Prior to Confluent, he was a software engineer at Hortonworks, where he was part of the Streams engineering team. When he is not working, Manikumar likes gardening, reading books on investing and behavioral finance, and spending his time with his family.

Did you like this blog post? Share it now