Whether you’re a seasoned Apache Kafka® developer or just getting started you’re likely to hit a snag at some point or another—either in configuring and understanding your clients or setting up and monitoring things on the broker side. With all the moving pieces involved, it can sometimes be difficult to know exactly how to keep your Kafka pipeline running smoothly. To help you combat this, we’ve compiled a list of five common pitfalls and tips for how to avoid them—from client and broker configurations to design and monitoring considerations—that are sure to save you time and effort down the road.
From the client side:
From the broker side:
Let’s get started by looking at some of the common configuration mistakes users make on the client side of things.
request.timeout.ms is a client-side configuration that defines how long the client (both producer and consumer) will wait to receive a response from the broker. The default for this configuration is set to 30 seconds.
If a response is not received before the timeout is reached, then one of two things will happen: either the client will attempt to resend the request (if retries are enabled and have not yet been exhausted, see below for more details on this) or it will simply fail.
It might be tempting to set the request.timeout.ms to a lower value. After all, with a shorter timeout period, clients can react more quickly, whether that means reconnecting or even failing. However, whilst this might sound intuitive, it’s not always a good thing. If you’re not careful, you might exacerbate any problems on the broker side and result in worse performance for your application.
For example, if a broker is taking a long time to handle and process its incoming requests, a lower request.timeout.ms across client applications could lead to increased request pressure as the additional retry attempts are added to the broker’s request queue. This then exacerbates the ongoing performance impact on the brokers, adding to the pressure on it.
It’s recommended to leave request.timeout.ms at the default value of 30 seconds. As discussed above, a lower value could actually increase the amount of time it takes for server-side requests to be processed. Depending on your application needs and if you’re seeing these timeouts often, it may actually be useful to set request.timeout.ms to a higher value.
When executing producer.send(), the hope is that records go through and are successfully stored in a topic. The reality is that, for some reason or another, the producer request might fail. In certain cases, the failure is transient and retriable (i.e., the failure could be recovered from given a sufficient amount of time and the client retry of the request) whilst others will be permanent (i.e., something needs to be fixed before the request can succeed).
For example, during cluster rolling, some of the following retriable exceptions may be encountered by clients:
If retries and retry time are not configured properly, all of these exceptions will be logged as errors, which can potentially disrupt your client and result in lost messages.
A number of producer configurations can have an effect on whether retries are enabled and how they’re handled. Here are the producer configurations to check to ensure that retries are enabled and working as expected:
Why are these important? Bottom line: they can impact the ordering of messages on the topic, specifically when retries are enabled, max.in.flight.requests.per.connection is greater than 1, and enable.idempotence=false. Suppose a producer has sent two requests containing messages for the same partition to the broker. The first batch fails and the producer attempts to resend it while the second batch succeeds. That means that the second set of messages will appear on the partition before the messages in the first batch. If ordering is important to your use case and you want to make use of retries, avoid setting both max.in.flight.requests.per.connection greater than 1 and enable.idempotence=false.
Decide how you’d like your code to behave before you start playing around with the above producer configuration parameters. Do you want retries enabled at all? Check that the retries value is greater than 1 and acks is set to either 1 or all. Would you instead prefer that your producer fail fast and throw exceptions for all potential issues it encounters? Disable retries by setting retries=0. (Note that in this scenario, you still have the ability to handle exceptions on your own within the client code.)
Issues can easily arise when clients are interacting with the cluster, but what about the cluster, itself? It’s just as important to be aware of things that can go wrong with the brokers and how to mitigate them.
Kafka brokers expose a number of really useful JMX metrics that give you great insight into the overall health of your cluster. Unfortunately, not all cluster admins pay enough attention to them.
Some of these metrics are obvious and easy to understand and make use of while others may be a bit hairier to act on, so it’s important to look into and understand the metrics that are available to you. If you’re looking to focus on a handful of key metrics, the following five are a good place to start:
Broker request latency, or how quickly requests are being handled by the broker, is extremely important to overall cluster health. What’s more, there really isn’t any other proxy for request health other than enabling request logging (which is very verbose and may drastically impact performance), so metrics really are the way to go. The bottom line is that ignoring these metrics can have ramifications. Among others, slow response times come to mind.
If you didn’t know about JMX metrics or weren’t paying too much attention to them, you should begin to do so. Knowing they exist is half the battle, and now you can start to act on the metrics and make your cluster healthier and more resilient. The above example metrics are just the tip of the iceberg, and we encourage you to take a look at these additional resources:
After reading all of this, you may think to yourself: What good are broker metrics if I can’t see when a rogue client is abusing the cluster? That’s certainly important! We see your concern and raise you KIP 714 (at the time of this writing, it is under discussion). This KIP offers higher client observability from within the broker. How cool is that?
If the thought of monitoring all of these metrics is daunting, you should know that Confluent Cloud, our fully managed Kafka service, handles all of this for you. Latency, utilization, and a simplified cluster load metric are exposed by default for dedicated clusters.
Partitions are Kafka’s unit of parallelism—barring other factors such as consumer throughput, of course—so in order to increase your overall throughput, it would make sense to use as many partitions as possible, right? Well, not necessarily. A higher partition count may have a number of consequences in the cluster including but not limited to:
There are three different types of broker failover that can occur: clean, unclean, and one involving a failed controller. Thankfully, in a clean shutdown, the controller can be proactive about electing leader replicas and the downtime is relatively low. That being said, when the shutdown is unclean, the controller cannot be proactive, and all of the leader partitions on the failed broker will be unavailable at once while the controller determines the leader replicas. This becomes even worse in the event that the controller broker goes down; not only does a new controller need to be elected, it also has to initialize itself by reading metadata for every partition within ZooKeeper. In these last two cases, the time these partitions will be unavailable is directly proportional to the number of partitions in the cluster. Higher partition counts means greater downtime.
Another important point to note is that, if you’re using Kafka with ZooKeeper today, you’ll find the partition limit is around 4,000 per broker and 200,000 per cluster. With the move away from ZooKeeper as part of the KIP-500 implementation, this will change drastically. Since the cluster will no longer use ZooKeeper to store metadata regarding partitions and brokers, there will be greatly increased scalability. To put a number on this, tests showed that with the new metadata quorum controller a cluster could easily contain upwards of 2 million partitions.
So how do you avoid encountering these problems? Design topic partitions with care, and strike the balance between planned throughput and usage, and the downsides of over-provisioning. Whilst it’s possible to change the number of partitions on a topic it’s not always desirable to do so because of the impact it will have on message ordering.
The formula for determining the number of partitions per Kafka topic has been pretty well explored over time. When creating a new topic in your Kafka cluster, you should first think about your desired throughput (t) in MB/sec. Next, consider the producer throughput that you can achieve on a single partition (p)—this is affected by producer configurations but generally sits at roughly 10s of MB/sec. Finally, you need to determine the consumer throughput (c) you will have—this is application-dependent so you’ll have to measure it yourself. You should anticipate having at least max(t/p, t/c) partitions for that topic. So if you have a throughput requirement of 250 MB/sec, with a producer and consumer throughput of 50 MB/sec and 25 MB/sec respectively. Then you should use at least max(250/50, 250/25) = max(5, 10) = 10 partitions for that topic.
Whilst partitions are as low-level as the producer API gets, when it comes to storing these actual bytes on disk, Kafka splits each partition into segments. Each segment represents an actual data file on disk. Understanding how segments work and are configured is important to ensure that the broker behaves optimally.
As messages are written to the topic, the data is simply appended to the most recent open log segment file for the partition in which the message belongs. While a segment file remains open, it cannot be considered a candidate for deletion or log compaction. By default, these log segment files will remain open until they’re completely full (1GB) as per the topic-level segment.bytes configuration parameter. Instead, you may wish to force the segment to roll after a specific amount of time; this can be set using segment.ms, another topic-level configuration.
Some users will attempt to set segment.ms to a low value to help trigger log compaction or deletion more frequently, reducing the amount of memory their cluster will take up on disk. However, if segment.ms is configured to be too low (the default is seven days), your cluster will generate a lot of small segment files. With too many of these small segments, your Kafka cluster is liable to encounter a “Too many open files” or “Out of memory” exception. Furthermore, a large number of small or empty segment files can have a negative performance impact on consumers of the topic. During a fetch, consumers can only receive data from at most one segment per partition. So if the segments are very small, the consumers are limited in what they can consume at a given time and, as a result, will have to make more trips to the broker.
Unfortunately, to avoid this common pitfall, you need to be diligent about your configurations when creating and managing topics in your cluster. If a user sets segment.ms to a low value, it’s important to remember to set the value back to the default as soon as possible. After doing so, note that these changes are not retroactive, and previous segments will not be affected.
The five pitfalls covered in this article can be avoided by following these tips:
You can also easily avoid the broker-related pitfalls by using Confluent Cloud. With this fully managed Kafka service you can concentrate on writing your client applications instead of worrying about keeping the brokers running! Use the code CL60BLOG for an additional $60 of free usage.*
Here are some great resources to learn more about Kafka: