Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Preparing the Consumer Fetch: Kafka Producer and Consumer Internals, Part 3

Written By

Welcome back to the third installment of our blog series where we’re diving into the beautiful black box that is Apache Kafka® to better understand how we interact with the cluster through producer and consumer clients.

Earlier in the series, we took a look at the Kafka producer to see how the client works before following a produce request as it’s processed by the cluster.

In this post, we’ll switch our attention to Kafka Consumer clients to see how consumers interact with the brokers, coordinate their partitions, and send requests to read data from your Kafka topics.

If you’re looking for more information on the rest of the process, check out the other blog posts in the series: 

  • How Producers Work: See what the producer does behind the scenes to help prepare your raw event data for the broker.

  • Handling the Producer Request: Learn how your data goes from the producing client all the way to disk on the broker.

  • Preparing the Consumer Fetch: See how consumers issue fetch requests.

  • Handling the Consumer Fetch: Dive into the inner workings of the brokers as they attempt to serve data up to a consumer.

Preparing the consumer fetch

There’s no time to waste. Let’s set up our consumer and start to fetch some data! But which data? 

As a recap, we’re reading some adventurous data that outlines the whereabouts of hobbits from a hobbit-updates topic with 6 partitions. Here’s what our events look like:

{
  "doc": "Accounting for the whereabouts and current   
          activities of hobbits.",
  "fields": [
    {
      "doc": "Name of the hobbit in question.",
      "name": "hobbit_name",
      "type": "string"
    },
    {
      "doc": "Current location of the hobbit.",
      "name": "location",
      "type": "string"
    },
    {
      "doc": "Current status of the hobbit.",
      "name": "status",
      "type": {
        "name": "Status",
        "type": "enum",
        "symbols": ["EATING", "NAPPING", "SMOKING",
                    "ADVENTURING", "THIEVING"]
      }
    }
  ],
  "name": "hobbitUpdate",
  "type": "record"
}

Consuming an event from this topic is a solved problem, which we already know. All you do is set up a consumer client using your favorite programming language, call consumer.poll(), and bam…there’s your data!

But it’s not that simple. Well, it is and it isn’t.

Similar to the Kafka producer, the Kafka consumer has many hoops to jump through to get data from the Kafka cluster. It’s a long journey, fraught with dangers and hurdles across the cluster.

But to even begin our journey and start to poll for data, we first need to set up a consumer and configure it to read from our hobbit-updates topic.

On the consumer

Partitions

Once connected to the brokers, the consumer’s first task is to determine which topics and partitions to consume from. 

If there’s just one consumer, it is a simple process––the consumer is assigned all the partitions to read from its subscribed topics. But if that’s not the case, there are a couple configurations that impact how partitions are assigned.

Configuring partitions

At the very least, you tell your consumer which topic(s) to subscribe to, but it won’t always read every partition from that topic, especially if it's a part of a consumer group. Being part of a consumer group means that a set of consumers can band together to parallelize the processing of data from different partitions in a single Kafka topic. 

We enable consumer groups by setting the configuration parameter group.id to the same string on multiple consumers. With group.id set, the consumer will connect with the consumer group coordinator, which is another broker on the cluster that coordinates all of the consumer groups.

Note that if you call Consumer.subscribe(topic) or try to make use of Kafka’s built-in offset management, you need to set group.id.

Should you use consumer groups, another important configuration to keep in mind is partition.assignment.strategy; the partition assignment strategy is how the consumer group coordinator decides how to split up the subscribe partitions across consumers in the group. There are a few assignors available to you:

  • RangeAssignor: This assignor lays out the partitions from potentially multiple topics in order from 0 to n and tries to collocate partitions with the same numbers on the same consumer. The best use case for this is when you have multiple topics that use the same key partitioning strategy and you’d like to use your consumer to join those streams.

  • RoundRobinAssignor: This assignor is fairly straightforward. It looks at all of the consumers in the consumer group and all of the subscribed topic-partitions. Then it doles out topic-partitions one by one as it rotates through the group of consumers.

  • StickyAssignor: To understand this assignor, you have to understand what happens when consumers join or leave a consumer group. Basically, the world is stopped, assigned partitions are revoked, new assignments are made and doled out, and the consumers resume. That’s fine, but depending on your use case, it might be better if unaffected consumer assignments are left as-is. The StickyAssignor will only shift a partition to a new consumer if the existing consumer was added or removed.

  • CooperativeStickyAssignor: In a similar vein, this assignor uses cooperative rebalancing as opposed to eager rebalancing. The bottom line is that cooperative rebalancing (also called incremental rebalancing) avoids the stop-the-world situation where no consumers are processing at all. It does this by incrementally revoking and assigning affected partitions so that unaffected consumers can continue to process the assignments that they have.

Monitoring partitions

Now, what if we wanted to look into partitioning and make sure our consumer group is working as it should? Don’t worry, there are metrics for that!

The following are all consumer-level metrics but only when you have a group.id enabled:

  • assigned-partitions: First, we want to monitor the number of assigned partitions per client.

  • rebalance-latency-avg: During the rebalance process, it’s a good idea to keep tabs on how long the entire group’s rebalance takes.

  • rebalance-total: And finally, it’s worth knowing how many times the rebalances have occurred.

