[Webinar] Deliver enterprise-grade Apache Kafka® to your customers | Join Now
When Apache Kafka® was originally created, it shipped with a Scala producer and consumer client. Over time we came to realize many of the limitations of these APIs. For example, we had a “high-level” consumer API which supported consumer groups and handled failover, but didn’t support many of the more complex usage scenarios. We also had a “simple” consumer client which provided full control, but required users to manage failover and error handling themselves. So we set about redesigning these clients in order to open up many use cases that were hard or impossible with the old clients and establish a set of APIs we could support over the long haul.
The first phase of this was rewriting the Producer API in 0.8.1. The recent 0.9 release completed the second phase with the introduction of the new Consumer API. Building on top of a new group coordination protocol provided by Kafka itself, the new consumer brings the following advantages:
Although the new consumer uses a redesigned API and a new coordination protocol, the concepts are not fundamentally different, so users familiar with the old consumer shouldn’t have much trouble understanding it. However, there are some subtle details in particular with respect to group management and the threading model which requires some extra care. The purpose of this tutorial is to cover the basic usage of the new consumer and explain all of these details.
One word of caution: at the time of this writing, the new consumer is still considered “beta” in terms of stability. We have fixed several important bugs in the 0.9.0 branch, so if you run into any problems using the 0.9.0.0 release of Kafka, we encourage you to test against that branch. If you still see issues, please report it on the Kafka mailing list or on the Kafka JIRA.
Before getting into the code, we should review some basic concepts. In Kafka, each topic is divided into a set of logs known as partitions. Producers write to the tail of these logs and consumers read the logs at their own pace. Kafka scales topic consumption by distributing partitions among a consumer group, which is a set of consumers sharing a common group identifier. The diagram below shows a single topic with three partitions and a consumer group with two members. Each partition in the topic is assigned to exactly one member in the group.
While the old consumer depended on Zookeeper for group management, the new consumer uses a group coordination protocol built into Kafka itself. For each group, one of the brokers is selected as the group coordinator. The coordinator is responsible for managing the state of the group. Its main job is to mediate partition assignment when new members arrive, old members depart, and when topic metadata changes. The act of reassigning partitions is known as rebalancing the group.
When a group is first initialized, the consumers typically begin reading from either the earliest or latest offset in each partition. The messages in each partition log are then read sequentially. As the consumer makes progress, it commits the offsets of messages it has successfully processed. For example, in the figure below, the consumer’s position is at offset 6 and its last committed offset is at offset 1.
When a partition gets reassigned to another consumer in the group, the initial position is set to the last committed offset. If the consumer in the example above suddenly crashed, then the group member taking over the partition would begin consumption from offset 1. In that case, it would have to reprocess the messages up to the crashed consumer’s position of 6.
The diagram also shows two other significant positions in the log. The log end offset is the offset of the last message written to the log. The high watermark is the offset of the last message that was successfully copied to all of the log’s replicas. From the perspective of the consumer, the main thing to know is that you can only read up to the high watermark. This prevents the consumer from reading unreplicated data which could later be lost.
To get started with the consumer, add the kafka-clients dependency to your project. The maven snippet is provided below:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0-cp1</version>
</dependency>
The consumer is constructed using a Properties file just like the other Kafka clients. In the example below, we provide the minimal configuration needed to use consumer groups.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Just as in the old consumer and the producer, we need to configure an initial list of brokers for the consumer to be able discover the rest of the cluster. This doesn’t need to be all the servers in the cluster—the client will determine the full set of alive brokers from the brokers in this list. We have assumed here that the broker is running on localhost. The consumer also needs to be told how to deserialize message keys and values. Finally, to join a consumer group, we need to configure the group id. As we proceed through this tutorial, we’ll introduce more of the configuration.
To begin consumption, you must first subscribe to the topics your application needs to read from. In the example below, we subscribe to the topics “foo” and “bar.”
consumer.subscribe(Arrays.asList("foo", "bar"));
After you have subscribed, the consumer can coordinate with the rest of the group to get its partition assignment. This is all handled automatically when you begin consuming data. Later we will show how you can assign partitions manually using the assign API, but keep in mind that it is not possible to mix automatic and manual assignment.
The subscribe method is not incremental: you must include the full list of topics that you want to consume from. You can change the set of topics you’re subscribed to at any time–any topics previously subscribed to will be replaced by the new list when you call subscribe.
The consumer needs to be able to fetch data in parallel, potentially from many partitions for many topics likely spread across many brokers. To do this it uses an API style similar to the poll or select call in unix: once topics are registered, all future coordination, rebalancing, and data fetching is driven through a single poll call meant to be invoked in an event loop. This allows for a simple and efficient implementation which can handle all IO from a single thread.
After subscribing to a topic, you need to start the event loop to get a partition assignment and begin fetching data. It sounds complex, but all you need to do is call poll in a loop and the consumer handles the rest. Each call to poll returns a (possibly empty) set of messages from the partitions that were assigned. The example below shows a basic poll loop which prints the offset and value of fetched records as they arrive:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
The poll API returns fetched records based on the current position. When the group is first created, the position will be set according to the reset policy (which is typically either set to the earliest or latest offset for each partition). Once the consumer begins committing offsets, then each later rebalance will reset the position to the last committed offset. The parameter passed to poll controls the maximum amount of time that the consumer will block while it awaits records at the current position. The consumer returns immediately as soon as any records are available, but it will wait for the full timeout specified before returning if nothing is available.
The consumer is designed to be run in its own thread. It is not safe for multithreaded use without external synchronization and it is probably not a good idea to try. In this example, we’ve used a flag which can be used to break from the poll loop when the application is shutdown. When this flag is set to false from another thread (e.g. to shutdown the process), the loop will break as soon as poll returns and the application finishes processing whatever records were returned.
You should always close the consumer when you are finished with it. Not only does this clean up any sockets in use, it ensures that the consumer can alert the coordinator about its departure from the group.
This example uses a relatively small timeout to ensure that there is not too much delay when shutting down the consumer. Alternatively, you can use a long timeout and break from the loop using the wakeup API.
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + “: ” + record.value());
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
We’ve changed the timeout to use Long.MAX_VALUE, which basically means the consumer will block indefinitely until the next records can be returned. Instead of setting the flag in the previous example, the thread triggering the shutdown can then call consumer.wakeup() to interrupt an active poll, causing it to throw a WakeupException. This API is safe to use from another thread. Note that if there is no active poll in progress, the exception will be raised from the next call. In this example, we catch the exception to prevent it from being propagated.
In the next example, we’ll put all of this together to build a simple Runnable task which initializes the consumer, subscribes to a list of topics, and executes the poll loop indefinitely until shutdown externally.
public class ConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop(int id,
String groupId,
List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put(“group.id”, groupId);
props.put(“key.deserializer”, StringDeserializer.class.getName());
props.put(“value.deserializer”, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
To test this example, you will need a Kafka broker running release 0.9.0.0 and a topic with some string data to consume. The easiest way to write a bunch of string data to a topic is to using the kafka-verifiable-producer.sh script. To make it interesting, we should also make sure the topic has more than one partition so that one member isn’t left doing all the work. For example, with a single Kafka broker and Zookeeper both running on localhost, you might do the following from the root of the Kafka distribution:
# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181
# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092
Then we can create a small driver to setup a consumer group with three members, all subscribed to the same topic we have just created.
public static void main(String[] args) {
int numConsumers = 3;
String groupId = "consumer-tutorial-group"
List<String> topics = Arrays.asList("consumer-tutorial");
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<ConsumerLoop> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
ConsumerLoop consumer = new ConsumerLoop(i, topics);
consumers.add(consumer);
executor.submit(consumer);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (ConsumerLoop consumer : consumers) {
consumer.shutdown();
}
}
});
}
This example submits the three runnable consumers to an executor. Each thread is given a separate id so that you can see which thread is receiving data. The shutdown hook will be invoked when you stop the process, which will halt the three threads using wakeup and wait for them to shutdown. If you run this, you should see lots of data from all of the threads. Here is a sample from one run:
2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}
The output shows consumption across all three partitions. Each partition has been assigned to one of the threads. Within each partition, you can see the offsets increasing as expected. You can shutdown the process using Ctrl-C from the command line or through your IDE.
When part of a consumer group, each consumer is assigned a subset of the partitions from topics it has subscribed to. This is basically a group lock on those partitions. As long as the lock is held, no other members in the group will be able to read from them. When your consumer is healthy, this is exactly what you want. It’s the only way that you can avoid duplicate consumption. But if the consumer dies due to a machine or application failure, you need that lock to be released so that the partitions can be assigned to a healthy member.
Kafka’s group coordination protocol addresses this problem using a heartbeat mechanism. After every rebalance, all members of the current generation begin sending periodic heartbeats to the group coordinator. As long as the coordinator continues receiving heartbeats, it assumes that members are healthy. On every received heartbeat, the coordinator starts (or resets) a timer. If no heartbeat is received when the timer expires, the coordinator marks the member dead and signals the rest of the group that they should rejoin so that partitions can be reassigned. The duration of the timer is known as the session timeout and is configured on the client with the setting session.timeout.ms.
props.put("session.timeout.ms", "60000");
The session timeout ensures that the lock will be released if the machine or application crashes or if a network partition isolates the consumer from the coordinator. However, application failures are a little trickier to handle generally. Just because the consumer is still sending heartbeats to the coordinator does not necessarily mean that the application is healthy.
The consumer’s poll loop is designed to handle this problem. All network IO is done in the foreground when you call poll or one of the other blocking APIs. The consumer does not use any background threads. This means that heartbeats are only sent to the coordinator when you call poll. If your application stops polling (whether because the processing code has thrown an exception or a downstream system has crashed), then no heartbeats will be sent, the session timeout will expire, and the group will be rebalanced. The only problem with this is that a spurious rebalance might be triggered if the consumer takes longer than the session timeout to process messages. You should therefore set the session timeout large enough to make this unlikely. The default is 30 seconds, but it’s not unreasonable to set it as high as several minutes. The only downside of a larger session timeout is that it will take longer for the coordinator to detect genuine consumer crashes.
When a consumer group is first created, the initial offset is set according to the policy defined by the auto.offset.reset configuration setting. Once the consumer begins processing, it commits offsets regularly according to the needs of the application. After every subsequent rebalance, the position will be set to the last committed offset for that partition in the group. If the consumer crashes before committing offsets for messages that have been successfully processed, then another consumer will end up repeating the work. The more frequently you commit offsets, the less duplicates you will see in a crash. In the examples thus far, we have assumed that the automatic commit policy is enabled. When the setting enable.auto.commit is set to true (which is the default), the consumer automatically triggers offset commits periodically according to the interval configured with “auto.commit.interval.ms.” By reducing the commit interval, you can limit the amount of re-processing the consumer must do in the event of a crash. To use the consumer’s commit API, you should first disable automatic commit by setting enable.auto.commit to false in the consumer’s configuration.
props.put("enable.auto.commit", "false");
The commit API itself is trivial to use, but the most important point is how it is integrated into the poll loop. The following examples therefore include the full poll loop with the commit details in bold. The easiest way to handle commits manually is with the synchronous commit API:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record