Confluent OEM Program: Grow faster with enterprise-grade data streaming | Learn More

Introduction to Apache Kafka® for Python Programmers

Written By

In this blog post, we’re going to get back to basics and walk through how to get started using Apache Kafka with your Python applications.

We will assume some basic knowledge of Kafka. If you’re new to the project, the introduction and design sections of the Apache documentation are an excellent place to start. The Confluent blog is also packed with great information; Jay Kreps’s A Practical Guide to Building a Streaming Platform covers many of the core Kafka concepts again, but with a focus on Kafka’s role at a company-wide scale. Also noteworthy are Ben Stopford’s microservices blog posts (The Data Dichotomy, Services on a Backbone of Events) for his unique take on the relationship between applications and data.

Installation

For our examples we’ll use Confluent Platform. This is a source-available, open distribution of Kafka that includes connectors for various data systems, a REST layer for Kafka, and a schema registry. On OS X this is easily installed via the tar archive. Instructions for all platforms are available on the Confluent website.

The Confluent Python client confluent-kafka-python leverages the high performance C client librdkafka (also developed and supported by Confluent). Starting with version 1.0, these are distributed as self-contained binary wheels for OS X and Linux on PyPi. You can install (generally inside a virtual environment) with:

pip install confluent-kafka

Starting Kafka

You can get a single-broker Kafka cluster up and running quickly using default configuration files included with the Confluent Platform.

First, you’ll need to start a ZooKeeper instance, which Kafka utilizes for providing various distributed system related services. Assuming you used the zip or tar archive to install Confluent Platform, you can start ZooKeeper from the installation directory as follows:

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Then to start a Kafka broker:

./bin/kafka-server-start ./etc/kafka/server.properties

That’s it! You now have a Kafka broker to play with.

Producing Messages

Here’s a simple program that writes a message with key ‘hello‘ and value ‘world‘ to the Kafka topic mytopic:

After importing the Producer class from the confluent_kafka package, we construct a Producer instance and assign it to the variable p. The constructor takes a single argument: a dictionary of configuration parameters. Because confluent-kafka uses librdkafka for its underlying implementation, it shares the same set of configuration properties.

The only required property is bootstrap.servers which is used to specify the address of one or more brokers in your Kafka cluster. In our case, there is only one, but a real-world Kafka cluster may grow to tens or hundreds of nodes. It doesn’t matter which broker(s) you specify here; this setting simply provides a starting point for the client to query the cluster – any broker can answer metadata requests about the cluster.

In the call to the produce method, both the key and value parameters need to be either a byte-like object (in Python 2.x this includes strings), a Unicode object, or None. In Python 3.x, strings are Unicode and will be converted to a sequence of bytes using the UTF-8 encoding. In Python 2.x, objects of type unicode will be encoded using the default encoding. Often, you will want to serialize objects of a particular type before writing them to Kafka. A common pattern for doing this is to subclass Producer and override the produce method with one that performs the required serialization.

The produce method returns immediately without waiting for confirmation that the message has been successfully produced to Kafka (or otherwise). The flush method blocks until all outstanding produce commands have completed, or the optional timeout (specified as a number of seconds) has been exceeded. You can test to see whether all produce commands have completed by checking the value returned by the flush method: if it is greater than zero, there are still produce commands that have yet to complete. Note that you should typically call flush only at application teardown, not during normal flow of execution, as it will prevent requests from being streamlined in a performant manner.

To be notified when produce commands have completed, you can specify a callback function in the produce call. Here’s an example:

The callback method has two parameters – the first sends information about any error that occured whilst producing the message and the second information about the message produced. Callbacks are executed and sent as a side-effect of calls to the poll or flush methods. Unlike the flush method, the poll method always blocks for the specified timeout period (measured in seconds). An advantage of the poll based callback mechanism is that it allows you to keep everything single threaded and easy to reason about.

Consuming Messages

Data is read from Kafka using consumers that are generally working together as part of a consumer group. Different consumers subscribe to one or more topics and are automatically assigned to a subset of each topic’s partitions. If consumers are added or removed (perhaps due to failure) from the group, the group will automatically rebalance so that one and only one consumer is ever reading from each partition in each topic of the subscription set. For more detailed information on how consumer groups work, Jason Gustafson’s blog post covering the Java consumer is an excellent reference.

