Consumer group IDs are a vital part of consumer configuration in Apache Kafka®. Setting the consumer Group ID determines what group a consumer belongs to, which has some major consequences. There are three areas in which Group IDs are particularly pertinent:
Detecting new data
Work sharing
Fault tolerance
If you'd like a quick overview of consumer group IDs check out the video below, or keep reading for more!
Kafka consumers do the work of reading event streams. They read events, or messages, from logs called topics. Topics are further split into partitions, which are append-only logs that store the messages. This enables each topic to be hosted and replicated across a number of brokers.
As you can see in the diagram, a given consumer in a consumer group can read from multiple partitions, including multiple partitions housed in the same topic.
One of the more foundational concepts of consumer groups is that each consumer, once assigned to a group, shares the workload. Consumers that have the same Group ID never read from the same partition at the same time:
Now, you don’t always need to reach for a single consumer group. Sometimes, you may have two consumers (each in different consumer groups, allowing for parallel processing) for two different services, like a customer address service and a customer delivery notification service, that would need to read from the same partition in the same topic. These two consumers in different groups reading from the same topic can pick up reading from different offsets, which they would not be able to do if Kafka used a queue rather than a persistent log.
However, if you do want your consumers to read from the same topic, you need to consider a couple of things: 1) the number of consumers in your group and 2) the number of partitions. This consideration is important because the consumers will share the workload as equally as possible among themselves. Sometimes making this decision is easy. I’m writing a demo app in Kafka Streams with 1 consumer and 1 partition right now because I just need to show developers how to use a .process() function in Kafka Streams. But in most use cases, this decision takes careful thought.
Considerations in favor of fewer consumers in relation to a high number of partitions include things like how high you want your consumer throughput to be since each partition is given one thread per consumer. You might also want room to increase parallelism later on.
Considerations against having fewer consumers to a higher number of partitions include things like higher unavailability in the case of unclean failure since leader election will take longer.
If you’re interested in reading more about consumer strategy in relation to partitions, Jun Rao has an excellent blog post on the subject.
The broker stores the progress of each consumer group in an internal topic called __consumer_offsets. Offsets specify the location of a given event within a partition, and as such, represent progress through the topic. Offsets serve the same purpose for consumer groups that bookmarks or sticky tabs serve in textbooks: they mark how far you have read.
You can use a particular Group ID’s offset to check whether there’s been new data written to the partition. If there’s an event with a larger offset, that means there’s new data to read. If you want to know how to read the offset, here’s a command using the kafka-consumer-groups utility that will read your offsets:
Note that you need to provide a valid Group ID to --group if you’re trying out this command. The output will resemble the following:
Or, if you want to learn more about how to do this with the Confluent CLI for a topic hosted in Confluent Cloud, you can check out this tutorial on reading from a specific offset and partition.
There’s more on the kafka-consumer-groups utility in our documentation, and you can always run kafka-consumer-groups --help for a full list of the options.
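The comparison described above, between a group's committed offset and the latest offset in the partition, can be sketched in a few lines of plain Python. This is only an illustration of the arithmetic, not a client call: the function names and the sample numbers are hypothetical, and the two inputs correspond roughly to the CURRENT-OFFSET and LOG-END-OFFSET columns that kafka-consumer-groups reports.

```python
def has_new_data(committed_offset: int, log_end_offset: int) -> bool:
    """Return True when the partition holds events the group has not read yet."""
    return log_end_offset > committed_offset


def lag(committed_offset: int, log_end_offset: int) -> int:
    """Number of events between the group's position and the end of the log."""
    return max(log_end_offset - committed_offset, 0)


# Hypothetical numbers: the group has committed offset 5,
# and the partition's log currently ends at offset 8.
print(has_new_data(5, 8))
print(lag(5, 8))
```

A lag of zero means the group has caught up with everything written to that partition so far.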
The Group ID determines which consumers belong to which group. You can assign Group IDs via configuration when you create the consumer client.
For example, in Python it looks like the second line from the bottom in this configuration object:
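A minimal sketch of such a configuration object, assuming a librdkafka-based client such as confluent-kafka. The broker address and group name are placeholders chosen for illustration, not values from a real deployment.

```python
# Placeholder values throughout; adjust to your own cluster and service.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "auto.offset.reset": "earliest",
    "group.id": "orders-service",  # the Group ID: second line from the bottom
    "enable.auto.commit": True,
}

# With the confluent-kafka package installed, this dictionary would be
# passed to the client constructor, for example:
#   from confluent_kafka import Consumer
#   consumer = Consumer(consumer_config)
print(consumer_config["group.id"])
```

Every consumer created with the same group.id value joins the same group.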
Suppose a group has four consumers. If there are eight partitions, each of those four consumers will be assigned two partitions. What if there are nine partitions? That means the leftover partition will be assigned to the first consumer in the group so that one consumer reads from three partitions and the rest of the consumers read from two partitions. It’s the broker’s job to continually ensure that partitions are evenly distributed among the connected consumers.
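The arithmetic in that example can be sketched as a small function. This is a simplified model of an even split, not Kafka's actual assignor code (real assignors such as range, round-robin, and sticky differ in their details); the function name and consumer names are hypothetical.

```python
def distribute(num_partitions, consumers):
    """Split partitions as evenly as possible; earlier consumers get any leftovers."""
    base, extra = divmod(num_partitions, len(consumers))
    assignment = {}
    next_partition = 0
    for index, consumer in enumerate(consumers):
        # The first `extra` consumers each take one additional partition.
        count = base + (1 if index < extra else 0)
        assignment[consumer] = list(range(next_partition, next_partition + count))
        next_partition += count
    return assignment


group = ["consumer-1", "consumer-2", "consumer-3", "consumer-4"]
print(distribute(8, group))
print(distribute(9, group))
```

With eight partitions every consumer receives two; with nine, the first consumer receives three and the others receive two.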
This whole process is predicated on the presence of a Group ID to unify the consumers. It’s important to remember this while you’re setting up your consumers.
If you’re connecting microservices, you want to make sure that each service has its own consumer group (and hence its own Group ID). Why is that? Let’s walk through an example.
Let’s say there’s a topic “payments,” and both the “orders” microservice and the “refunds” microservice will need to read from that topic. You wouldn’t want them to share the same offsets, because if they did, the progress through the “payments” topic would be shared by “orders” and “refunds,” which would mean potential missed orders or refunds.
However, if you had a group of consumers handling “orders” by reading from partitions in the “payments” topic, then the current offset for each consumer in the group, stored in the broker, is vital to ensure continuous progress in case a consumer in the group crashes. At the same time, if consumers from another, separate group, like “refunds” are reading from the “payments” topic, they can continue their progress unaffected even if the consumers in the “orders” group are rebalancing.
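To make the "orders" and "refunds" example concrete, here is a toy model of how committed offsets are tracked per group. It assumes nothing about the broker's real storage format; the dictionary, function names, and offsets are hypothetical and exist only to show that progress is keyed by Group ID.

```python
# Toy offset store: progress is keyed by (group, topic, partition).
committed = {}


def commit(group_id, topic, partition, offset):
    """Record how far a group has read in one partition."""
    committed[(group_id, topic, partition)] = offset


def position(group_id, topic, partition):
    """Return the group's committed offset, or 0 if it has never committed."""
    return committed.get((group_id, topic, partition), 0)


commit("orders", "payments", 0, 42)
commit("refunds", "payments", 0, 7)

# Each group keeps its own bookmark in the same partition.
print(position("orders", "payments", 0))
print(position("refunds", "payments", 0))
```

If both services shared one Group ID, they would share a single key here, and each would skip whatever the other had already read.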
As the last example revealed, Group IDs also play a vital role in fault tolerance.
Each consumer sends “heartbeat requests” to the broker at a set interval. If the broker does not receive a heartbeat from a consumer in time, a rebalance is triggered.
How does a Group ID play into rebalancing? Well, in either case, the broker’s record of the associated offset determines where the consumer will begin reading after a rejoin. As long as the Group ID remains the same, it can pick up exactly where it left off, without any risk of data loss.
When a new consumer joins a group, the first step is the broker’s recognition of the new consumer. In the second step, the broker adjusts the consumer group state from RUNNING to PREPARE_REBALANCE. Once the consumer group reacts to the change in state, the final state that the broker introduces is COMPLETING_REBALANCE. (Afterwards, the state is restored to RUNNING once more.)
Now, what pieces of information does the broker use to track all this? Well, in each type of consumer membership, there’s a member.id that ensures a unique identity for each group member.
With dynamic membership, at the point in a rebalance when a new consumer sends a JoinGroupRequest with a special UNKNOWN_MEMBER_ID, the broker receives that UNKNOWN_MEMBER_ID, sees that consumer as a new member of the group, and generates a new ID for it.
During a client restart, this happens for all members of the group because when the members send their JoinGroupRequests, an UNKNOWN_MEMBER_ID is included. The member.id is not persisted, so a rebalance is triggered because the broker reads these members as new members.
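The dynamic-membership behavior described above can be modeled with a toy coordinator. This is a simplified sketch, not broker code: the class and method names are hypothetical, the generated ID format is made up, and representing UNKNOWN_MEMBER_ID as an empty string is an assumption made for the illustration.

```python
import uuid

# Assumption for this sketch: the "unknown" marker is an empty string.
UNKNOWN_MEMBER_ID = ""


class ToyCoordinator:
    """Hypothetical stand-in for the broker side of dynamic membership."""

    def __init__(self):
        self.members = set()
        self.rebalances = 0

    def join_group(self, member_id):
        if member_id == UNKNOWN_MEMBER_ID:
            # The broker sees a new member, generates an ID, and rebalances.
            member_id = f"consumer-{uuid.uuid4()}"
            self.members.add(member_id)
            self.rebalances += 1
        return member_id


coordinator = ToyCoordinator()
first_id = coordinator.join_group(UNKNOWN_MEMBER_ID)
# After a restart the client has lost its member.id, so it joins as unknown again.
second_id = coordinator.join_group(UNKNOWN_MEMBER_ID)
print(first_id != second_id, coordinator.rebalances)
```

The same process receives a different member.id after its restart, and the group rebalances a second time even though nothing about the workload changed.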
KIP-345 (which is a Kafka Improvement Proposal, introducing the concept of static membership to the Kafka community) notes:
“When the service state is heavy, a rebalance of one topic partition from instance A to B means a huge amount of data transfer.” — KIP-345
In this context, a “heavy” service state means that an app has built up a large amount of local state, perhaps a large KTable. If the partition is reassigned, then that state must be saved and transferred to a new node, and if there’s a lot of data, the process will be slow.
Now, this process can and should be avoided altogether if the partition assignment remains unchanged, or has the potential to remain unchanged, before and after the rebalance. But how to execute this task? This is where static membership comes into play.
What is static membership, and how does it solve the problem encountered by heavy-state applications with dynamic membership?
A consumer with static membership is a consumer with a configured group.instance.id, in addition to its broker-managed member.id. The group.instance.id persists the identity of the consumer over a rebalance.
So, what would happen if a client with a group of consumers with static membership needed to restart? In this case, since the broker recognizes the group.instance.id, it would not need to assign new membership, and a rebalance can be avoided. This makes heavy-state applications much more efficient, and it also makes rolling upgrades much faster. Since the consumers’ identity as part of the original groups is persisted by the group.instance.id, they no longer have to rebalance on removal and re-entry into the group. The only lag involved will be from the restarted node.
To implement static membership in your consumer group, set the group.instance.id value in your configuration. Here’s an example of what that might look like, using the node-rdkafka client:
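As a rough Python equivalent, here is a sketch of a configuration that sets both properties. It assumes a librdkafka-based client such as confluent-kafka, which uses the same property names as node-rdkafka; every value below is a placeholder.

```python
# Placeholder values; only the property names are meaningful here.
static_member_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "orders-service",  # which group this consumer joins
    "group.instance.id": "orders-service-instance-1",  # stable identity of this instance
    "session.timeout.ms": 30000,
}

# With the confluent-kafka package installed, the dictionary would be used as:
#   from confluent_kafka import Consumer
#   consumer = Consumer(static_member_config)
print(static_member_config["group.instance.id"])
```

A second instance of the same service would reuse the group.id but needs its own group.instance.id.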
group.instance.id vs. group.id
To recap, in the example above, the consumer is instantiated with both a group.id and a group.instance.id. What’s the difference? A group.id is what determines which group a consumer belongs to. You can read more about group IDs in our blog post Configuring Apache Kafka Group IDs.
By contrast, a group.instance.id should be different for each member of the consumer group, because it encapsulates the member’s identity as a consumer instance. The broker maps the group.instance.id to each member.id to ensure each consumer’s unique identity. The member.id serves as extra validation in the case of static membership.
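The mapping from group.instance.id to member.id, and its effect on rebalances, can be illustrated with another toy model. This is a simplified sketch of the idea rather than the broker's actual logic: the class, method names, and ID format are hypothetical.

```python
import uuid
from typing import Optional


class StaticAwareCoordinator:
    """Hypothetical coordinator that remembers static members across restarts."""

    def __init__(self):
        self.static_members = {}  # group.instance.id -> current member.id
        self.rebalances = 0

    def join_group(self, group_instance_id: Optional[str]) -> str:
        member_id = f"member-{uuid.uuid4()}"
        if group_instance_id is None:
            # Dynamic member: always looks new, so the group rebalances.
            self.rebalances += 1
            return member_id
        already_known = group_instance_id in self.static_members
        # Point the persistent instance ID at the freshly generated member.id.
        self.static_members[group_instance_id] = member_id
        if not already_known:
            self.rebalances += 1
        return member_id


coordinator = StaticAwareCoordinator()
coordinator.join_group("orders-service-instance-1")  # first join: rebalance
coordinator.join_group("orders-service-instance-1")  # restart: no rebalance
print(coordinator.rebalances)
```

In this model, a restarted static member is recognized by its group.instance.id and keeps its place in the group, while a member without one is treated as new on every join.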
In summary, when you set a consumer Group ID in the process of creating a consumer client, that Group ID assigns the consumer to its group, which has ramifications for work sharing, detecting new data, and data recovery. In addition, to avoid long rebalances when the service state is heavy, a piece of configuration for each group member called a `group.instance.id` is available. To learn more about this and other topics, check out these recommended resources:
Confluent Developer: Learn Apache Kafka through Confluent Developer tutorials, documentation, courses, blog posts, and examples.
Confluent Community: If you have a question about Apache Kafka or you’d like to meet other Kafka developers, head over to Confluent Community and introduce yourself on our Community Slack or Forum.
Get Started with Confluent Cloud: You can learn more about how to get started with Confluent by joining this live webinar, hosted by Tim Berglund.
Apache®, Apache Kafka®, and Kafka® are registered trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by using these marks. All other trademarks are the property of their respective owners.