As one of the world’s biggest internet-based platform companies, Tencent uses technology to enrich the lives of users and assist the digital upgrade of enterprises. An example product is the popular WeChat application, which has over one billion active users worldwide. The Platform and Content Group (PCG) is responsible for integrating Tencent’s internet, social, and content platforms. PCG promotes the cross-platform and multi-modal development of IP, with the overall goal of creating more diversified premium digital content experiences. Since its inception, many major products—from the well-known QQ, QZone, video, App Store, news, and browser, to relatively new members of the ecosystem such as live broadcast, anime, and movies—have been evolving on top of consolidated infrastructure and a set of foundational technology components.
At our center stands the real-time messaging system that connects data and computing. We have proudly built many essential data pipelines and messaging queues around Apache Kafka. Our application of Kafka is similar to other organizations: We build pipelines for cross-region log ingestion, machine learning platforms, and asynchronous communication among microservices. The unique challenges come from stretching our architecture along multiple dimensions, namely scalability, customization, and SLA. Here are some of the notable requirements:
Ideally, we need a multi-tenant, gigantic pub/sub system to satisfy all these requirements. At peak time, it should reliably support data transfer at hundreds of gigabits per second. It should be provisioned almost instantly without disrupting existing workload; it also needs to tolerate single-node and cluster failure. Considering interface concerns, we want it to be compatible with the Kafka SDK as much as possible. After exploring the limitations of a single Kafka cluster, we’ve moved forward with a series of developments.
We chose to develop in the Kafka ecosystem for its maturity, rich set of clients and connectors, as well as superb performance among alternatives. On the other hand, there are a few gaps in using Apache Kafka to meet the requirements above. For instance, more than expected, we found during heavy usage that multiple disk failures caused insufficient replica or even cluster-level reliability problems. Moreover, expanding the capacity of a cluster (i.e., adding brokers) requires significant data rebalancing, often imposing hours of operational latency. Without fully automated capacity management, this greatly limits how we can support a large business.
Given that we decided to focus our initial enhancement on scalability and failure tolerance, we started off building a proxy layer that federates multiple Kafka clusters and provides compatible interfaces to both providers and consumers. The proxy layer presents logical topics to Kafka clients and internally maps them to distinct physical topics in each Kafka cluster. In the figure below, a logical topic with eight partitions (
P0–P7) is distributed to two physical clusters each with four partitions (
The extra layer of abstraction of logical topics allows us to achieve the following, desirable behavior. First, we can expand the capacity of the data pipeline with little (re)synchronization overhead. In case two clusters at their maximum size cannot handle the predicted peak volume, we can easily deploy two additional clusters without shuffling any existing data. Second, fault tolerance is easier to manage with smaller clusters, as we can provision extra capacity at fine granularity and redirect traffic at a low cost. Lastly, in the (not-so-rare) event of physical cluster migration, the transparent proxy eliminates the need for any code and configuration change on the application side. We would only need to set the old clusters in read-only mode before it is completely drained, while associating the proxy with the new clusters. Such maintenance is not visible from the perspective of logic topics.
In this section, we get into more details of the new components we built and how they interact in essential scenarios, as shown in the figure below. Two proxy services, one for the producer (
pproxy) and another for the consumer (
cproxy), implement the core protocols of the Kafka broker. They are also responsible for mapping logical topics to their physical incarnation. The application uses the same Kafka SDK to connect directly to the proxy, which acts as a broker.
In order to address the set of proxy brokers, we built a lightweight name service that maintains this relationship between client ID and the collection of proxy servers. The SDK will request the list of proxy brokers using client ID once at the beginning of the communication. Internally, the most complicated and bulky part of our implementation involves managing the metadata of the federated cluster, including both the state of the topics as well as the lifecycle of the proxy nodes. We extract the logic of the Kafka controller node (such as topic metadata) into a separate service, which is also called “the controller,” but it is different from Kafka’s own controller functionality. This service is responsible for collecting the metadata of physical clusters, composing the partition information logical topics, and then publishing it to the proxy brokers.
We see some examples of interactions among these components and the Kafka clusters underneath during the most common operations:
pproxyfrom heartbeat message.
Over the past year, we have gradually onboard many products in Tencent PCG to use the federated Kafka solution. Alongside the cluster, we have also been developing better monitoring and automated management tools. Our design principles have been quickly validated by many critical business use cases such as real-time analytics, feature engineering, and more. Up to now, we have deployed a few hundreds of clusters of various sizes, which collectively handle more than 10 trillion messages every day. The following table summarizes our typical setup and operational benchmarks.
|Average time to initialize a federated cluster||10 minutes|
|Average time to scaling up a federated cluster
(add one physical cluster)
|Metadata refresh latency||~1 second|
|Maximum physical clusters per logical cluster||60|
|Brokers per physical cluster||10|
|Total number of brokers provisioned||~500|
|Max cluster bandwidth (CPU ~40%)||240 Gb/s|
|Proxy overhead||Same as Kafka broker|
There are two notable limitations of the first design composed of cluster federation with a proxy layer. First, the distribution of logical partitions is not transparent to clients who use a hash key to specify the partition when producing a message. Consequently, when we add a new cluster, messages with identical keys might be delivered to different partitions and hence get out of order. This did not turn out to be a blocker for our current use cases for two reasons. First, we surveyed the product teams and found that they only use keyed messages occasionally. Furthermore, when they face the trade-off between application-level fault tolerance and scalability, they typically prefer the latter and sometimes settle with a mechanism to briefly halt production when cluster membership is updated.
A more fundamental limitation is that we have to frequently evolve the interface of the proxy broker as more functionalities need to be exposed and as native Kafka evolves. This leads to unnecessary code duplication and makes the whole system harder to manage. In the future, we will explore implementing similar semantics inside Kafka, as described in the next section.
As we explained above, Tencent is one of the largest Kafka users in the world, processing trillions of messages every day. This also means that to power our many use cases, we have successfully pushed some of Kafka’s boundaries. We are aware of the ongoing development and proposals within the Kafka community, and we further find that some of our ideas, such as abstracting out the controller and shard in traffic across multiple controllers, are along the same lines as what the community is moving toward. We may see significant benefits if our federation solution integrates with new features like Tiered Storage (KIP-405) and ZooKeeper removal (KIP-500), and we look forward to contributing what has been proven to help Tencent back into the community.
In this article, we introduce our journey of building federated Kafka clusters for business use cases that require high scalability and fault tolerance. Through a strong engineering solution and investment in the operation, we are bridging the gap and supporting the continued growth of Tencent PCG. Our experience has reassured our confidence in the Kafka ecosystem and shed light on how we can further extend its capabilities. We will continue our development along this direction.
If you want more on Kafka and event streaming, you can check out Confluent Developer to find the largest collection of resources for getting started, including end-to-end Kafka tutorials, videos, demos, meetups, podcasts, and more.