Below is a simple example that creates a Kafka consumer that joins consumer group mygroup and reads messages from its assigned partitions until Ctrl-C is pressed:

A number of configuration parameters are worth noting:

  1. bootstrap.servers: As with the producer, bootstrap servers specifies the initial point of contact with the Kafka cluster.
  2. group.id: The name of the consumer group the consumer is part of. If the consumer group does not yet exist when the consumer is constructed (there are no existing consumers that are part of the group), the group id will be created automatically. Similarly, if all consumers in a group leave the group, the group and group id will be automatically destroyed.
  3. client.id: Although optional, each consumer in a group should be assigned a unique id – this allows you to differentiate between clients in Kafka error logs and monitoring aggregates.
  4. default.topic.config: A number of topic related configuration properties are grouped together under this high level property. One commonly used topic-level property is auto.offset.reset which specifies which offset to start reading from if there have been no offsets committed to a topic/partition yet. This defaults to latest, however you will often want this to be smallest so that old messages are not ignored when you first start reading from a topic.
  5. enable.auto.commit: By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. Often you would like more control over exactly when offsets are committed. In this case you can set enable.auto.commit to False and call the commit method on the consumer. For simplicity, we have left auto offset commit enabled in this example.

After constructing the consumer, the subscribe method is called to inform Kafka that we wish to join the consumer group mygroup (specified in the configuration) and read messages from a single topic mytopic. It’s possible to subscribe to more than one topic by specifying more than one topic name in the list provided to the subscribe method. Note that you can’t do this by calling the subscribe method a second time – this would result in the consumer first unsubscribing from the original subscription set and then subscribing to only the topic(s) in the newly specified one.

Having subscribed to a set of topic groups, we enter the main poll loop. This is wrapped in a try/except block that allows controlled shutdown of the consumer via the close method when the user interrupts program execution. If the close method is omitted, the consumer group would not rebalance immediately – removal of the consumer from the group would occur as per the consumer group failure detection protocol after the session.timeout.ms has elapsed.

On the consumer, the poll method blocks until a Message object is ready for consumption, or until the timeout period (specified in seconds) has elapsed, in which case the return value is None. When a Message object is available, there are essentially three cases to consider, differentiated by the value returned by Message.error():

  1. None: The Message object represents a consumed message. The message key, value and other relevant information can be obtained via the key(), value(), timestamp(), topic(), partition() and offset() methods of the Message object.
  2. KafkaError._PartitionEOF: The Message object does not encapsulate any consumed message – it simply signals that the end of a partition has been reached. You can use the partition() and topic() methods to determine the pertinent partition.
  3. Any other value: An error occurred during consumption. Depending on the result of Message.error(), other Message object methods may return valid values. For most error types, use of topic() and partition() is valid.

Summary

That concludes our introduction on how to integrate Apache Kafka with your Python applications. In order to keep this post to a reasonable length, we’ve omitted some of the more advanced features of kafka python integration provided by the library. For example, you can hook into the partition assignment process that happens after you call subscribe on the consumer but before any messages are read. This allows you to do things like pre-load state associated with the partition assignment for joining with the consumed messages. The client also ships with AvroProducer and AvroConsumer classes that allow you to serialize data in Avro format and manage the evolution of the associated schemas using schema registry. For further information of kafka python integration, refer to the API documentation, the examples in the github repo, or user’s guide on our website.

For expert advice on deploying or operating Kafka, we’ve released a range of training and technical consulting services covering all levels of expertise for you to consume and learn from. For large-scale deployments of Kafka, we offer Confluent Platform, which not only provides a number of powerful features in addition to those under the Confluent Community License but also provides enterprise grade support. Finally, a hosted and fully managed version Apache Kafka is just around the corner with the up-coming Confluent Cloud.

  • As an early employee at Confluent, Matt Howlett has worked on many of Confluent’s community and enterprise products, including Confluent Control Center, Schema Registry, REST Proxy, and client libraries in various languages. Prior to joining Confluent, Matt spent many years developing materials tracking and optimization systems for large mining companies in Australia. His first exposure to distributed systems was in the computer games industry, where he worked on the server of a massively multiplayer online game engine.

Did you like this blog post? Share it now