
Apache Kafka Rebalance Protocol for the Cloud: Static Membership

Boyang Chen

Static Membership is an enhancement to the current rebalance protocol that aims to reduce the downtime caused by excessive and unnecessary rebalances for general Apache Kafka® client implementations. This applies to Kafka consumers, Kafka Connect, and Kafka Streams. To get a better grasp of the rebalance protocol, we’ll examine this concept in depth and explain what it means. If you already know what a Kafka rebalance is, feel free to jump directly to the section When do we trigger an unnecessary rebalance? to save time.

What does “rebalance” mean when it comes to Kafka?

A Kafka rebalance is a distributed protocol for client-side applications to process a common set of resources in a dynamic group. Two primary goals for this protocol are:

  1. Group resource assignment
  2. Membership change capture

Take a Kafka consumer, for example. A group of Kafka consumers reads input data from Kafka through subscriptions, and topic partitions are their shared unit of tasks. Given three consumers (C1, C2, and C3) and two topics (T1 and T2) with three partitions each, the subscriptions would appear as follows:

C1: T1, T2
C2: T2
C3: T1

The rebalance protocol ensures that C1 and C2 take non-overlapping assignments from topic T2*, and the same goes for C1 and C3 from T1. A valid assignment looks like this:

C1: t1-p1, t2-p1
C2: t2-p2, t2-p3
C3: t1-p2, t1-p3

*Note that the consumer does not check whether the assignment returned from the assignor respects these rules. If your customized assignor assigns a partition to multiple owners, the assignment is still silently accepted and causes double fetching. Strictly speaking, only the built-in rebalance assignors obey this rule for resource isolation.

However, the assignment below is not allowed, as it introduces overlapping assignments:

C1: t1-p1, t2-p1
C2: t2-p1, t2-p2, t2-p3
C3: t1-p2, t1-p3

The rebalance protocol also needs to properly handle membership changes. For the above case, if a new member C4 subscribing to T2 joins, the rebalance protocol will try to adjust the load within the group:

C1: t1-p1, t2-p1
C2: t2-p3
C3: t1-p2, t1-p3
C4: t2-p2
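
For concreteness, here is a minimal sketch of how the original three-member group might be wired up with the Java consumer. The bootstrap address, group ID, and class names are illustrative; C1 subscribes to both topics, while C2 and C3 would run the same code subscribing only to T2 and only to T1, respectively.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscriptionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        // This instance plays the role of C1 in the example above.
        KafkaConsumer<String, String> c1 = new KafkaConsumer<>(props);
        c1.subscribe(Arrays.asList("T1", "T2"));
    }
}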

In summary, the rebalance protocol needs to “balance” the load within a client group as it scales while keeping task ownership safe. Similar to most distributed consensus algorithms, Kafka takes a two-phase approach. For simplicity, we’ll stick to the Kafka consumer for now.

Consumer rebalance demo

The endpoint to which consumers commit progress is called the group coordinator, which is hosted on a designated broker. It also serves as the centralized manager of group rebalances. When the group starts rebalancing, the group coordinator first switches its state to rebalance so that all interacting consumers are notified to rejoin the group. Once all the members have rejoined, or the coordinator has waited long enough to reach the rebalance timeout, the group proceeds to another stage called sync, which officially announces the formation of a valid consumer group. To distinguish members who fall out of the group during this process, each successful rebalance increments a counter called the generation ID and propagates its value to all the joined members, so that out-of-generation members can be fenced.

In the sync stage, the group coordinator replies to all members with the latest generation information. Specifically, it nominates one of the members as the leader and replies to the leader with encoded membership and subscription metadata.

The leader completes the assignment based on the membership and topic metadata information and replies to the coordinator with the assignment. Meanwhile, all the followers are required to send a sync group request to fetch their actual assignments, waiting in a pool until the leader finishes transmitting the assignment to the coordinator. Upon receiving the assignment, the coordinator transitions the group from sync to stable, and all pending and upcoming follower sync requests are answered with their individual assignments.
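
From the client’s point of view, these two phases surface through the standard ConsumerRebalanceListener callbacks: partitions are revoked before the member rejoins, and the new assignment arrives once the sync stage completes. Below is a minimal sketch that reuses a consumer configured as in the earlier snippet; the topic name is illustrative.

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceObserver {
    public static void attach(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("T1"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before the member rejoins; in-flight work on these
                // partitions should be committed or cleaned up here.
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the sync stage, with this member's share of
                // the new generation's assignment.
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}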

Here, we describe two demo cases: one is an actual rebalance walkthrough, and the other is the high-level state machine. Note that in the sync stage, we can always fall back to rebalance mode if rebalance conditions are triggered, such as adding a new member, topic partition expansion, etc.

Rebalance Demo

State Machine View: Two-Phase Protocol

The rebalance protocol is very effective at balancing task processing load in real time and letting users freely scale their applications, but it is also a rather heavy operation, requiring the entire consumer group to stop working temporarily. Members are expected to revoke their ongoing assignments at the start of each rebalance and initialize new assignments at its end. These operations incur overhead, especially for stateful applications, where a task must first restore its local state from a backup topic before it can serve.

Essentially, a rebalance kicks in when any of the following conditions is met:

  1. Group membership changes, such as a new member joining
  2. Member subscription changes, such as one consumer changing the subscribed topics
  3. Resource changes, such as adding more partitions to the subscribed topic (see the sketch after this list)
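
As an illustration of the third trigger, expanding a subscribed topic’s partition count through the admin client is enough to force a rebalance in every group subscribed to that topic. A minimal sketch; the broker address, topic name, and partition counts are illustrative.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class PartitionExpansionDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        try (AdminClient admin = AdminClient.create(props)) {
            // Growing T2 from three to four partitions makes the group
            // coordinator rebalance every group subscribed to T2.
            admin.createPartitions(
                Collections.singletonMap("T2", NewPartitions.increaseTo(4))
            ).all().get();
        }
    }
}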

When do we trigger an unnecessary rebalance?

In the real world, there are many scenarios where a group coordinator triggers unnecessary rebalances that are detrimental to application performance. The first case is transient member timeout. To understand this, we need to first introduce two concepts: consumer heartbeat and session timeout.

Consumer heartbeat and session timeout

A Kafka consumer maintains a background thread that periodically sends heartbeat requests to the coordinator to indicate its liveness. The consumer configuration session.timeout.ms defines how long the coordinator waits after the member’s last heartbeat before assuming the member has failed. When this value is set too low, a network jitter or a long garbage collection (GC) pause might fail the liveness check, causing the group coordinator to remove the member and begin rebalancing. The solution is simple: instead of the default 10-second session timeout, set a larger value to greatly reduce rebalances caused by transient failures.
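
For example, a consumer that should ride out a one-minute GC pause or network blip might extend the earlier configuration sketch as follows. The exact values are illustrative and should follow your own availability budget.

// Raise the session timeout from the 10-second default; values are illustrative.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
// Keep the heartbeat interval well below the session timeout
// (one third of it is the usual rule of thumb).
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "20000");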

Note that the longer you set the session timeout, the longer the partial unavailability will last when a consumer actually fails. We will explain how to choose this value in the later section on how to opt into Static Membership.

Rolling bounce procedure

From time to time, we need to restart an application, deploy new code, or perform a rollback. In the worst case, these operations can trigger many rebalances. When a consumer instance shuts down, it sends a leave group request to the group coordinator, removing itself from the group and triggering a rebalance. When that consumer resumes after the bounce, it sends a join group request to the group coordinator, triggering yet another rebalance.

During a rolling bounce procedure, consecutive rebalances are triggered as instances are shut down and resumed, and partitions are reassigned back and forth. The final assignment result is purely random, and the group pays a large cost in task shuffling and reinitialization.

How about letting members choose not to leave the group? Not an option either. To understand why, we need to talk about the member ID for a moment.

Consumer member ID

When a new member joins the group, its request contains no membership information. The group coordinator assigns a universally unique identifier (UUID) to this member as its member ID, puts the ID in its cache, and embeds the ID in its response to the member. Within this consumer’s lifecycle, it can rejoin with the same member ID without the coordinator triggering a rebalance, except in edge cases such as a leader rejoining.

Going back to the rolling bounce scenario, a restarted member loses its in-memory membership information and rejoins the group without a member ID or generation ID. Since the rejoining consumer is recognized as a completely new member of the group, the group coordinator makes no guarantee that its old assignment will be handed back. As you can see, a member leaving the group is not the root cause of unnecessary task shuffling; the loss of identity is.

What is Static Membership?

Static Membership, unlike Dynamic Membership, aims to persist member identity across multiple generations of the group. The goal is to reuse the same subscription information and make old members “recognizable” to the coordinator. Static Membership introduces a new consumer configuration called group.instance.id, which users configure to uniquely identify their consumer instances. Although the coordinator-assigned member ID is lost during a restart, the coordinator still recognizes the member by the group instance ID provided in its join request. Therefore, the same assignment is guaranteed.

Static Membership is extremely friendly to cloud application setups, because modern deployment technologies such as Kubernetes are largely self-contained in managing application health. To heal a dead or ill-performing consumer, Kubernetes can simply bring down the relevant instance and spin up a new one using the same instance ID. With such a cloud management framework in place, client health is already being checked continuously outside the group coordinator.
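
One convenient pattern in such environments, offered as an illustration rather than a protocol requirement, is to derive the instance ID from the pod’s stable hostname, e.g., that of a Kubernetes StatefulSet, again extending the earlier configuration sketch:

import java.net.InetAddress;
import java.net.UnknownHostException;

// In a StatefulSet, the pod hostname is stable across restarts, so it can
// double as the group instance ID; any stable, per-instance unique string works.
String instanceId;
try {
    instanceId = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
    throw new RuntimeException("Cannot resolve local hostname", e);
}
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId);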

Below is a quick demo of how Static Membership works.

Static Membership Demo

How to opt into Static Membership

Since the Apache Kafka 2.3 release, Static Membership has been generally available to the community. Here are the instructions if you want to be an early adopter:

  1. Upgrade your broker to 2.3 or higher. Specifically, you need to upgrade inter.broker.protocol.version to 2.3 or higher in order to enable this feature.
  2. On the client side:

    • Upgrade your client library to 2.3 or higher.
    • Define a longer and reasonable session timeout. As stated before, a tight session timeout can make the group unstable, as members get kicked out spuriously after missing a single heartbeat. Set the session timeout based on your business’s tolerance for partial unavailability. For example, setting the session timeout to 10 minutes for a business that can tolerate 15 minutes of unavailability is reasonable, whereas setting it to five seconds is not.
    • Set the group.instance.id configuration to a unique ID for your consumer (see the configuration sketch after this list). If you are a Kafka Streams user, use the same configuration for your Streams instance.
  3. Deploy the new code to your application. Static Membership will take effect in your next rolling bounce.
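
Putting the client-side steps together, a static member’s configuration might look like the sketch below. All values are illustrative; group.instance.id must be unique per instance, and the session timeout should match your unavailability tolerance as discussed above.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StaticMemberDemo {
    public static KafkaConsumer<String, String> create(String instanceId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // The two settings that opt this member into Static Membership:
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId); // unique per instance
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "600000");  // 10 minutes
        return new KafkaConsumer<>(props);
    }
}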

Static Membership only works as expected if these instructions are followed. We have nonetheless made some preventative efforts to reduce the potential risk of human error.

Error handling

Sometimes a user may forget to upgrade the broker. When the consumer first starts, it acquires the API version of the designated broker. If the client is configured with a group instance ID and the broker is on an older version, the application will crash immediately, as the broker has no support for Static Membership yet.

If a user fails to configure group instance IDs uniquely, meaning two or more members share the same instance ID, a fencing logic comes into play. When a known static member rejoins without a member ID, the coordinator generates a new UUID and replies to the member with it as its new member ID. At the same time, the group coordinator maintains a mapping from each instance ID to the latest member ID it has assigned. If a known static member rejoins with a valid member ID that does not match the cached one, it is immediately fenced by the coordinator’s response. This eliminates the risk of duplicate static members processing concurrently.
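
As a toy model of this bookkeeping (our own illustration, not the actual broker code, which lives in Kafka’s group coordinator), the fencing check can be pictured like this:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of the coordinator's instance-ID-to-member-ID mapping.
public class StaticMemberRegistry {
    private final Map<String, String> instanceToMemberId = new ConcurrentHashMap<>();

    // A known static member rejoining without a member ID is issued a fresh one,
    // which also invalidates whatever member ID was cached before.
    public String onRejoinWithoutMemberId(String groupInstanceId) {
        String newMemberId = UUID.randomUUID().toString();
        instanceToMemberId.put(groupInstanceId, newMemberId);
        return newMemberId;
    }

    // A join with a member ID that does not match the cached one must come from
    // a stale duplicate, so the coordinator fences it.
    public boolean isFenced(String groupInstanceId, String memberId) {
        return !memberId.equals(instanceToMemberId.get(groupInstanceId));
    }
}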

In this very first version, we expect bugs that may invalidate the processing semantics or hinder the fencing logic. Some of them have already been addressed on trunk, such as KAFKA-8715, and we are still actively working on finding more issues.

Feedback is really appreciated! If you detect any issues with Static Membership, please file a JIRA or put a question on the dev mailing list to get our attention.

Want to know more? Come to Kafka Summit!

There are still many details we haven’t covered in this blog post, like how this effort compares with incremental rebalancing, how Static Membership helps with non-sticky assignment strategies, and tooling support around the new protocol. We will cover all this and more in our upcoming Kafka Summit talk, Static Membership: Rebalance Strategy Designed for the Cloud, on October 1st at 4:55 p.m. PT, so hurry if you haven’t registered yet! As a bonus, you can register with the code blog19 to get 30% off.

This work has been ongoing for over a year, with many iterations and huge support from my colleagues, past colleagues, and community friends. I owe a big thank you to all of you, especially Guozhang Wang, Jason Gustafson, Liquan Pei, Yu Yang, Shawn Nguyen, Matthias J. Sax, John Roesler, Mayuresh Gharat, Dong Lin, and Mike Freyberger.

Boyang Chen is an infrastructure engineer at Confluent, where he works on the Kafka Streams Team to build the next-generation event streaming platform on top of Apache Kafka. Previously, Boyang worked at Pinterest as a software engineer on the Ads Infrastructure Team, where he tackled various ads real-time challenges and rebuilt the whole budgeting and pacing pipeline, making it fast and robust with concrete revenue gain and business impact.
