Confluent
Introduction to Apache Kafka® for Python Programmers
Apache Kafka

Introduction to Apache Kafka® for Python Programmers

Matt Howlett

In this blog post, we’re going to get back to basics and walk through how to get started using Apache Kafka with your Python applications.

We will assume some basic knowledge of Kafka. If you’re new to the project, the introduction and design sections of the Apache documentation are an excellent place to start. The Confluent blog is also packed with great information; Jay Kreps’s A Practical Guide to Building a Streaming Platform covers many of the core concepts again, but with a focus on Kafka’s role at a company-wide scale. Also noteworthy are Ben Stopford’s microservices blog posts (The Data Dichotomy, Services on a Backbone of Events) for his unique take on the relationship between applications and data.

Installation

For our examples we’ll use Confluent Open Source. This is an Apache 2.0 licensed open source distribution of Kafka that includes connectors for various data systems, a REST layer for Kafka, and a schema registry. On OS X this is easily installed via the tar archive. Instructions for all platforms are available on the Confluent website.

The Confluent Python client confluent-kafka-python leverages the high performance C client librdkafka (also developed and supported by Confluent). Before installing the Python client you’ll need to install the librdkafka shared library and corresponding C header files. If you’re on OS X, the easiest way to achieve this is to install the homebrew librdkafka package:

brew install librdkafka

librdkafka is also available in package form for other platforms, or it’s also very easy to install from source. For more information refer to the installation instructions.

You’ll also need to set up your environment to be able to build Python C extensions, if it’s not already. On OS X, that means installing Xcode via the app store, then installing the command line developer tools as follows:

xcode-select --install

Now you’re ready to install the Python client. This is most conveniently done from PyPI using pip (generally inside a virtual environment):

pip install confluent-kafka

Starting Kafka

You can get a single-broker Kafka cluster up and running quickly using default configuration files included with Confluent Open Source.

First, you’ll need to start a Zookeeper instance, which Kafka utilizes for providing various distributed system related services. Assuming you used the zip or tar archive to install Confluent Open Source, you can start Zookeeper from the installation directory as follows:

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Then to start a Kafka broker:

./bin/kafka-server-start ./etc/kafka/server.properties

That’s it! You now have a Kafka broker to play with.

Producing Messages

Here’s a simple program that writes a message with key ‘hello‘ and value ‘world‘ to the Kafka topic mytopic:

After importing the Producer class from the confluent_kafka package, we construct a Producer instance and assign it to the variable p. The constructor takes a single argument: a dictionary of configuration parameters. Because confluent-kafka uses librdkafka for its underlying implementation, it shares the same set of configuration properties.

The only required property is bootstrap.servers which is used to specify the address of one or more brokers in your Kafka cluster. In our case, there is only one, but a real-world Kafka cluster may grow to tens or hundreds of nodes. It doesn’t matter which broker(s) you specify here; this setting simply provides a starting point for the client to query the cluster – any broker can answer metadata requests about the cluster.

In the call to the produce method, both the key and value parameters need to be either a byte-like object (in Python 2.x this includes strings), a Unicode object, or None. In Python 3.x, strings are Unicode and will be converted to a sequence of bytes using the UTF-8 encoding. In Python 2.x, objects of type unicode will be encoded using the default encoding. Often, you will want to serialize objects of a particular type before writing them to Kafka. A common pattern for doing this is to subclass Producer and override the produce method with one that performs the required serialization.

The produce method returns immediately without waiting for confirmation that the message has been successfully produced to Kafka (or otherwise). The flush method blocks until all outstanding produce commands have completed, or the optional timeout (specified as a number of seconds) has been exceeded. You can test to see whether all produce commands have completed by checking the value returned by the flush method: if it is greater than zero, there are still produce commands that have yet to complete. Note that you should typically call flush only at application teardown, not during normal flow of execution, as it will prevent requests from being streamlined in a performant manner.

To be notified when produce commands have completed, you can specify a callback function in the produce call. Here’s an example:

The callback method has two parameters – the first provides information about any error that occured whilst producing the message and the second information about the message produced. Callbacks are executed as a side-effect of calls to the poll or flush methods. Unlike the flush method, the poll method always blocks for the specified timeout period (measured in seconds). An advantage of the poll based callback mechanism is that it allows you to keep everything single threaded and easy to reason about.

Consuming Messages

Data is read from Kafka using consumers that are generally working together as part of a consumer group. Consumers subscribe to one or more topics and are automatically assigned to a subset of each topic’s partitions. If consumers are added or removed (perhaps due to failure) from the group, the group will automatically rebalance so that one and only one consumer is ever reading from each partition in each topic of the subscription set. For more detailed information on how consumer groups work, Jason Gustafson’s blog post covering the Java consumer is an excellent reference.

