Various factors can impede an organization's ability to leverage Confluent Cloud, ranging from data locality considerations to stringent internal prerequisites. For instance, specific mandates might dictate that data be confined within a customer's Virtual Private Cloud (VPC), or necessitate operation within an air-gapped VPC. Even in such circumstances, viable alternatives remain available.
Confluent Platform simplifies the process of connecting your data sources to Apache Kafka®, building streaming applications, as well as securing, monitoring, and managing your Kafka infrastructure. When it is deployed on a cloud provider like Amazon Web Services, you can start using data streaming sooner, without needing a high level of expertise with Apache Kafka. That said, using AWS resources correctly to ensure stability, performance, and cost efficiency can pose a challenge. Let’s take a look at the best practices to follow for deploying, monitoring, and managing your Apache Kafka cluster in AWS.
Before deploying your Kafka clusters, delve into the following crucial aspects that address a range of factors, including hardware and network infrastructure, scalability, and performance requirements. By exploring these considerations, you can ensure a robust and optimized Kafka deployment that aligns with specific use cases, minimizes potential challenges, and maximizes the benefits of this powerful data streaming platform.
If you are self-managing Kafka, it’s helpful to familiarize yourself with KIP-500 and KRaft. KRaft mode, short for Kafka Raft Metadata mode, uses the Raft consensus algorithm and will enable the retirement of ZooKeeper in Apache Kafka. Instead of storing cluster state and metadata in a separate service, KRaft does it the Kafka way and stores metadata within a Kafka metadata topic. This welcome change greatly simplifies Kafka’s architecture by consolidating responsibility for metadata into Kafka itself. You can expect to see improvements in the performance, scalability, and operability of Kafka. KRaft is now production ready and generally available in Confluent Platform 7.4! If you’d like to learn more about why ZooKeeper was replaced, check out Guozhang Wang’s blog post Why ZooKeeper Was Replaced with KRaft – The Log of All Logs for a deep dive into the topic.
When you deploy Apache Kafka through Confluent Platform via Confluent for Kubernetes (CFK), you can enable rack awareness in CFK so that Kubernetes availability zones (AZs) are treated as racks. CFK maps each Kafka rack to an AZ and places Kafka partition replicas across AZs. CFK is based on the Kubernetes Operator API, which has self-healing capabilities at the cluster level. This means that it restarts containers that fail, reschedules pods when nodes die, and even kills containers that don't respond to your user-defined health checks.
A good starting point for a highly available topic is to set the replication factor (RF) to three and the minimum in-sync replicas (min ISR) to two. However, this might not be enough for some applications. If a broker crashes, then neither of the other brokers with replicas can be restarted without affecting producers with acks=all. Setting RF=4 while keeping min ISR=2 would avoid this, since two replicas can then be offline at once, but the extra replica increases producer latency and lowers throughput. Raising min ISR to three along with RF=4 strengthens durability but still tolerates only one unavailable replica. (Side note: Avoid setting RF and min ISR to the same value because producers with acks=all will be blocked if any replicating brokers go down.)
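The arithmetic behind these settings can be sketched in a few lines of Python. This is a simplified model, not Kafka code: it assumes producers use acks=all and that a write succeeds whenever the number of in-sync replicas is at least min ISR. The function name is hypothetical.

```python
def tolerated_replica_outages(replication_factor: int, min_isr: int) -> int:
    """How many replica-hosting brokers can be down at once
    while acks=all producers can still write to the partition."""
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min_isr must be between 1 and replication_factor")
    # Writes succeed while in-sync replicas >= min_isr.
    return replication_factor - min_isr


for rf, min_isr in [(3, 2), (4, 2), (3, 3)]:
    spare = tolerated_replica_outages(rf, min_isr)
    print(f"RF={rf}, min ISR={min_isr}: {spare} replica(s) may be offline")
```

A result of 0, as with RF=3 and min ISR=3, means any single broker outage blocks acks=all producers, which is the situation the side note warns about.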
One way you could address this high availability dilemma is by deploying Kafka clusters with multi-availability zone configurations. A multi-availability zone cluster offers stronger fault tolerance as it distributes brokers across multiple availability zones within an AWS Region. This means a failed availability zone won’t cause Kafka downtime. The minimal cluster that supports high availability is with three brokers, each in a different AZ, and where topics have a replication factor of three and a min ISR of two. This will permit a single broker to be down without affecting producers with acks=all. Having fewer brokers will sacrifice either availability or durability.
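As a rough way to audit this layout, the sketch below checks whether every partition's replicas span the expected number of availability zones. The broker IDs, zone names, and partition assignment are hypothetical sample data; in practice you would feed in the output of your own cluster tooling.

```python
def partitions_not_spanning_zones(assignment, broker_zone, min_zones=3):
    """Return the partitions whose replicas sit in fewer than min_zones AZs.

    assignment:  {partition_name: [broker_id, ...]}
    broker_zone: {broker_id: availability_zone}
    """
    offenders = []
    for partition, replicas in sorted(assignment.items()):
        zones = {broker_zone[broker] for broker in replicas}
        if len(zones) < min_zones:
            offenders.append(partition)
    return offenders


# Hypothetical three-broker cluster, one broker per AZ, plus a fourth broker.
broker_zone = {1: "us-east-1a", 2: "us-east-1b", 3: "us-east-1c", 4: "us-east-1a"}
assignment = {
    "orders-0": [1, 2, 3],  # one replica per AZ
    "orders-1": [1, 4, 2],  # brokers 1 and 4 share an AZ
}
print(partitions_not_spanning_zones(assignment, broker_zone))
```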
You might wonder, why not take it a step further and spread brokers across regions for the ultimate solution to high availability? If your use case calls for it, go for it. Confluent Platform provides guidance on Multi-Region Clusters (also known as stretch clusters). Keep in mind, Kafka was designed to run within a single data center. As such, there are some caveats; for example, the data centers must have less than 100 ms of latency between them. Operations such as EOS transaction commits can become quite chatty, causing delays of several seconds each if the latency between data centers is significant. If your replication factor (RF) is set high to guarantee data presence in multiple regions (a typical customer requirement), then setting acks=all can also lead to slow performance. Moreover, excessive latency among cluster components might trigger unusual states due to heartbeat timeouts, as seen in scenarios involving ZooKeeper nodes. However, we do recommend “stretching” brokers in a single cluster across availability zones within the same region.
Until KRaft becomes mainstream, ZooKeeper should be distributed across multiple availability zones as well, to increase fault tolerance. To tolerate an availability zone failure, ZooKeeper must be running in at least three different availability zones. In a configuration where three ZooKeepers are running in only two availability zones, if the availability zone with two ZooKeepers fails, ZooKeeper will not have a quorum and will not be available.
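The quorum reasoning can be checked with a short sketch. It assumes a plain majority quorum (more than half of the ensemble must be reachable) and models an AZ failure as losing every ZooKeeper node in that zone; the function name and zone labels are illustrative.

```python
def survives_any_single_az_failure(nodes_per_az: dict) -> bool:
    """True if a majority of ZooKeeper nodes remains after losing any one AZ."""
    total = sum(nodes_per_az.values())
    if total == 0:
        return False
    quorum = total // 2 + 1  # strict majority of the ensemble
    return all(total - lost >= quorum for lost in nodes_per_az.values())


# Three nodes in two AZs: losing the AZ with two nodes breaks quorum.
print(survives_any_single_az_failure({"az-a": 2, "az-b": 1}))
# Three nodes in three AZs: any single AZ can fail.
print(survives_any_single_az_failure({"az-a": 1, "az-b": 1, "az-c": 1}))
```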
For more details, please reference the architecture diagram above and this article on Building a Global, Highly Available Service Discovery Infrastructure with ZooKeeper.
When you deploy Kafka on AWS EC2 machines, you can configure storage in two primary ways: Elastic Block Store (EBS) and instance storage. EBS consists of attaching a disk to an instance over a local network, whereas instance storage is directly attached to the instance. The log.dirs configuration specifies where the broker stores its logs (topic partition replicas), and it can point at either kind of storage. Each option has tradeoffs that may or may not be ideal for your application and operational needs. Below we’ll go through each option and its associated tradeoffs.
EBS volumes provide consistent levels of I/O performance (IOPS) and ultimate flexibility in their deployment. This is important when accounting for Kafka's built-in fault tolerance of replicating partitions across a configurable number of brokers. When a broker fails and a new replacement broker is added, the replacement broker fetches all data the original broker previously stored from other brokers in the cluster that hosts the other replicas. Depending on your application, this could involve copying tens of gigabytes or terabytes of data. Fetching this data takes time and increases network traffic, which could impact the performance of the Kafka cluster for the period the data transfer is happening. A properly designed Kafka cluster based on EBS storage can virtually eliminate re-replication overhead that would be triggered by an instance failure, as the EBS volumes can be reassigned to a new instance quickly and easily.
For example, when an underlying EC2 instance fails or is terminated, the broker’s on-disk partition replicas remain intact and can be mounted by a new EC2 instance. By using EBS, most of the replica data for the replacement broker will already be in the EBS volume and hence won’t need to be transferred over the network. Only the data produced since the original broker failed or was terminated will need to be fetched across the network. To preserve the EBS root volume when an instance terminates, change the DeleteOnTermination attribute for the root volume to False. We also recommend writing automation scripts that use libraries like Boto3 from AWS, in combination with the EC2 Auto Scaling group feature, to proactively set up CloudWatch alarms for StatusCheckFailed_System. When the instance hits a failure scenario, it will then be rebooted, preserving the attached EBS volume.
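A minimal sketch of that automation follows. It only builds the request parameters, so it runs without AWS credentials; the comments show where they would be passed to Boto3. The instance ID, region, alarm name, root device name, and evaluation settings are assumptions for illustration, and the alarm uses the EC2 recover action (swap the ARN suffix for `reboot` if you prefer a reboot).

```python
def keep_root_volume_params(instance_id, root_device="/dev/xvda"):
    """Parameters for EC2 modify_instance_attribute that keep the root
    EBS volume when the instance terminates. The device name is an
    assumption; check your AMI's actual root device."""
    return {
        "InstanceId": instance_id,
        "BlockDeviceMappings": [
            {"DeviceName": root_device, "Ebs": {"DeleteOnTermination": False}}
        ],
    }


def system_check_alarm_params(instance_id, region):
    """Parameters for CloudWatch put_metric_alarm on StatusCheckFailed_System."""
    return {
        "AlarmName": f"kafka-broker-system-check-{instance_id}",  # hypothetical name
        "Namespace": "AWS/EC2",
        "MetricName": "StatusCheckFailed_System",
        "Dimensions": [{"Name": "InstanceId", "Value": instance_id}],
        "Statistic": "Maximum",
        "Period": 60,            # seconds; illustrative value
        "EvaluationPeriods": 2,  # illustrative value
        "Threshold": 1.0,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "AlarmActions": [f"arn:aws:automate:{region}:ec2:recover"],
    }


# With Boto3 installed and credentials configured, these would be used as:
#   boto3.client("ec2").modify_instance_attribute(**keep_root_volume_params(...))
#   boto3.client("cloudwatch").put_metric_alarm(**system_check_alarm_params(...))
print(system_check_alarm_params("i-0123456789abcdef0", "us-east-1")["AlarmActions"])
```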
AWS’s automatic recovery improves instance availability by recovering the instance if it becomes impaired due to an underlying hardware issue. Automatic recovery migrates the instance to another hardware during an instance reboot while retaining its instance ID, private IP addresses, Elastic IP addresses, and all instance metadata.
In short, a Kafka cluster deployed against EBS storage can be shut down cleanly without risk of data loss, an option that is not available with EC2 local instance storage because it is ephemeral.
Most Kafka deployments use a replication factor of three. However, EBS does its own replication under the covers for fault tolerance. As such, using a Kafka replication factor of two can save storage costs. However, keep in mind that although the data is stored in a fault-tolerant way, if two Kafka brokers go down, the partitions they each store will be offline.
Using EBS volumes will decrease network traffic when a broker fails or is replaced. Also, the replacement broker will join the cluster more quickly. However, EBS volumes add cost to your AWS deployment and are recommended for smaller Kafka clusters.
You can use instance storage as a more cost-effective option, but keep in mind that it takes longer and requires more network traffic to recover from a failed or terminated broker. In general, it’s recommended that you use instance storage for larger Kafka clusters when traffic is distributed across several brokers, so the failure of a broker has less of an impact. Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.
In a simple Kafka benchmark, we saw better performance with gp2/gp3 EBS than instance storage. As the performance of Kafka on gp3 eclipses that of its x86/gp2 counterpart in several repeated trials, we recommend using gp3 if message throughput is your primary requirement. Gp3 disks are provisioned with IOPS and throughput. By default, a gp3 disk comes with 3,000 IOPS and 125 MBps of throughput out of the box, and provisioning more capacity comes at a price. This is slightly different from gp2 disks, where the throughput is throttled at 250 MBps and the IOPS number is derived from the disk size.
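To make the gp2 versus gp3 difference concrete, here is a small sketch of how baseline IOPS relate to volume size. The gp2 figures (3 IOPS per GiB, floor of 100, ceiling of 16,000) reflect AWS's published characteristics as we understand them at the time of writing; treat them as assumptions and confirm against the current EBS documentation.

```python
GP3_BASELINE_IOPS = 3_000  # included with every gp3 volume, regardless of size


def gp2_baseline_iops(size_gib: int) -> int:
    """Baseline IOPS for a gp2 volume, which scales with volume size."""
    return min(max(3 * size_gib, 100), 16_000)


for size in (100, 500, 1_000, 4_000):
    print(f"{size:>5} GiB  gp2={gp2_baseline_iops(size):>6}  gp3={GP3_BASELINE_IOPS}")
```

Under these assumptions, a gp2 volume needs to be about 1,000 GiB before its baseline IOPS match what a gp3 volume of any size includes by default.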
Regardless of what type of storage you decide to use, the recommended practice when replacing a Kafka broker is to use the broker ID from the failed broker in the replacement broker. This is the most operationally simple approach because the replacement broker will resume the work that the failed broker was doing automatically. Alternatively, configuring Kafka for self-balancing automates your resource workload balancing, provides failure detection and self-healing, and allows you to add or decommission brokers as needed, with no manual tuning required.
You can limit disk thrashing and increase read/write throughput by configuring Kafka brokers with dedicated disks. More specifically, log.dirs should only contain disks (or EBS volumes) that are dedicated to Kafka. We suggest not including the operating system’s disk or any other disks that are used for other purposes.
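The two recommendations above (reuse the failed broker's ID, and point log.dirs only at dedicated disks) can be sketched as a small helper that renders the relevant server.properties lines. The mount paths are hypothetical, and the check against the root filesystem is a deliberately simple guard, not a full validation.

```python
def render_broker_properties(broker_id, data_mounts):
    """Render broker.id and log.dirs lines for a replacement broker.

    broker_id:   the ID of the failed broker being replaced
    data_mounts: mount points of disks or EBS volumes dedicated to Kafka
    """
    if not data_mounts:
        raise ValueError("at least one dedicated data mount is required")
    if "/" in data_mounts:
        raise ValueError("log.dirs should not include the operating system disk")
    return "\n".join([
        f"broker.id={broker_id}",
        "log.dirs=" + ",".join(data_mounts),
    ])


# Hypothetical replacement for failed broker 3 with two dedicated volumes.
print(render_broker_properties(3, ["/mnt/kafka-data-1", "/mnt/kafka-data-2"]))
```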
The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral instance storage, h1, i3, and d2 instances are generally preferred.
We recommend using d2.xlarge if you’re using instance storage or r4.xlarge if you’re using EBS. These instance types are in line with our general Kafka hardware recommendations.
This isn’t specific to Kafka, but you might want to consider buying reserved instances to lower costs. You may also evaluate m5.large and m5.4xlarge, which provide balanced compute, memory, and networking resources for a wide range of use cases and can be leveraged for large Kafka workloads.
You can also consider configuring EC2’s partition placement groups, which allow you to deploy distributed Kafka workloads within a single zone and reduce the likelihood of correlated failures, improving your application performance and availability. To reduce the impact of hardware faults, EC2 subdivides each partition placement group into logical segments called partitions and ensures that no two partitions within a placement group share the same racks. You can find more information about this in the blog post Using partition placement groups for large distributed and replicated workloads in Amazon EC2.
EBS has the potential to be a single point of failure. If the EBS service has a broad outage, all of your brokers will be affected. To avoid this, we recommend that you deploy Kafka clusters running in AWS EKS using gp2 and gp3 volume types. EBS volume types such as gp3, io2, and st1 allow you to have varying throughput and IOPS characteristics for existing volumes.
Apache Kafka is frequently deployed on the Kubernetes (K8s) container management system, which is used to automate the deployment, scaling, and operation of containers across clusters of hosts. Since Kafka consumers rely on persistent storage to be able to retrieve data, Kafka is considered a stateful application in the context of Kubernetes. Kubernetes natively exposes abstractions for managing stateful applications (e.g., StatefulSets), and you can run Kafka on Amazon Elastic Kubernetes Service (EKS).
We recommend that you have three availability zones (AZs) in each region that you use to establish the rack designations. That gives a total of three racks in each region, with at least one Kubernetes node in each rack and at least one broker configured in each AZ. Run a minimum of three ZooKeeper nodes, with at least one ZooKeeper in each region. Notice that two observers and four replicas are placed across the three regions; topic replica placement allows us to establish this designation. Setting up the ZooKeeper quorum is critical for creating stretched clusters in Kubernetes, because it's crucial to know how many ZooKeeper pods there are, and a naming convention helps with knowing what each of the ZooKeeper or Kafka pods resolves to.
Kubernetes configures the volume type in storage classes, and the volume type in a storage class is an immutable field. If you want to change the disk size, you can edit the PVC and the storage controller will issue the AWS API call to change the disk size. For the storage type, however, Kubernetes does not have a direct way to change it just by editing the storage class.
Confluent offers its own implementation of the Kubernetes Operator API for automated provisioning, management, and operations of Kafka on Kubernetes, called Confluent for Kubernetes (CFK). CFK is a cloud-native control plane for deploying and managing Confluent in your private cloud environment. It provides a standard and simple interface to customize, deploy, and manage Confluent Platform through a declarative API.
The diagram below details the high-level architecture of CFK and Confluent Platform in Kubernetes.
Our Recommendations for Deploying Apache Kafka on Kubernetes white paper is a helpful resource for understanding how Apache Kafka from Confluent Platform components fit into the Kubernetes ecosystem. It also covers our approach to networking, storage, traffic, log aggregation, metrics, and more.
AWS network configuration with Kafka is similar to other distributed systems, with a few caveats we’ll get into below. AWS offers a variety of different IP and DNS options. Choose an option that keeps inter-broker network traffic on the private subnet and allows clients to connect to the brokers. Note that inter-broker and client communication use the same network interface and port. We recommend reading the EC2 instance IP addressing documentation to learn more.
When a broker is started, it registers its hostname with ZooKeeper. The producer and the consumer are configured with a bootstrapped (or “starting”) list of Kafka brokers. In both cases, the client makes a request to fetch all broker hostnames and begins interacting with the cluster.
Depending on how the operating system’s hostname and network are configured, brokers on EC2 instances may register hostnames with ZooKeeper that aren’t reachable by clients. The purpose of advertised.host.name is to address exactly this problem; the configured hostname in advertised.host.name is registered with ZooKeeper instead of the operating system’s hostname.
In a secure Kafka cluster, use advertised.listeners instead. advertised.host.name is not supported when security (SSL and/or SASL) is enabled.
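As a sketch of what this looks like in practice, the helper below builds the listeners and advertised.listeners entries for a broker. The hostname, port, and security protocol are hypothetical placeholders; the point is that the advertised address is the one clients can actually resolve and reach, independent of the operating system's hostname.

```python
def listener_properties(bind_host, advertised_host, port=9093, protocol="SASL_SSL"):
    """Build the listener settings for one broker.

    bind_host:       interface the broker binds to on the instance
    advertised_host: name that is registered and handed to clients
    """
    return {
        "listeners": f"{protocol}://{bind_host}:{port}",
        "advertised.listeners": f"{protocol}://{advertised_host}:{port}",
    }


# Hypothetical private DNS name that clients in the VPC can resolve.
props = listener_properties("0.0.0.0", "broker-1.kafka.internal.example.com")
for key, value in props.items():
    print(f"{key}={value}")
```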
In a multi-region architecture, careful consideration has to be made for Replicator and Cluster Linking. Under the covers, these replication tools are simply a consumer and a producer joined together.
If a replicator is configured to consume from an Elastic IP (EIP), the single broker tied to the EIP will be reachable, but the other brokers in the source cluster won’t be. The replicator needs access to all brokers in the source and destination regions, which in most cases is best implemented with a VPN between regions. For more details, please refer to Connecting Multiple VPCs with EC2 Instances in the AWS documentation.
Client service discovery can be implemented in a number of different ways. One option is to use HAProxy on each client machine, proxying localhost requests to an available broker. Synapse works well for this. Another option is to use Route 53 for DNS. In this setup, the TTL must be set in the client JVM to get around DNS caching. See setting the JVM TTL in the AWS documentation. Another option is to use an Elastic Load Balancer (ELB). In this configuration, ensure the ELB is not public to the internet. Sessions and stickiness do not need to be configured because Kafka clients only make a request to the load balancer at startup.
Each step of the data's journey through Kafka requires that a decision be made. For example, the broker authenticates the client to make sure the message is actually originating from the configured producer. Likewise, the producer verifies that it has a secure connection to the broker before sending any messages. Then, before the leader broker writes to its log, it makes sure that the producer is authorized to write to the desired topic. This check also applies to the consumer: it must be authorized to read from the topic.
If your choice of storage is EBS, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
Kafka can leverage TLS for client and internode communications. With two-way TLS/SSL authentication, you would create a client keystore and sign all certificates with the CA that you generated, just as you did for the brokers.
Connections to brokers from clients (producers and consumers), other brokers, and tools are authenticated using either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL). Kafka also supports Kerberos authentication. For production deployments of Kafka, SASL/GSSAPI (Kerberos) or SASL/SCRAM is recommended.
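For illustration, here is roughly what a SASL/SCRAM client configuration over TLS could look like for a librdkafka-based client such as the confluent-kafka Python package. The broker address, credentials, and CA path are placeholders, and SCRAM-SHA-512 is one of the available SCRAM mechanisms chosen here as an assumption. The helper only builds the configuration dictionary, so it runs without a Kafka client installed.

```python
def scram_client_config(bootstrap_servers, username, password, ca_path):
    """Client settings for SASL/SCRAM authentication over TLS."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",   # SASL authentication, TLS encryption
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        "ssl.ca.location": ca_path,        # CA that signed the broker certificates
    }


# Placeholder values; load real credentials from a secrets store.
config = scram_client_config(
    "broker-1.kafka.internal.example.com:9093",
    "orders-producer",
    "change-me",
    "/etc/kafka/secrets/ca.pem",
)
# With confluent-kafka installed: Producer(config)
print(config["security.protocol"], config["sasl.mechanism"])
```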
Apache Kafka ships with a pluggable, out-of-the-box Authorizer implementation that uses Apache ZooKeeper to store all the ACLs. It is important to set ACLs because otherwise access to resources is limited to super users when an Authorizer is configured. The default behavior is that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users.
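The default-deny behavior can be modeled in a few lines. This is a simplified illustration, not the real Authorizer: it ignores deny rules, wildcards, hosts, and prefixed resources. The `allow_if_no_acl` flag stands in for the broker setting allow.everyone.if.no.acl.found, and the principals and topics are made up.

```python
def is_authorized(principal, operation, resource, acls, super_users,
                  allow_if_no_acl=False):
    """Simplified access decision.

    acls: set of (principal, operation, resource) tuples that are allowed.
    """
    if principal in super_users:
        return True  # super users bypass ACL checks
    resource_acls = {acl for acl in acls if acl[2] == resource}
    if not resource_acls:
        # No ACLs on the resource: denied unless explicitly configured otherwise.
        return allow_if_no_acl
    return (principal, operation, resource) in resource_acls


acls = {("User:orders-producer", "Write", "topic:orders")}
supers = {"User:admin"}
print(is_authorized("User:orders-producer", "Write", "topic:orders", acls, supers))
print(is_authorized("User:orders-producer", "Write", "topic:payments", acls, supers))
print(is_authorized("User:admin", "Write", "topic:payments", acls, supers))
```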
It’s important to monitor broker performance and terminate poorly performing brokers as needed. Individual EC2 instance performance can decrease unexpectedly over time for unknown reasons. We recommend terminating and replacing a broker if the 99th percentile of produce/fetch request latency is higher than is tolerable for your application.
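A minimal sketch of that check, assuming you already collect per-request latency samples (for example, from JMX metrics) for each broker. It uses the nearest-rank percentile method, and the threshold is whatever your application can tolerate; the sample values are invented.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list of numbers."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def should_replace_broker(latencies_ms, tolerable_p99_ms):
    """Flag a broker whose p99 request latency exceeds the tolerable limit."""
    return percentile(latencies_ms, 99) > tolerable_p99_ms


healthy = [5] * 99 + [500]      # a single outlier does not move the p99
degraded = [5] * 98 + [500] * 2  # enough slow requests to move the p99
print(should_replace_broker(healthy, tolerable_p99_ms=100))
print(should_replace_broker(degraded, tolerable_p99_ms=100))
```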
Confluent Platform has intuitive GUIs for managing and monitoring Apache Kafka. These tools allow developers and operators to centrally manage and control key components of the platform, maintain and optimize cluster health, and use intelligent alerts to reduce downtime by identifying potential issues before they occur.
You can leverage several tools to monitor Kafka clusters on AWS, including DataDog, New Relic, Amazon CloudWatch, and AWS CloudTrail. Apache Kafka brokers and clients report many internal metrics. JMX is the default reporter, though you can add any pluggable reporter. Check out the guide Monitoring Kafka with JMX for more information on how to monitor your broker, producer, consumer, and ZooKeeper metrics using JMX.
The best way to back up a Kafka cluster is to set up a mirror for the cluster. Depending on your setup and requirements, this mirror may be in the same data center or in a remote one. For more details, see the Mirroring data between clusters section of the Running Kafka in Production documentation. When backing up a Kafka cluster based on instance storage, it’s recommended to set up a second cluster and replicate messages using Replicator or Cluster Linking.
Kafka’s backup strategy is heavily dependent on the original type of storage chosen in your cluster deployment. Its mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.
For EBS-based deployments, it is recommended to create automatic snapshots of EBS volumes as backups. New EBS volumes can be created from these snapshots to restore. We also recommend storing backup files in object stores like Amazon S3 and using them for restoration as necessary. The Backup and Restore Kafka Connect Amazon S3 Source connector reads data exported to S3 by the Amazon S3 Sink connector and publishes it back to an Apache Kafka topic. Depending on the format and partitioner used to write the data to S3, this connector can write to the destination topic using the same partitions as the original messages exported to S3 and maintain the same message order.
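As a rough sketch of the snapshot step, the helper below builds the parameters for an EC2 create_snapshot call for one broker volume. The volume ID, broker ID, and tag keys are hypothetical, and scheduling (for example, via a cron job or a lifecycle policy) is left out. The call itself is shown only in a comment so the example runs without AWS access.

```python
from datetime import datetime, timezone


def broker_snapshot_params(volume_id, broker_id, taken_at=None):
    """Parameters for snapshotting one Kafka data volume."""
    taken_at = taken_at or datetime.now(timezone.utc)
    stamp = taken_at.strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "VolumeId": volume_id,
        "Description": f"Kafka broker {broker_id} data volume, {stamp}",
        "TagSpecifications": [{
            "ResourceType": "snapshot",
            "Tags": [
                {"Key": "kafka-broker-id", "Value": str(broker_id)},  # hypothetical tag
                {"Key": "taken-at", "Value": stamp},                  # hypothetical tag
            ],
        }],
    }


# With Boto3 installed and credentials configured:
#   boto3.client("ec2").create_snapshot(**broker_snapshot_params(...))
fixed_time = datetime(2023, 9, 6, 12, 0, 0, tzinfo=timezone.utc)
print(broker_snapshot_params("vol-0123456789abcdef0", 3, fixed_time)["Description"])
```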
For more information on how to back up in Kafka, see the Kafka documentation.
We discussed several deployment options and considerations for running Kafka in the AWS Cloud. If you enjoyed reading this article and want to dive deeper into the inner workings and associated cost dynamics of self-managing Kafka, check out our four-part series on uncovering infrastructure costs.
✍️ Editor's note: This blog post was originally published by Alex Loddengaard in 2016. The content has been updated to reflect the latest information and best practices on September 6, 2023 by Geetha Anne, Braeden Quirante, and Joseph Morais.