There is a class of applications that cannot afford to be unavailable—for example, external-facing entry points into your organization. Typically, anything your customers interact with directly cannot go down. As we move toward an event-driven world, more and more organizations are integrating Apache Kafka® as a core dependency of these applications. If you are ingesting data and sending it somewhere for processing, then that somewhere is usually Kafka.
A distributed system with a high level of redundancy means that it is a strong candidate for the type of data offload that this type of application requires. Given the proper distribution of brokers across racks, availability zones, and datacenters, combined with the right replication settings and client acknowledgments, you can achieve a high level of reliability in the event of problems in lower-level failure domains (e.g., disks, servers, networking, and racks). Even so, in complex systems there are unforeseen failure modes, and the scope for human error is amplified by automation, which means that partial or complete cluster outage remains a low-likelihood but high-impact possibility.
Unfortunately, these types of outages do occur in the wild. The impact on applications designed based on the assumption that distributed systems cannot go down (or that if they do, then the outage will be short-lived) can be significant.
For the vast majority of applications, the outage of a system such as Kafka, which forms the foundation of intersystem communications, will be significant but can also be easily remedied. You can quickly recover a system into a working state with the right combination of operational understanding, monitoring, and processes. However, failures that are not detected and swiftly remediated can have a significant impact on applications. Data loss and application outages are possible side effects of a Kafka outage. You need to consider how your application will behave during this type of service interruption.
This blog post addresses some of the impact caused by these types of failures on applications that interact with Kafka and provides some options to handle extended outages.
Availability needs to be treated like security—a practice that is applied in depth. Before you start worrying about Kafka clusters going offline and how your applications will react, you need to have the basics covered. This means:
If you apply each of the practices above, this will significantly reduce the risk of cluster outages. If you are using Confluent Cloud, then we take care of the operational best practices around running the platform for you so you don’t have to worry about detecting issues and fixing them.
Once all of the above practices have been addressed, you should start to consider how your application will behave if something really unexpected goes wrong.
You should pay particular attention to the potential for extended outages if one or more of the following apply:
The applications that are primarily affected are those that respond to client requests. These include applications that provide web services, RPC interfaces, and some TCP-native application protocols. These applications are characterized by servicing an inflow of requests without the ability to control the flow rate. If you have a Kafka outage, then there is nowhere for these messages to go.
An application is in control of its rate of ingress if it polls a source of data. Applications of this type have a relatively simple response to managing downstream outages—they can stop consuming from the input source. Examples include applications that consume from message brokers, file systems, and databases.
When you take a close look at the polling examples above, you may notice that it is characteristic of the sort of work performed by a Kafka Connect source connector. The Connect framework ensures that these connectors shut down during a broker outage. It subsequently resumes work when connectivity is reestablished by redistributing connector tasks across the cluster’s worker nodes. Custom applications that take their input from the outside world and directly produce to Kafka need to take a more considered approach.
If you have an obligation that says the system will continue to be available, there is a high outage cost. Aside from the risk of significant business loss and reputational damage, in some cases, regulators may impose fines if your organization is subject to those types of obligations.
The order of business logic relative to sends determines the importance of outage handling and constraints of your design choices. If your application completes a business operation before it sends messages to Kafka upon its completion, then extra effort needs to be applied to prevent outages and build out support mechanisms as fail-safes. If Kafka is unavailable to send messages to, there is inconsistency between what the outside world and your organization believe to be true; therefore, you need to put some backup mechanism in place.
Financial exchange trading platforms serve as an example of the type of application that works within this model and is susceptible to broker outages. In this scenario, a buy order is matched to a sell order, and the application sends a confirmation to the third-party transacting system via a high-speed channel. After this occurs, a message is then sent to Kafka for downstream processing (trade clearing, market updates, etc.).
In the case of an outage, you have to ensure that these messages can be processed eventually. Keeping unsent messages around and retrying indefinitely in the hopes that the outage will rectify may eventually result in your application running out of memory. This is a crucial consideration in high-throughput applications.
If business functions are performed by systems downstream of Kafka, and the sending application only acts as an ingestion point, the situation is slightly more relaxed. If Kafka is unavailable to send messages to, then no external activity has taken place. For these systems, a Kafka outage might mean that you do not accept new transactions. In such a case, it may be reasonable to return an error message and allow the external third party to retry later. Retail applications typically fall into this category.
You need to consider what to do when applications can’t send regardless of the transaction model. The approach may be radically different depending on the value of the data and throughput levels and whether it is worth investing higher levels of effort to build supporting infrastructure. You can make any system more reliable, but there is a cutoff point beyond which the costs to do so rise significantly, and the effort is not worth it.
An outage is a situation where applications cannot publish into Kafka. Outages may partially or entirely prevent an inflow of data. The outage does not necessarily have to disable the whole Kafka cluster at the same time.
An outage occurs where:
min.insync.replicasis not achievable on sends where
acks=allor no leaders are available for partitions to which the producing application wants to send
A partial cluster failure that takes out a few partitions is enough over time to completely block a producing application from sending because the memory buffers in its client library fill up.
Extended outages are not defined by any particular time window but display specific characteristics:
Which one of these symptoms shows up first depends on the following client configuration settings:
When the client library determines that the maximum number of retries for a message was exceeded or the message times out, the application receives exceptions or errors on callback listeners. It is essential that you do not ignore these warnings and do something sensible with the failed messages. The errors will likely affect messages sent to some partitions but not others. Keep in mind that partitions have different brokers acting as leaders for them.
At some time into the outage (seconds or minutes depending on production throughput), your application threads will be blocked from sending and will start returning errors from the send operation. How this looks mechanically depends on the client library:
librdkafkawill return an error code. The application needs to poll for broker responses to free up space within the client library before trying to send again.
Is it possible to get around this through the use of in-memory queues to build a form of staged event-driven architecture (SEDA) in front of the sending code?
In such a scheme, one or more acceptor threads ingest requests, perform some processing, and enqueue them for a background thread to perform the send to the Kafka client library. This idea of separating processing stages in an application with queues is the basis for actor frameworks.
Unfortunately, in-memory queues are not a solution. To understand why, we need to talk about backpressure.
Let’s step back for a moment and define a buffer: It is an area that temporarily holds data. Some processing units (for want of a better description) write data into this area, while others read and remove it. Within an application, a buffer is a data structure and the processing units are the threads of execution. A queue on a conventional message broker is also a buffer; the processing units are applications that produce to and consume from it.
A buffer behaves much like a water bottle with a tap on the bottom of it.
A bottle has a fixed capacity. Only a finite amount of water can go into it. However, a bottle with a tap is a dynamic system—you can regulate the amount of water going in (the production rate). You can also drain water from the bottle at different speeds by opening the bottom tap to various degrees (the consumption rate).
Buffers trend toward two different states over time:
production rate <= consumption rate, then the buffer will trend toward empty; the bottle empties faster than it fills.
production rate > consumption rate, then the buffer will trend toward full; the bottle fills up despite a small amount of water leaving. Eventually, it will overflow, and you won’t be able to add more water if the outflow is blocked nor add water at the same rate if it is merely low.
This overflow of water should give you some sense of how backpressure works. If a send process writes to a buffer that is being consumed at a slower pace, then the buffer will eventually fill up. If the send process is getting data from an upstream application, it will reach capacity and push back on the upstream system.
This is what happens during extended outages. The application sends messages into the Kafka client library, while at the same time, the background thread that performs the send to the broker is unable to release any of that data. The bottom tap is closed while the flow of water into the bottle continues.
Let’s go back to the idea of adding another buffer to the system and separating the acceptance of inbound requests from the send into the Kafka client library.
Using the bottle analogy, this is the equivalent of introducing another bottle with a tap above the initial bottle with a tap and running a hose between the two bottles.
If the bottom bottle is not drained quickly enough, it will eventually fill up, and the pressure will go back up the hose and proceed to fill up the top water bottle, which will eventually overflow.
Only by ensuring that the consumption rate trends toward being faster than the production rate can you ensure consistent flow in a dynamic system in the long term. It can rise and fall with variable input, but the water level (a work in progress) should trend down over time. You can enlarge the buffers to deal with a short outage, but that is only a short-term solution.
You should always be careful when introducing buffers. Applications that hold data inside an in-memory buffer can crash while waiting for the outflow to open again. This is the equivalent of breaking the bottle before it can release its contents.
Applications that send into a message broker need to be able to make a distinction between two types of error and react correspondingly:
You can deal with failed transient sends in several ways:
Any messages that time out and are resent into the Kafka producer API, either via an external retry or a side channel, will lose their original send order. That is always true for messages retried by the application unless it performs synchronous single-message sends, which is not an option for anything other than low-volume applications because it is too slow.
Writing messages to local storage is a component of a number of the options discussed here; however, it is not as simple as writing the messages to the end of a file.
You need to address the following concerns in writing failed/timed-out messages to local storage:
fsync()function. The latency of the I/O pipeline limits the performance of this operation, which the application calls synchronously.
You can achieve higher throughput with lower durability by using asynchronous disk APIs, such as
liburing. The frequency at which writes should occur to the local physical disks needs to be considered alongside the amount of data that resides in various application buffers that have not yet been written and would be lost if the system had a power outage.
These considerations are at the heart of the functionality of a message broker. You should consider writing messages to a broker that is co-located with the application rather than reimplementing the logic within your application. This form of co-location may be problematic for applications whose CPU and memory usage are highly optimized.
The implementation of KIP-500 provides a new design option consisting of a single-broker cluster that you can co-locate with your application as an ingestion point, and from there, mirror the affected topics to a central cluster via Cluster Linking. These types of hub-and-spoke architectures are already common in certain geographically distributed use cases; the use of individual brokers as spokes opens up new applications due to a smaller footprint.
This strategy is more generally known as load shedding. When a callback handler receives a transient error related to a sent message, it removes associated data from upstream data structures and performs no further actions. The application may log the event.
The situation should be externally visible to operators via monitoring.
The application resends timed-out messages to the client library. At some point, dictated by unsent messages or something similar, the application closes itself off to further inbound traffic until messages start to flow again.
Applications that poll for their inputs can stop polling, resulting in the same effect.
[A, B, C]. When message
Atimes out, is resent, and subsequently accepted, the messages will appear on the topic in the order
[B, C, A].
The application sends all messages to alternative local storage. A mechanism such as a Kafka Connect connector then ingests these into Kafka asynchronously.
This option uses local storage as a dead letter channel. A batch process imports these messages into Kafka once the system recovers to a normal state.
The system disables data flow to Kafka on failure and reroutes failed messages to local storage. The application plays back these messages from local storage and resends when Kafka recovers. Regular flow is then resumed.
You typically use the circuit breaker pattern for synchronous interactions, such as a web service or database invocations that will fail quickly. Attempting to implement this pattern over the top of a Kafka client, which is an asynchronous library, needs to account for the following:
librdkafkastatistics). You should instead monitor the data error rate (e.g., errored delivery reports). The application should signal an alarm if this rate exceeds some threshold or if the number of outstanding messages remains too high. This approach covers all types of cluster failures without any specific knowledge of client internals.
The sending application writes messages twice. Applications consuming from the affected topics must do so from two clusters and discard previously seen messages.
We have seen similar schemes applied in systems requiring ordered ledgers, where applications send messages to two locations. Consumers do not deduplicate using the typical method (idempotent consumer) of keeping track of previously seen business keys that uniquely identify each message payload and discarding duplicates—a read- and write-intensive process. Instead, they use a technique based on sequencing via a monotonically incrementing counter stamped on each message by the sending process.
The consuming system pulls messages from both clusters and keeps track of the highest counter value. It discards messages with a counter that is equal to or lower than the highest counter previously seen. You can avoid modifying message payloads by storing these counters in a message header.
As part of any Kafka deployment and topic design, it is important to plan around common failures:
In a self-managed environment, you can address these through a combination of appropriate broker placement, replication settings, and redundancy.
Monitoring of your Kafka clusters is essential for detecting failures before they escalate into an incident. Not all problems can be spotted and remediated before they start affecting upstream applications. You must decide how your application will react when it cannot send messages during outages.
Availability is a service quality that needs to be considered in depth. By doing all the operational basics right, you significantly decrease the likelihood of outages at the platform level. If you are using Confluent Cloud, then we take care of the best practices around running the platform on your behalf.
This blog post covered several options for outage handling. The exact strategy that is right for you depends on your applications’ transaction model, the value of inbound data, and its throughput. These factors must be considered against the cost of implementing additional reliability mechanisms that go beyond those provided natively by Kafka and its client libraries.
If you would like to discuss reliability with people who deal with it daily, please contact Professional Services to learn more.