
How Producers Work: Kafka Producer and Consumer Internals, Part 1

I shouldn’t have to convince anyone that Apache Kafka® is an incredibly useful and powerful technology. As a distributed event streaming platform, it’s adept at storing your event data and serving it up for downstream consuming applications to make sense of that information––in real time or as close to real time as your use case permits. The real beauty of Kafka as a technology is that it can do it with very little effort on your part. In effect, it’s a black box.

Think about it. We write Kafka producers to ship data off to the black box where we expect it to magically be stored where it should. And then we write any number of Kafka consumers to poll data from the black box for as long as we need them to. Usually, that’s exactly what happens.

But what about the situations where the convenient little black box doesn’t quite do what you want it to do? What do you do when your Kafka producer doesn’t behave as you expect it to? What if your Kafka consumer isn’t getting any data? Where do you start to try to figure out why?

I’ll be the first to admit that Kafka can be tricky to debug; when something goes wrong, most of the battle is knowing where to look in your logs and what (potentially unheard of) configuration parameter to tweak. That’s not fair.

So in this blog series, we’re going to dive headfirst into the inner workings of Kafka and see how we actually interact with the black box through producers and consumers. Specifically, we’ll see how client requests are handled by the brokers. Along the way, you’ll learn the configurations that affect each step of the journey and the metrics that you can use to monitor those steps. By the end of the series, you’ll be well-equipped to debug your applications (or your cluster) the next time the pesky black box doesn’t quite do what you expect it to.

This four-part series features:

  • How Producers Work: See what the producer does behind the scenes to help prepare your raw event data for the broker.

  • Handling the Producer Request: Learn how your data goes from the producing client all the way to disk on the broker.

  • Preparing the Consumer Fetch: See how consumers issue fetch requests.

  • Handling the Consumer Fetch: Dive into the inner workings of the brokers as they attempt to serve data up to a consumer.

How producers work

The journey from your Kafka producer all the way to the brokers is long and fraught with danger. Okay, not really. But there are a ton of steps that every piece of data has to go through in order to make it to disk.

To really understand how data moves from the producer clients all the way to the brokers, it’s best to follow an example. In this blog, we’ll produce some data and see what the producer does to help us prepare for this journey!

Setting the stage

We’re responsible developers, so we’ll first start with a schema. Any schema will do, but since we’re about to embark on a journey of our own, why not choose an equally adventurous schema? Perhaps one related to hobbits? They do adventure, you know.

Here’s a simple schema to help keep track of the whereabouts of hobbits:

{
  "doc": "Accounting for the whereabouts and current activities of hobbits.",
  "fields": [
    {
      "doc": "Name of the hobbit in question.",
      "name": "hobbit_name",
      "type": "string"
    },
    {
      "doc": "Current location of the hobbit.",
      "name": "location",
      "type": "string"
    },
    {
      "doc": "Current status of the hobbit.",
      "name": "status",
      "type": {
        "name": "Status",
        "type": "enum",
        "symbols": ["EATING", "NAPPING", "SMOKING",
                    "ADVENTURING", "THIEVING"]
      }
    }
  ],
  "name": "hobbitUpdate",
  "type": "record"
}
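
To make the schema a little more concrete, here's a rough Python sketch that checks whether a dictionary has the shape the schema describes. The `conforms` helper is my own invention for illustration and is only a shallow check, not a real Avro validator; the schema copy below drops the doc strings for brevity.

```python
import json

# Trimmed copy of the hobbitUpdate schema (doc strings omitted for brevity).
HOBBIT_SCHEMA = json.loads("""
{
  "name": "hobbitUpdate",
  "type": "record",
  "fields": [
    {"name": "hobbit_name", "type": "string"},
    {"name": "location", "type": "string"},
    {"name": "status", "type": {
      "name": "Status", "type": "enum",
      "symbols": ["EATING", "NAPPING", "SMOKING", "ADVENTURING", "THIEVING"]}}
  ]
}
""")


def conforms(record: dict) -> bool:
    """Hypothetical helper: shallow shape check, not a full Avro validator."""
    fields = {f["name"]: f["type"] for f in HOBBIT_SCHEMA["fields"]}
    if set(record) != set(fields):
        return False
    for name, field_type in fields.items():
        value = record[name]
        if field_type == "string":
            if not isinstance(value, str):
                return False
        elif value not in field_type["symbols"]:
            # The only non-string field here is the Status enum.
            return False
    return True


bilbo = {"hobbit_name": "Bilbo", "location": "Lonely Mountain", "status": "THIEVING"}
```

Here `conforms(bilbo)` passes, while a record with a status outside the enum (or a missing field) does not.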

And the next obvious step for someone wanting to produce data to Kafka is to create a topic in which to store that data. Let’s say we have a topic called hobbit-updates—original, I know. We can head over to Confluent Cloud and create that topic with the default 6 partitions and the delete cleanup policy, like so:

Don’t worry if you don’t quite understand all of these configurations just yet; they’re covered in more detail in the next article in this series.

With our schema and topic ready to go, we can finally start to produce some data.

Send it!

Fortunately, setting up a Kafka producer and writing events to Kafka is a solved problem. On Confluent Developer, there’s a Getting Started section with guides that cover many commonly used languages. Choose your favorite language, write a producer, send some events, and bam, suddenly you have data in your Kafka cluster. That’s the beauty of the black box, remember?

But that’s not really what this blog is about. This blog is about diving into the black box. It’s about understanding what happens in between the call to producer.send() (or some variation depending on which client you’re working with) and your data finally ending up on the broker.

So let’s rewind to the point when we call producer.send() and step through exactly what happens to that event.

On the producer client

Because Kafka is treated like a black box, it’s easy to ignore the million little intricate things that happen behind the scenes to ensure that your data goes where you want it to go. In the case of sending data to the brokers, the producer handles so much without you even knowing.

Serializing

We began with an event that we handed off to the producer. The first step in the process is for the producer to translate the event into bytes. More often than not, the event that you gave to the producer was formatted in a way that you preferred; for example, a message object, a JSON string, or something nice, programmatic or human-readable. But Kafka brokers don’t care about that nicely formatted object. Brokers speak in bytes. So the producer needs to serialize the data. 

Configuring serialization

To set up your producers to serialize data, you need to give them the right serializers. You’ve probably seen some of these configurations before:

  • key.serializer: This configuration allows you to point to a class that describes how to serialize the key of the message that you’re producing, but, typically, the key is a String as opposed to some more complex object.

  • value.serializer: This points to a class that describes how to serialize the value of the object you’re producing.

  • schema.registry.url: If your serializers are using Schema Registry, you’ll need to include a means for connecting to your Schema Registry instance.

  • schema.registry.basic.auth.user.info: You will also need information on how to authenticate with that Schema Registry.

Since we have an Avro schema for the hobbit-updates objects we’re writing, we need to implement a serializer. In Python––my language of choice––this means implementing functions that describe how to connect to Schema Registry, return the hobbit-updates schema string, and convert a hobbit update object to a dictionary.

By the end of this configuration process, our producer should be able to take our message object and convert it into a chunk of raw bytes to store on the broker.
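
Here's a minimal sketch of the object-to-bytes step. To keep it runnable without a Schema Registry, it uses JSON as a stand-in for Avro, so treat it as an illustration of the shape of the work rather than the real thing. The `HobbitUpdate` class and the function names are hypothetical. In confluent-kafka-python, the object-to-dictionary function would instead be handed to an AvroSerializer along with a Schema Registry client and the schema string.

```python
import json
from dataclasses import dataclass


@dataclass
class HobbitUpdate:
    """Hypothetical message object matching the hobbitUpdate schema."""
    hobbit_name: str
    location: str
    status: str


def hobbit_update_to_dict(update, ctx=None):
    """Object-to-dictionary conversion.

    `ctx` stands in for the serialization context that the client library
    passes to this kind of callback; it is unused here.
    """
    return {
        "hobbit_name": update.hobbit_name,
        "location": update.location,
        "status": update.status,
    }


def serialize_key(key: str) -> bytes:
    # Keys are typically plain strings, so UTF-8 encoding is enough.
    return key.encode("utf-8")


def serialize_value(update: HobbitUpdate) -> bytes:
    # Assumption: JSON stands in for Avro so the sketch has no dependencies.
    return json.dumps(hobbit_update_to_dict(update)).encode("utf-8")


key_bytes = serialize_key("Frodo")
value_bytes = serialize_value(HobbitUpdate("Frodo", "Rivendell", "NAPPING"))
```

Whatever the format, the end result is the same: a key and a value that are nothing but bytes.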

Partitioning

With our raw bytes in hand, the producer has to decide where to send those bytes. To do so, it needs to determine which partition the data belongs to.

Configuring partitioning

As part of building up your Kafka message, you may have noticed that you have the ability to specify which partition to send the data to (for example, see Producer.produce() in the confluent-kafka-python package). Should you choose to manually determine the partition yourself, the producer will use that partition. 

Otherwise, the producer will compute it using its configured partitioning strategy which is controlled by a number of configuration parameters:

  • partitioner.class: By default (when you specify None), if a key exists for the message being produced, the producer computes hash(key) modulo num_partitions. (But not all clients use the same hash algorithm by default!) If there’s no key, it uses the Sticky Partitioning Strategy, where it sends a chunk of messages to a random partition, then another random partition, and so on. The goal of the Sticky Partitioning Strategy is to evenly distribute the data across the partitions. Note that you’re also free to implement your own custom partitioning strategy; this is useful if you’re aware of any commonly used keys and want to avoid hot partitions.

  • partitioner.ignore.keys: This configuration is exactly what it sounds like. If true, keys are ignored for the purposes of partitioning; if false, they’re used. So if you really want your data to be uniformly distributed, but you still want to make use of keys, you should not set the partitioner class and set partitioner.ignore.keys=true.

  • partitioner.adaptive.partitioning.enable and partitioner.availability.timeout.ms: If your goal is to optimize sending data to brokers using the Sticky Partitioning Strategy, these two configurations will really help you out. When partitioner.adaptive.partitioning.enable=true, the producer will look at how fast the current set of brokers are responding to requests to store data and adapt to send more data to the faster brokers. In conjunction, we have partitioner.availability.timeout.ms, which says that if a request to a partition takes longer than this timeout, then that partition will be ignored, and data will be sent elsewhere.

At the end of this process, the producer knows which partition a message is destined for. Using some metadata that it receives from the cluster, it can easily figure out which broker owns that partition, and it can prepare to send the message to that broker.
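
The decision order described above can be sketched in a few lines of Python. This is a simplification under stated assumptions: `choose_partition` is a hypothetical function, CRC32 is only a stand-in hash (the Java client, for example, uses murmur2), and the sticky behavior is reduced to "reuse whatever partition the caller is currently stuck to."

```python
import random
import zlib


def choose_partition(key, num_partitions, explicit_partition=None, sticky_partition=None):
    """Toy partitioner: explicit partition, then key hash, then sticky/random."""
    # 1. If the caller picked a partition, use it as-is.
    if explicit_partition is not None:
        return explicit_partition
    # 2. If there is a key, hash it and take the result modulo the partition count.
    if key is not None:
        return zlib.crc32(key) % num_partitions  # stand-in hash, see lead-in
    # 3. No key: keep using the current sticky partition if there is one...
    if sticky_partition is not None:
        return sticky_partition
    # ...otherwise pick a random partition to stick to next.
    return random.randrange(num_partitions)


frodo_partition = choose_partition(b"Frodo", 6)
```

The property that matters for keyed data is determinism: as long as the partition count doesn't change, the same key lands on the same partition every time.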

Batching

By this point, the producer has a chunk of raw bytes, a partition, and a dream––to store that data on its rightful broker. But wait! What if we took the chunk of bytes and grouped it with other chunks of bytes destined for the same broker? Then we could be a little more efficient in our round-trip requests to the cluster, right? Sounds good to me. Let’s batch that data!

If the thought of batching data leaves you feeling icky, know that you’re not alone! Batching is a difficult task to get right with Kafka. There’s a lot of good that can come from batching properly, but that’s just it… you have to do it properly.

Configuring batching

There are a number of configurations that control how batching is done, but these three are the main ones:

  • batch.size: The most obvious thing you want to control is how large your batches are; the default is about 16 kilobytes. A single batch will only contain records destined for the same partition. Some other considerations:

    • If your records are larger than the batch size, then they won’t be batched. So you should know what your data looks like.

    • Along those lines, if this value is set to 0, then batching will be disabled.

    • If your batch size is small, then you’re going to have smaller batches sent to the broker more frequently. This means throughput may go down since you’re adding more round-trip time to make requests with fewer events. On the other hand, if batch size is larger, you can potentially increase throughput, but you also run the risk of using memory wastefully on the producer side.

    • batch.size is just an upper bound. We also have a time component that affects when we cut off adding records to a batch, and that’s handled by linger.ms.

  • linger.ms: This is how long a producer will potentially wait to fill a batch up to the batch.size limit. More to keep in mind here:

    • linger.ms is set to 0 by default which means that we won’t wait to accumulate records before sending them off, so batching is effectively disabled. In other words, changing batch.size is not enough to turn on batching.

    • If you want to batch your data for better throughput, know that you’re doing it at the expense of added latency. Whatever you choose for linger.ms will potentially add that much latency to your requests. That said, increasing linger.ms can also improve latency by reducing pressure on the brokers.

  • buffer.memory: This is an often overlooked, but still very important, batching configuration. While the producer is waiting and batching records, the data has to be stored somewhere. And buffer.memory tells the producer how much space to set aside to store the data. By default it’s roughly 32 megabytes. Obviously, buffer.memory should be larger than batch.size or you’re going to run into issues. 

Now you have the power to do batching and do it well by playing around with these values. At the same time, there’s something to be said for the defaults. Quite a bit of testing went into selecting the default values for each of these configuration parameters. If you’re going to change them for yourself, be prepared to conduct extensive testing––and at production scale––in order to convince yourself that you’ve assessed all of the trade-offs involved in configuring batching.
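
To see how batch.size and linger.ms interact, here's a toy model of a single partition's batch. It is not how the producer is actually implemented; `BatchAccumulator` and its methods are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```python
class BatchAccumulator:
    """Toy model of one partition's open batch; not the real producer internals."""

    def __init__(self, batch_size=16384, linger_ms=0):
        self.batch_size = batch_size  # upper bound on batch bytes
        self.linger_ms = linger_ms    # how long to wait for a batch to fill
        self.records = []
        self.bytes_used = 0
        self.created_ms = None

    def append(self, record: bytes, now_ms: int):
        """Add a record; return the previous batch if it had to be closed first."""
        closed = None
        if self.records and self.bytes_used + len(record) > self.batch_size:
            closed = self.drain()
        if not self.records:
            self.created_ms = now_ms  # the linger clock starts with the first record
        self.records.append(record)
        self.bytes_used += len(record)
        return closed

    def ready(self, now_ms: int) -> bool:
        """A batch can be sent once it is full or has lingered long enough."""
        if not self.records:
            return False
        full = self.bytes_used >= self.batch_size
        lingered = now_ms - self.created_ms >= self.linger_ms
        return full or lingered

    def drain(self):
        """Hand back the current batch and start a new, empty one."""
        batch = self.records
        self.records = []
        self.bytes_used = 0
        self.created_ms = None
        return batch
```

With linger_ms=0, a batch is ready the moment it holds a single record, which is why raising batch.size alone changes very little. With a non-zero linger_ms, the batch waits until it either fills up or the timer runs out.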

Monitoring batching

If you decide to enable batching and want to ensure that everything is going well, you have to keep tabs on some metrics. Here are the most important ones to monitor:

  • batch-size-avg: This metric is how large your batches actually are. If things are going well, this will be pretty close to batch.size. If batch-size-avg is consistently lower than the set batch size, then linger.ms may not be high enough. At the same time, if linger.ms is high and batches are still small, then it could be that records aren’t being produced quickly enough and you’re adding latency unnecessarily.

  • records-per-request-avg: This is the average number of records per request, across all of the batches in that request. If you want to batch, it’s a good sanity check.

  • record-size-avg: This is the average size of records within the batches; it will help you for tuning purposes. For example, if this value is close to or higher than the batch.size, then your producers aren’t even batching.

  • buffer-available-bytes: This will help you to keep tabs on the amount of memory you still have available on the producer. 

  • record-queue-time-avg: I haven’t said anything about a queue yet, so this might feel like it came out of nowhere. But while we’re building up a batch, the records are effectively hanging out in a queue. So this metric effectively shows how long we’re lingering to fill the batches before sending the records.
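
The rules of thumb above are easy to turn into a quick check. Here's a rough sketch; `diagnose_batching` is a hypothetical helper, and the 50% threshold for "consistently lower" is an arbitrary assumption of mine, so pick whatever makes sense for your workload.

```python
def diagnose_batching(batch_size, batch_size_avg, record_size_avg, low_fill_ratio=0.5):
    """Compare batching metrics against batch.size and return plain-text findings.

    low_fill_ratio is an assumed threshold, not a Kafka setting.
    """
    findings = []
    if record_size_avg >= batch_size:
        # Records this large can't share a batch with anything else.
        findings.append("record-size-avg >= batch.size: records are not being batched")
    elif batch_size_avg < low_fill_ratio * batch_size:
        # Batches are going out mostly empty.
        findings.append(
            "batch-size-avg is well below batch.size: linger.ms may be too low, "
            "or records are not being produced quickly enough"
        )
    return findings


healthy = diagnose_batching(batch_size=16384, batch_size_avg=15000, record_size_avg=500)
underfilled = diagnose_batching(batch_size=16384, batch_size_avg=2000, record_size_avg=500)
```

An empty list means nothing stood out; anything else is a hint about which configuration to look at first.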

Compressing

Alright, we now have:

✅ Raw bytes

✅ Partition

✅ More raw bytes

That might be too many raw bytes. The next optional step for the producer is to compress the raw bytes that it wants to send along to the broker. 

By default, compression is disabled, but there are a handful of compression options available to you out of the box. Choose your compression method of choice and enable compression with the compression.type configuration parameter. I recommend z-standard as a starting point, but I also encourage you to check out the docs for more information if that’s something you want to enable.

For more advanced tuning, with the addition of KIP-390 in Apache Kafka version 3.8, you have the ability to choose the compression level for many of these methods with compression.[type].level.
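
If you're curious why compressing a whole batch pays off, here's a small illustration using only the Python standard library. It uses gzip as a stand-in because it ships with Python; the sample records are made up, and the actual savings depend entirely on your data and the codec you choose.

```python
import gzip
import json

# Producer-side setting you would use to turn compression on with z-standard.
compression_config = {"compression.type": "zstd"}

# A batch of 100 made-up, similar-looking serialized records.
records = [
    json.dumps(
        {"hobbit_name": f"hobbit-{i}", "location": "The Shire", "status": "EATING"}
    ).encode("utf-8")
    for i in range(100)
]
batch = b"".join(records)

# Compress the whole batch at once (gzip here purely as a stand-in codec).
compressed = gzip.compress(batch)
ratio = len(compressed) / len(batch)
```

Records in the same batch tend to share field names and values, so compressing the batch as a unit gives the codec a lot of repetition to work with. That's one reason batching and compression work so well together.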

Sending a request

We have what we need to send the data to the broker. So how does the producer do that?

Every producer maintains socket connections with some number of Kafka brokers. They then send requests using a binary protocol over TCP.

So it’s a request-response model: Producers send a request to the brokers to store data; and brokers send a response back to the producers with the (hopefully positive) result of that request. It’s this request process that really kicks off the first true phase of our epic journey.

Configuring producer requests

Before you get ahead of yourself, there are some configurations to keep in mind here, as well:

  • max.request.size: The default maximum request size is about 1 megabyte. This directly limits how many batches we can send in a given request. The brokers also have a limit on the maximum size of a request they can receive after compression.

  • acks: You’ve most certainly seen acks, or “acknowledgements,” before. For high availability and fault tolerance in Kafka, you can optionally configure your topic-partitions to have replicas––additional copies stored across the cluster. Data is initially produced to a lead broker and then copied over to other brokers containing the replicas. So, in a very simplistic way, acks effectively answers the question, “How many in-sync replicas should we successfully write the data to before sending a response back to the producer?” The default is all of them, but you can throw caution to the wind and specify acks=0 or acks=1. (The minimum number of in-sync replicas that must be available for an acks=all write to succeed is, in turn, set with min.insync.replicas; the default is 1, so you may want to increase it for better durability.)

  • max.in.flight.requests.per.connection: Producers maintain connections to as many brokers as needed depending on where the topic-partitions they’re writing to reside. For each of those connections, there’s no sense in inundating the request queue on that broker, so it’s smart to set a limit. The default is 5. (Why 5? Because it happens to work for idempotence. ⤵️)

  • enable.idempotence and transactional.id: If you care about ordering your data or not losing data or not duplicating data, then you might care about enabling idempotence. When enable.idempotence=true (the default functionality), we enforce that acks=all, that producer retries are enabled, and that max.in.flight.requests.per.connection is at most 5. By doing so, the producer can ensure that idempotence is maintained for the lifetime of a producer session. If you need idempotence across producer sessions, then you need to use transactions. To start using transactions, you first need to set a transactional.id, and then you need to start and commit transactions within your client code.

  • request.timeout.ms: Finally, once a request is sent to the broker, request.timeout.ms comes into play. It’s the maximum amount of time that the producer will wait for a response before optionally retrying to send the data or throwing an exception; it’s set to 30 seconds by default. Retries themselves can be configured and customized with delivery.timeout.ms (make sure this is at least as large as request.timeout.ms plus linger.ms!), retries, and retry.backoff.ms.
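
Several of these settings constrain one another, so it can help to write the relationships down. The sketch below uses the property names from the list above with illustrative values. `check_config` is a hypothetical helper that only encodes the rules mentioned here (plus the fact that idempotence works with 5 or fewer in-flight requests), and some client libraries spell a few of these properties differently.

```python
producer_config = {
    "acks": "all",
    "enable.idempotence": True,
    "max.in.flight.requests.per.connection": 5,
    "retries": 2147483647,
    "retry.backoff.ms": 100,
    "request.timeout.ms": 30000,
    "linger.ms": 0,
    "delivery.timeout.ms": 120000,
    "max.request.size": 1048576,
}


def check_config(cfg):
    """Hypothetical sanity check for the relationships described above."""
    problems = []
    # The overall delivery budget has to cover lingering plus one request attempt.
    if cfg["delivery.timeout.ms"] < cfg["request.timeout.ms"] + cfg["linger.ms"]:
        problems.append("delivery.timeout.ms < request.timeout.ms + linger.ms")
    if cfg.get("enable.idempotence"):
        if str(cfg["acks"]) not in ("all", "-1"):
            problems.append("idempotence requires acks=all")
        if cfg["retries"] < 1:
            problems.append("idempotence requires retries to be enabled")
        if cfg["max.in.flight.requests.per.connection"] > 5:
            problems.append("idempotence requires at most 5 in-flight requests")
    return problems


issues = check_config(producer_config)
```

For the configuration shown, `issues` comes back empty. Drop acks to 1 or shrink delivery.timeout.ms below the request timeout and the corresponding problem shows up in the list.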

Monitoring producer requests

Before we move on to the request lifecycle, it’s worth noting that there are some metrics related to how the request is handled by the producer.

  • request-rate: This generally describes the number of requests being made per second by the producer.

  • requests-in-flight: This is a per-producer metric that describes the number of requests currently waiting to be fulfilled by brokers. You can use this to see how bogged down brokers are or whether your producer is making an appropriate number of requests. Ideally, you’d want this value to be low to indicate that brokers are healthy.

  • request-latency-avg: Once the requests are issued to the broker, a timer starts. It doesn’t stop until the producer receives a response. This metric describes the time it takes for that to happen.

The request

We have some optionally compressed, batched records which have been grouped together by the topic-partition and broker that they’re destined for. And we might have multiple batches of records formed in this way. We can now take those batches and send them to the broker as part of a single producer request. 

It can be difficult to envision what this sort of nested object might look like, so for those visual learners out there, I present to you an example producer request:

Again, I’d like to highlight that this request is destined for a single broker in the cluster. But it might contain data for a number of different topic-partitions on that broker. It just depends on how your producers are operating and what sort of data they’re sending. Results may vary.
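
If it helps to see that nesting as a data structure, here's a rough sketch of how batches might be grouped into one request per broker. This is only a mental model, not the actual wire format; `build_requests`, the broker IDs, and the second topic name are all made up for illustration.

```python
from collections import defaultdict


def build_requests(batches, leaders):
    """Group per-partition batches into one nested request per broker.

    batches: {(topic, partition): [record_bytes, ...]}
    leaders: {(topic, partition): broker_id}, as learned from cluster metadata
    """
    requests = defaultdict(lambda: defaultdict(dict))
    for (topic, partition), records in batches.items():
        broker = leaders[(topic, partition)]
        # broker -> topic -> partition -> batch of records
        requests[broker][topic][partition] = records
    return {broker: dict(topics) for broker, topics in requests.items()}


batches = {
    ("hobbit-updates", 0): [b"frodo-1", b"frodo-2"],
    ("hobbit-updates", 3): [b"sam-1"],
    ("wizard-updates", 1): [b"gandalf-1"],  # hypothetical second topic
}
leaders = {
    ("hobbit-updates", 0): 1,
    ("hobbit-updates", 3): 2,
    ("wizard-updates", 1): 1,
}
requests = build_requests(batches, leaders)
```

Broker 1 ends up with a single request carrying batches for two different topics, while broker 2 gets its own request for the one partition it leads.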

And off we go!

Now, we’re ready to embark on the journey you’ve been waiting for. Let’s send this request off into the ether and let the brokers handle it.

Check out the next installment of the series to see what adventures await!

  • Danica began her career as a software engineer in data visualization and warehousing with a business intelligence team where she served as a point-person for standards and best practices in data visualization across her company. In 2018, Danica moved to San Francisco and pivoted to backend engineering with a derivatives data team which was responsible for building and maintaining the infrastructure that processes millions of financial market data per second in near real-time. Her first project on this team involved Kafka Streams – she never looked back. Danica now works as a Developer Advocate with Confluent where she helps others get the most out of their event-driven pipelines.

    Outside of work, Danica is passionate about sustainability, increasing diversity in the technical community, and keeping her many houseplants alive. She can be found on Twitter, tweeting about tech, plants, and baking @TheDanicaFine.