This post has been written in collaboration with Derrick Harris from Mesosphere and Joe Stein, a Kafka committer. For an updated version of this article, please see Apache Mesos, Apache Kafka and Kafka Streams for Highly Scalable Microservices.
Apache Kafka and Apache Mesos are very well-known and successful Apache projects. Both have large communities supporting them and companies — Confluent and Mesosphere, respectively — built around them. Recently, the two companies worked together to make Kafka one of the first certified services on Mesosphere’s Datacenter Operating System.
Stream data is everywhere and is on the rise – streams of application metrics for monitoring a company’s IT infrastructure, streams of orders and shipment for retail companies, streams of activity data from devices for the IoT, streams of stock ticker data for finance companies and so on. Increasingly there is a need for infrastructure to harness all this stream data and make it available to various applications in a company’s data center that need to process it and do so in real-time and at scale.
Kafka’s unique ability to move large amounts of data in real-time makes it a perfect fit for managing stream data and many organizations use Kafka to power real-time data monitoring, analysis, security, fraud detection and stream processing. Kafka also plays a key role in integrating various systems in a company’s data center. Apache Mesos abstracts away data center resources to make it easy to deploy and manage distributed applications and systems. This post explains how to run Kafka clusters on Mesos to simplify the task of managing stream data at scale.
Apache Kafka is a distributed, high-throughput, low-latency publish-subscribe messaging system. Since it was open-sourced in 2011, Apache Kafka has been adopted widely across the industry at web companies like LinkedIn, Netflix, and Uber, as well as traditional enterprises like Cisco and Goldman Sachs. At these companies, it forms the backbone for critical data pipelines moving hundreds of billions of messages per day in real-time.
Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. It sits between the application layer and the operating system, making it easy and efficient to deploy and manage applications in large-scale clustered environments.
Here is a quick overview of the Apache Mesos architecture. Mesos is made up of Masters, Slaves and Frameworks.
The Mesos Master is responsible for the communication of resources between Slaves and the Frameworks. There is only one Master running as the leader at any one time. There is typically at least one stand-by process for failover in the case the Master crashes (in proxy mode stand-bys pass the data to the Master). The Mesos Master is responsible for allocating the resources (between schedulers and slaves) for tasks, managing state, high availability, etc.
The Mesos Slave launches local processes on the servers where it’s running. These processes are launched within Linux containers by the Executor, which is the parent container for any process that it might launch, in addition to itself being a process.
Frameworks receive resource offers from the Mesos Master about the Mesos Slaves’ resources (e.g. cpu and ram). Frameworks are made up of two components:
The Scheduler – The Scheduler provides the foundation for managing what the framework’s tasks are doing. The scheduler is responsible for managing the state between slave success and failures, tasks failures, internal application configuration and failures, communication to the outside world and a lot more.
The Executor – The Executor is the running application’s code on the server. Within this container, other processes can be launched as well, depending on how the application is configured to manage itself. Most often, the Executor is just the business logic code running on the server with a thin layer for interacting with the Mesos Master.
You can read more about the Apache Mesos Architecture here.
Marathon is a Mesos framework that makes it easy to launch any long-running application on Mesos without requiring a custom, application-specific framework. It automatically provides a lot of features that many applications need in a clustered environment, such as high-availability, node constraints, application health checks, an API for scriptability and service discovery, and an easy to use web-based user interface. The simplicity of Marathon comes with loss of flexibility and customization though. The application has no say in how the resources should be allocated to respect certain constraints, for example, to preserve processing or data affinity.
At first, we started by running Kafka on Marathon but, in practice, we ran into a number of issues.
First, Marathon is not designed for managing stateful services. In the event of a failure or even a simple service restart, Marathon blindly restarts the service on any other resource that matches the constraints defined by the service. This is not ideal for stateful services since that requires the service to move all its local state to the new server which can turn out to be a very expensive operation. Kafka, like several other storage systems, maintains its data on local disks. Running Kafka on Marathon would mean a simple restart operation of a broker process can move each broker to a different server, making the broker replicate all its data from the rest of the brokers. Since typically Kafka stores large amounts of data, this could mean unnecessarily replicating terabytes of data over the wire. You want brokers to be able to wait if a restart was caused by a broker and if something becomes critically wrong after that still move the broker.
Second, Marathon does not allow you to selectively load balance the application’s state amongst a subset of processes that belong to the application. In Kafka, examples of this operation are cluster expansion where you want to selectively move some partitions from the rest of the cluster to the newly restarted brokers. Currently, the cluster expansion operation has to be performed manually through an admin interface in Kafka. Merely starting new brokers in a cluster does not allocate any data to it and the admin has to selectively move some partitions from the rest of the cluster to the newly started broker. Also, Kafka does not have support for quotas yet, so this operation of moving data to the new brokers has to be done carefully in stages to avoid saturating the network and rest of the replication traffic in a Kafka cluster. Marathon does not have hooks that allow application specific business logic to drive the failure detection and handling of processes started using Marathon.
Due to these drawbacks, we decided to pursue the framework approach for integrating Kafka with Mesos.
Here is how the various components of the Kafka Mesos framework work:
The scheduler provides the operational automation for a Kafka cluster and any version of Kafka can run on Mesos through the scheduler. It is the central point where the decisions for task failures, administration and scaling are all made. The scheduler state is maintained in Zookeeper, while the configuration and other administrative tasks are available through a REST API.
The scheduler should be running on Marathon so that if the scheduler process dies, Marathon can launch that on another Mesos Slave.
The executor interacts with the Kafka broker as an intermediary to the scheduler. The executor looks for the Kafka binary release tgz and runs that. This allows users to not only run different versions, but also allows users to patch Kafka and run simulated tests through configured and automated deployments.
If you want to get your hands dirty, here is a quickstart on the Kafka Mesos framework.
Open up two terminal windows. Check the kafka-mesos.properties after you cd into the directory that you cloned to make sure the scheduler is configured for your cluster.
In the first window.
git clone https://github.com/mesos/kafka mesosKafka
cd mesosKafka
./gradlew jar
./kafka-mesos.sh scheduler
In the second window.
./kafka-mesos.sh add 1000..1002 --cpus 0.01 --heap 128 --mem 256
./kafka-mesos.sh start 1000..1002
./kafka-mesos.sh status
At this point you will have 3 Kafka brokers running. For more CLI commands:
./kafka-mesos.sh help
You can also get help for each command
./kafka-mesos.sh help <cmd>
The Kafka Mesos Scheduler provides another option than just having a CLI. All commands available in the CLI are also available in the REST API.
To discover where the Mesos Kafka Scheduler is running you need to query the Marathon api.
curl -X GET -H "Content-type: application/json" -H "Accept: application/json" http://localhost:8080/v2/tasks
The REST API provides every feature that is available in the CLI. This takes the format of:
/api/brokers/<cli command>/id={broker.id}&<k>=<v>
Adding a broker
“http://localhost:7000/api/brokers/add?id=0&cpus=8&mem=43008"
Starting a broker
“http://localhost:7000/api/brokers/start?id=0"
Status of running brokers
curl "http://localhost:7000/api/brokers/status
Existing Kafka tools, producers and consumers all work with Kafka on Mesos just the same way they do while running Kafka outside Mesos. You can discover other brokers by interacting with either the CLI or the REST APIs.
When running the scheduler on Marathon for higher availability, first look up from the Marathon API the host and port of the scheduler, then call that scheduler to find the brokers. Mesos DNS can also be used so that brokers can have static DNS names assigned to them. Once you have your broker to connect to you are good to go.
The future is bright for Kafka on Mesos and the DCOS. We have a lot of feedback and ideas circulating for what should come next and how it should continue to grow. Here are some of what has been discussed, in no particular order, to help improve this integration, most of which are features we are working to add in Apache Kafka itself:
In the time to come, companies will expect to do even more with their growing data. Gone are the days of single monolithic databases. Now companies are adding new specialized distributed systems to process data — but they will need to minimize the complexity of deploying and managing hardware resources, lest they risk becoming slaves to their infrastructure. Not only will Kafka be central to a company’s data pipeline infrastructure to enable data flow to all these diverse systems, but a cluster manager like Mesos will become increasingly important as big data technologies like Kafka continue to proliferate.
In this post, the second in the Kafka Producer and Consumer Internals Series, we follow our brave hero—a well-formed produce request—which is on its way to be processed by the broker and have its data stored on the cluster.
The beauty of Kafka as a technology is that it can do a lot with little effort on your part. In effect, it’s a black box. But what if you need to see into the black box to debug something? This post shows what the producer does behind the scenes to help prepare your raw event data for the broker.