You definitely want the last two metrics to be lower, of course, to show that your group is stable. In addition to these, there are a ton more metrics for consumer groups; we encourage you to check them out.

The partition assignment

Let’s make this a little more real now. Remember that our hobbit-updates has 6 partitions. And we can imagine that maybe we have a handful of consumers working together to process and make sense of hobbit whereabouts over time.

For visual learners, let’s get an idea of how this partition assignment might look for our use case.

If we’re using, say, the round-robin or sticky partitioner, the partitions of the underlying hobbit-updates topic might be distributed to the running consumers like this. For our sake moving forward, we can assume we’re Consumer 0, and we would be responsible for reading from partitions 0 and 3.

Offsets

Even though we know the topic-partitions that our particular consumer will read from, we’re not ready to read yet…we still need to know which offsets to start from in each topic-partition.

Now, if you’re unfamiliar, offsets are kind of like a bookmark for our Kafka topics. They uniquely define a message’s place in a topic-partition so that a consumer knows where they’ve left off, just in case. So, how does this consumer who just received a partition assignment know what their offsets are?

The fetch before the fetch

Easy! The consumer issues a fetch request to get the relevant offsets from the internal __consumer_offsets topic where they’re stored on the Kafka cluster.

But what if there aren’t any offsets in the offsets topic? Or if the offsets are invalid? That’s where the configuration parameter auto.offset.reset comes into play. Only when there’s an invalid offset––invalid can also mean that the offset doesn’t exist––auto.offset.reset tells the consumer where in the Kafka topic to start reading from. The available options are from the beginning with `earliest` or the end with `latest`. (Or None if you just want the consumer to crash and burn or if you plan to catch the exception manually and seek to a specific offset yourself.)

In terms of metrics related to offsets, while your consumer is up and running, you may want to monitor consumer lag through consumer-lag-offsets. In this case, lag means the difference between the last stored offset on the broker and the last committed offset from the consumer group per partition.

Sending a fetch request

Finally, we have everything we need to fetch the data from the hobbit-updates topic with a request to the broker. How does this happen?

As noted above, the consumer maintains socket connections with some number of Kafka brokers based on where the topic-partitions they’re consuming from reside as well as the broker that’s serving as the consumer group coordinator, and they send fetch requests using binary protocol over TCP.

So it’s request-response. Consumers send a request to fetch data. Brokers send a response back to the consumers with the result of that request––hopefully with some data for it to process. It’s this request process that really kicks off the first true phase of our journey. 

Configuring consumer requests

Before you get ahead of yourself, there are some configurations to keep in mind here, as well:

  • fetch.min.bytes and fetch.max.bytes and max.partition.fetch.bytes: These control at a high level how much data is going to be returned by the fetch request, but know that the max isn’t a hard maximum––which we’ll clarify later. max.partition.fetch.bytes comes into play as there could be multiple partitions-worth of data returned as part of this request. Of these three limits, the minimum fetch threshold is actually the most important, as this can hold up how long the broker will take to fulfill a request. 

  • fetch.max.wait.ms: This is how long the server will block and wait to reach fetch.min.bytes before moving on. You want this wait limit to be less than the overall request timeout which is outlined by the request.timeout.ms.

  • request.timeout.ms: The upper time limit for the request.

Monitoring consumer requests

Before we move on to the request lifecycle, it’s worth noting that there are some metrics related to how the request is handled by the consumer.

  • request-rate: The number of requests being made per second by the consumer.

  • fetch-latency-avg: Once the fetch requests are issued to the broker, a timer starts. It doesn’t stop until the consumer receives a response. This metric describes the time it takes for that to happen.

  • fetch-size-avg: This describes the average amount of data being returned per fetch.

The request

With our partition assignment and starting offsets known, every fetch request from Consumer 0 would look a bit like this:

Each request has some metadata associated with it so that the broker knows the limits that this request is operating within as far as the fetch thresholds and the overall timeout that the consumer has. 

Note that each request is directed toward a single node or broker in the cluster. So the assumption here is that partitions 0 and 3 from the hobbit-updates topic live on the same broker (along with the data from partition 1 of the elf-updates topic). If these topic-partitions lived on different brokers, the consumer would issue multiple requests per iteration of its poll loop.

And off we go!

Now, we’re ready to embark on the journey you’ve been waiting for. Let’s send this request off into the ether and let the brokers handle it.

Check out the final installment of the series to see what adventures await our consumer fetch request!

  • Danica began her career as a software engineer in data visualization and warehousing with a business intelligence team where she served as a point-person for standards and best practices in data visualization across her company. In 2018, Danica moved to San Francisco and pivoted to backend engineering with a derivatives data team which was responsible for building and maintaining the infrastructure that processes millions of financial market data per second in near real-time. Her first project on this team involved Kafka Streams – she never looked back. Danica now works as a Developer Advocate with Confluent where she helps others get the most out of their event-driven pipelines.

    Outside of work, Danica is passionate about sustainability, increasing diversity in the technical community, and keeping her many houseplants alive. She can be found on Twitter, tweeting about tech, plants, and baking @TheDanicaFine.

Did you like this blog post? Share it now