Below is a simple example that creates a Kafka consumer that joins consumer group mygroup and reads messages from its assigned partitions until Ctrl-C is pressed:

A number of configuration parameters are worth noting:

  1. bootstrap.servers: As with the producer, this specifies the initial point of contact with the Kafka cluster.
  2. group.id: The name of the consumer group the consumer is part of. If the consumer group does not yet exist when the consumer is constructed (there are no existing consumers that are part of the group), it will be created automatically. Similarly, if all consumers in a group leave the group, the group will be automatically destroyed.
  3. client.id: Although optional, each consumer in a group should be assigned a unique id – this allows you to differentiate between clients in Kafka error logs and monitoring aggregates.
  4. default.topic.config: A number of topic related configuration properties are grouped together under this top level property. One commonly used topic-level property is auto.offset.reset which specifies which offset to start reading from if there have been no offsets committed to a topic/partition yet. This defaults to latest, however you will often want this to be smallest so that old messages are not ignored when you first start reading from a topic.
  5. enable.auto.commit: By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. Often you would like more control over exactly when offsets are committed. In this case you can set enable.auto.commit to False and call the commit method on the consumer. For simplicity, we have left auto offset commit enabled in this example.

After constructing the consumer, the subscribe method is called to inform Kafka that we wish to join the consumer group mygroup (specified in the configuration) and read messages from a single topic mytopic. It’s possible to subscribe to more than one topic by specifying more than one topic name in the list provided to the subscribe method. Note that you can’t do this by calling the subscribe method a second time – this would result in the consumer first unsubscribing from the original subscription set and then subscribing to only the topic(s) in the newly specified one.

Having subscribed to a set of topics, we enter the main poll loop. This is wrapped in a try/except block that allows controlled shutdown of the consumer via the close method when the user interrupts program execution. If the close method is omitted, the consumer group would not rebalance immediately – removal of the consumer from the group would occur as per the consumer group failure detection protocol after the session.timeout.ms has elapsed.

On the consumer, the poll method blocks until a Message object is ready for consumption, or until the timeout period (specified in seconds) has elapsed, in which case the return value is None. When a Message object is available, there are essentially three cases to consider, differentiated by the value returned by Message.error():

  1. None: The Message object represents a consumed message. The message key, value and other relevant information can be obtained via the key(), value(), timestamp(), topic(), partition() and offset() methods of the Message object.
  2. KafkaError._PartitionEOF: The Message object does not encapsulate any consumed message – it simply signals that the end of a partition has been reached. You can use the partition() and topic() methods to determine the pertinent partition.
  3. Any other value: An error occurred during consumption. Depending on the result of Message.error(), other Message object methods may return valid values. For most error types, use of topic() and partition() is valid.

Summary

That concludes our introduction on how to integrate Apache Kafka with your Python applications. In order to keep this post to a reasonable length, we’ve omitted some of the more advanced features provided by the library. For example, you can hook into the partition assignment process that happens after you call subscribe on the consumer but before any messages are read. This allows you to do things like pre-load state associated with the partition assignment for joining with the consumed messages. The client also ships with AvroProducer and AvroConsumer classes that allow you to serialize data in Avro format and manage the evolution of the associated schemas using schema registry. For further information, refer to the API documentation, the examples in the github repo, or user’s guide on our website.

For expert advice on deploying or operating Kafka, we offer a range of training and technical consulting services covering all levels of expertise. For large-scale deployments of Kafka we offer Confluent Enterprise which provides a number of powerful features in addition to those provided by Confluent Open Source as well as enterprise grade support. Finally, a hosted and fully managed version Apache Kafka is just around the corner with the up-coming Confluent Cloud.

Subscribe to the Confluent Blog

Subscribe
Email *

More Articles Like This

Gwen Shapira

The First Annual State of Apache Kafka® Client Use Survey

Gwen Shapira . .

At the end of 2016 we conducted a survey of the Apache Kafka® community regarding their use of Kafka clients (the producers and consumers used with Kafka) and their priorities ...

Apache Kafka logo
Jay Kreps

Confluent Contributions to the Apache Kafka™ Client Ecosystem

Jay Kreps . .

If you are using Apache Kafka from a language other than Java one of the first questions you probably have is something like, “Why are there two (or five!) clients ...

Leave a Reply

Your email address will not be published. Required fields are marked *

Comments

    1. Hi Mayank – the Kafka Streams API is only available in Java, though we have started to think about what it might look like in other languages.

  1. Will the Kafka Streams API be available in Python at some point? All of our Kafka producer/consumer code is in Python, and having the Streams API available in Java only has been a barrier.

Try Confluent Platform

Download Now