Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Handling the Producer Request: Kafka Producer and Consumer Internals, Part 2

Written By

Welcome to the second installment of our blog series to understand the inner workings of the beautiful black box that is Apache Kafka®. 

We’re diving headfirst into Kafka to see how we actually interact with the cluster through producers and consumers. Along the way, we explore the configurations that affect each step of this epic journey and the metrics that we can use to more effectively monitor the process. 

In the last blog, we explored what the Kafka producer client does behind the scenes each time we call producer.send() (or similar, depending on your language of choice). In this post, we follow our brave hero, a well-formed produce request, that’s on its way to the broker to be processed and have its data stored on the cluster.

For more information on the rest of the process, check out the other blog posts in the series: 

  • How Producers Work: See what the producer does behind the scenes to help prepare your raw event data for the broker.

  • Handling the Producer Request: Learn how your data goes from the producing client all the way to disk on the broker.

  • Preparing the Consumer Fetch: See how consumers issue fetch requests.

  • Handling the Consumer Fetch: Dive into the inner workings of the brokers as they attempt to serve data up to a consumer.

Handling the producer request

On the broker

Socket receive buffer

The first stage for incoming requests on the broker is the server socket receive buffer. It’s a sort of landing zone for the incoming data; here the request awaits being picked up by the network threads for processing.

There are some low-level configuration parameters available to you to tweak the performance of this buffer, like the size of the overall buffer (socket.receive.buffer.bytes) and the maximum size of an incoming request (socket.request.max.bytes). Generally, you won’t need to stray away from the defaults, and you should consider having a conversation with your Kafka cluster administrator if you think any of these should be changed.

Network threads

After a short stay in the socket receive buffer, the request is picked up by an available network thread from the pool. An important thing to note is that the network thread that picks up the incoming request is the thread that will handle the request for the entirety of its lifecycle.

The network thread’s first job is to read the request from the socket buffer, form it into a produce request object (as opposed to a consumer fetch request), and add it to the request queue.

Again, there’s an advanced configuration available at this point; generally, you won’t have to change it, but it’s good to know. num.network.threads defines how many network threads will be working at this point. The default is 3, and the upper bound is usually just the number of cores available on the server.

A good way to monitor these threads is to use NetworkProcessorAvgIdlePercent. The values range from 0 (meaning the threads are fully utilized) to 1 (meaning the threads aren’t busy). You want this to be closer to 1 to show that the threads aren’t working too hard.

Request queue

Another short stint for our hero, er, our request, is the request queue where it will await further processing by the server I/O threads.

At this point, you have control of the number of requests (queued.max.requests) and the maximum size of the requests (queued.max.request.bytes). If you want to change it, a general rule of thumb is to cap queued.max.requests at the number of active clients that you have; this helps to ensure strict ordering of your messages.

You can monitor the size of the request queue as well as the amount of time that a request resides in the queue using RequestQueueSize and RequestQueueTimeMs, respectively. These metrics together are a proxy for how your I/O threads are performing as well as if your broker is overwhelmed. And it’s good to keep tabs on those two metrics because, if the request queue is full for any reason, it will block and prevent the network threads from adding any new requests.

I/O (request handler) threads

Next, the request is picked up off of the request queue by an available I/O thread, also known as a request handler thread.

When the thread accesses the request, its first job is to validate the data contained within it using a cyclic redundancy check. This is just an extra mechanism to ensure that the integrity of the data hasn’t been compromised accidentally while those bytes have been shipped around.

Now, since the I/O threads have quite a bit of work to do in handling the data and writing it to disk, you may want to configure how many there are. The default is 8, but you can change the number using num.io.threads.

If you want to monitor how the threads are doing, you can use RequestHandlerAvgIdlePercent. Again, you ideally want this to be closer to 1 to show that your threads are idle more often than not. A value closer to 0 means that your threads are rarely idle and are working hard.

Page cache and disk

Finally, the moment you’ve been waiting for: Our hero is deep into his epic journey and faces the ultimate task… writing the data to disk. And actually, the I/O threads take care of this part of the journey.

As a quick aside, let’s think about the structure of storage in Kafka. It’s a log––specifically, a commit log. On disk, the commit log consists of a collection of components called segments. Each segment, in turn, is made up of a couple of files:

  • The first is a .log file which contains the actual event data, the raw bytes, that we want to store. 

  • The second is an .index file, which is critical. It stores the index structure that maps from a record offset to the position of that record in the corresponding .log file. 

  • We also have .timeindex for accessing records using time-based offsets; this is used in disaster recovery scenarios for consumers. 

  • And finally, a .snapshot file stores the producer sequence numbers, which comes in handy for idempotence.

Now, that’s quite a few files for the I/O threads to write to. And we all know how expensive writes can be. So, when the I/O threads write the events to the .log and .index files, for efficiency, the updates happen in page cache. Later on, they can be flushed to disk. 

This might be scary to some folks who would prefer to see Kafka use fsyncs, but, as my colleague Jack Vanlightly put it in his blog on the subject, “Kafka’s data replication protocol was designed to be safe without fsyncs.” On top of that, you do have control of how often the files are flushed to disk as well as some other useful configurations:

  • log.flush.interval.ms: Defines a time-based limit for how often the .log file is flushed to disk.

  • log.flush.interval.messages: This is how many messages can be written to the .log file before the file is flushed to disk.

  • log.segment.bytes: This is the maximum size of the .log file before the files roll over to the next segment. 

Additionally, there are configurations that can be overridden at the topic level. A common one to set at the topic level is cleanup.policy which can take values `delete` and `compact`. The former means that old log entries will be deleted after a specified time or when a file size threshold has been breached, while the latter means that at least one value per key will be retained with old entries for the key being deleted.

If you’re curious, the best way to monitor how your logs are flushing to disk is to use  LogFlushRateAndTimeMs. Another good metric to monitor is LocalTimeMs, which is the amount of time that passes between when the I/O threads pick up the request and the data is written to page cache.

It should be obvious by now that the I/O threads are the true MVPs in this writing process. Therefore, it’s in our best interest that the I/O threads do their job of writing the data as much as possible so that they can move on and write more data. But is page cache enough to ensure that our data is safe and sound?

Generally speaking, yes. But you still might want to ensure your data has been replicated—acks, anyone? And that’s why we have…

Purgatory

No, no, you read that correctly.

We saw in the previous stage that data isn’t synchronously written to disk from page cache, so it’s really important that Kafka use replication to ensure that our data is durably stored.

So, where does purgatory come in? Purgatory is a map-like data structure based on a hierarchical timing wheel where the requests are held until the replication process has completed. Remember acks from Part 1 in this blog series? By default, Kafka won’t send a response back to the producer until the data has been replicated to all the brokers. 

Your cluster administrator can set a cluster-wide replication factor using default.replication.factor, but you can override that for an individual topic using replication.factor

You may be wondering what exactly happens while the request is hanging out in purgatory. The answer: Not much. The broker that the request is hanging out in is waiting for the other brokers who own the follower replicas to request the data and bring their copies up to date. You see, every broker knows which of its topic-partitions are replicas and which brokers in the cluster own the golden copies for those replicas. They can then issue a fetch request from the brokers containing the golden copies and update their replicas.

By default, the follower brokers will fetch this information every 500 ms, but this is configurable with replica.fetch.wait.max.ms. You also have the ability to configure how many threads the follower brokers have devoted to fetching data using num.replica.fetchers.

For everyone’s sanity and the health of your cluster, it’s a good idea to monitor how long this replication process takes––i.e., how long the request hangs out in purgatory––using RemoteTimeMs.

Response queue

Once the initial request’s acks quote is fulfilled, the broker will take the request out of purgatory and begin building up a response to send back to the producer client. The response’s first stop is the response queue.

We can’t configure the response queue like we can the incoming request queue, but we can monitor it with similar metrics, like the number of responses in the queue (ResponseQueueSize) and the time spent in the queue (ResponseQueueTimeMS).

Network thread handoff

With the response ready to go, it’s back to the network threads!

Well, technically, the network thread that picked up the initial request has been handling everything up until now. At this point, its final task is to pick up the generated response object and send it back to the producer client.

Socket send buffer

And, finally, the outgoing response is placed on the socket send buffer to await receipt by the producer. In fact, the broker will wait until the entire response is received by the producer before moving on.

With socket.send.buffer.bytes, your administrator can control the overall size of this buffer. And it’s always a good idea to monitor how long this send process takes; you can keep tabs on that with ResponseSendTimeMs.

Back to the producer

And before we know it, we’re back on the producer client with a response––hopefully a good one!

First thing’s first, the producer will close out any timers and collect metrics on the request; for example, with TotalTimeMs, we can learn how long the entire request took to process. 

Then, if the data was stored successfully, the producer client will do a final bit of housekeeping and release the batch where it was stored temporarily in buffer memory. If the data wasn’t successfully stored, the producer will follow its configured retry protocol and potentially send the data again.

Phew!

And there you have it: the producer request lifecycle. <sarcasm> As simple as can be! </sarcasm>

We know it’s a long process with quite a lot of details to take in, but that’s what happens when you peel back the layers of a well-formed black box like Kafka. 

The reality is that Apache Kafka is a complex technology with a lot of nuances. This journey into its depths isn’t meant to overwhelm. Rather, it’s meant to help you understand the process and better equip you for the inevitable debugging session the next time something doesn’t quite go as planned.

For even more Kafka insights, check out Confluent Developer. And stick around for the next installment of the series where we follow a fetch request from the Kafka consumer to disk and back again.

  • Danica began her career as a software engineer in data visualization and warehousing with a business intelligence team where she served as a point-person for standards and best practices in data visualization across her company. In 2018, Danica moved to San Francisco and pivoted to backend engineering with a derivatives data team which was responsible for building and maintaining the infrastructure that processes millions of financial market data per second in near real-time. Her first project on this team involved Kafka Streams – she never looked back. Danica now works as a Developer Advocate with Confluent where she helps others get the most out of their event-driven pipelines.

    Outside of work, Danica is passionate about sustainability, increasing diversity in the technical community, and keeping her many houseplants alive. She can be found on Twitter, tweeting about tech, plants, and baking @TheDanicaFine.

Did you like this blog post? Share it now