Streaming applications have gained immense popularity due to their ability to process real-time data efficiently. However, when developing such applications, one crucial aspect that often goes unnoticed is the default partitioning behavior of Java and non-Java producers. This disparity can result in data mismatches and inconsistencies, posing challenges for developers. This blog post delves into the causes of these inconsistencies and explores available options to overcome them, enabling seamless integration between Java and non-Java producers.
In Apache Kafka®’s Java producer library, the default partitioning strategy is implemented by the DefaultPartitioner, which uses a hashing function to determine the topic partition to which a message should be assigned. The DefaultPartitioner relies on a 32-bit hash called murmur2, whose name is derived from its inner loop, which employs two fundamental operations: multiplication and rotation. It computes the partition for a record with a defined key. For example, let’s say we have a topic with six (6) partitions and we need to determine which partition a given message should be assigned to:
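As a minimal sketch (the key user_1 is made up, and Utils is the Java client’s own hashing utility class), the computation boils down to:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 6;
        byte[] keyBytes = "user_1".getBytes(StandardCharsets.UTF_8);
        // Same arithmetic the DefaultPartitioner applies to a non-null key:
        // murmur2-hash the serialized key, mask the sign bit, modulo the partition count
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("Key user_1 goes to partition " + partition);
    }
}
```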
This partitioning strategy is deterministic: it ensures that messages with the same key are consistently assigned to the same partition, preserving the order of related messages within that partition. If the key is null or not provided, a round-robin mechanism distributes the messages evenly across the available partitions.
Kafka also provides flexibility for implementing custom partitioners. By implementing the org.apache.kafka.clients.producer.Partitioner interface, developers can define their own partitioning logic, allowing them to use alternative hashing functions or consider other message attributes when determining the target partition.
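For illustration, a hypothetical custom partitioner that hashes the serialized key bytes with Java’s Arrays.hashCode instead of murmur2 could look like this:

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example of alternative partitioning logic
public class ArrayHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // Keyless messages all land on partition 0 in this sketch;
            // a real implementation would likely spread them out instead
            return 0;
        }
        // Mask the sign bit so the modulo result is never negative
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```

The producer is then pointed at the class through the partitioner.class configuration property.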
However, everything would be perfect if there wasn’t an elephant in the room: what about clients that aren’t written in Java?
That is a very good question, my tusk-tastic friend!
Many non-Java clients rely on a C/C++ library called librdkafka. If you are not familiar with it, the library provides a high-performance, lightweight, and feature-rich implementation of the Apache Kafka wire protocol. It serves as a client library for interacting with Kafka clusters.
Ermmm, so, what is wrong with librdkafka?
That library is widely used in production environments and underpins numerous Kafka client implementations in different programming languages, such as Python, Go, .NET/C#, and others. However, by default, librdkafka uses a different partitioner based on CRC32 (Cyclic Redundancy Check, 32-bit). Like murmur2, it generates a fixed-size 32-bit hash value for a given input. As one can already figure out, different hashing functions will very likely yield different results for the same key.
Because of that, messages with the same key produced by Java and many non-Java clients are expected to be assigned to different partitions. That means, for example, that if a Python producer and a (Java-based) source connector are producing data to different topics in Kafka, a merge of these topics using the Kafka Streams API or ksqlDB would not work properly, as there would be a partition mismatch. If we take the same table as before, but this time hash the keys using CRC32, we can see the mismatches:
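To make the mismatch concrete, here is a self-contained Python sketch (key values made up, six partitions as before): it ports the Java client’s murmur2 routine and compares the resulting assignment with a CRC32-based one, approximating librdkafka’s consistent partitioner via zlib.crc32.

```python
import zlib

def murmur2(data: bytes) -> int:
    """Port of the Kafka Java client's murmur2 hash (32-bit arithmetic)."""
    length = len(data)
    seed, m, r = 0x9747B28C, 0x5BD1E995, 24
    h = (seed ^ length) & 0xFFFFFFFF
    # Mix the input four bytes at a time
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & 0xFFFFFFFF
        k ^= k >> r
        k = (k * m) & 0xFFFFFFFF
        h = (h * m) & 0xFFFFFFFF
        h ^= k
    # Mix in the trailing 1-3 bytes, mirroring the Java switch fallthrough
    rem, base = length % 4, length - length % 4
    if rem == 3:
        h ^= data[base + 2] << 16
    if rem >= 2:
        h ^= data[base + 1] << 8
    if rem >= 1:
        h ^= data[base]
        h = (h * m) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * m) & 0xFFFFFFFF
    h ^= h >> 15
    return h

NUM_PARTITIONS = 6
for key in (b"user_1", b"user_2", b"user_3"):
    java_side = (murmur2(key) & 0x7FFFFFFF) % NUM_PARTITIONS  # Java DefaultPartitioner
    librd_side = zlib.crc32(key) % NUM_PARTITIONS             # CRC32-based assignment
    print(f"{key.decode()}: murmur2 -> {java_side}, crc32 -> {librd_side}")
```

For most keys, the two assignments will differ, which is exactly the kind of mismatch shown above.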
Let’s now run a practical example where data is produced by two different producers, one using the murmur2 hashing function and the other CRC32:
The first producer writes data to the topic demo_user and uses the CRC32 hashing function; a sketch of it is shown below (the names used in this data sample are purely fictional and do not represent any real individuals).
The second producer is a ksqlDB stream where data is written directly to the stream/topic demo_user_orders using INSERT SQL statements. As we now know, ksqlDB is built on the Kafka Streams API, which is written in Java and by default uses the murmur2 hashing function. Both topics share a common key (user_id).
The details about this demo can be found on GitHub.
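For illustration, the first producer could look roughly like this sketch (not the exact demo code; the broker address and sample values are placeholders, and since no partitioner is set, librdkafka’s CRC32-based default applies):

```python
from confluent_kafka import Producer
import json

# No "partitioner" override here, so librdkafka's default
# (CRC32-based consistent_random) decides the partition
producer = Producer({"bootstrap.servers": "localhost:9092"})

users = [("user_1", "Ada"), ("user_2", "Grace"), ("user_3", "Alan")]
for user_id, name in users:
    producer.produce("demo_user",
                     key=user_id,
                     value=json.dumps({"user_id": user_id, "name": name}))

producer.flush()
```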
As we can see, there were mismatches in the partition assignment:
If we then try to merge both topics in ksqlDB by their key (user_id), we will find several mismatches. But first, let’s look at the queries used in this example:
Table DEMO_USER: Directly derived from the topic demo_user (CRC32-partitioned)
Stream DEMO_USER_ORDERS: Fed directly through ksqlDB's REST API (murmur2-partitioned)
Stream DEMO_USER_ORDERS_MERGED: Merges the demo_user_orders stream (murmur2) with the table DEMO_USER (CRC32)
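The exact statements are in the demo repository; hedged reconstructions (column names, formats, and the LEFT JOIN are assumptions) would look something like this:

```sql
-- Table over the CRC32-partitioned topic written by the Python producer
CREATE TABLE DEMO_USER (user_id VARCHAR PRIMARY KEY, name VARCHAR)
  WITH (KAFKA_TOPIC = 'demo_user', VALUE_FORMAT = 'JSON');

-- Stream fed via ksqlDB's REST API, hence murmur2-partitioned on write
CREATE STREAM DEMO_USER_ORDERS (user_id VARCHAR KEY, order_id VARCHAR)
  WITH (KAFKA_TOPIC = 'demo_user_orders', PARTITIONS = 6, VALUE_FORMAT = 'JSON');

INSERT INTO DEMO_USER_ORDERS (user_id, order_id) VALUES ('user_1', 'order_001');

-- Stream-table join on the common key; rows whose key landed in a
-- different partition on one side will fail to find a match
CREATE STREAM DEMO_USER_ORDERS_MERGED AS
  SELECT o.user_id AS user_id, o.order_id, u.name
  FROM DEMO_USER_ORDERS o
  LEFT JOIN DEMO_USER u ON o.user_id = u.user_id
  EMIT CHANGES;
```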
The merged stream looks like this:
Ideally, all producers, regardless of the client language, should use the same hashing function and partitioner. Luckily, for librdkafka-based clients there is a configuration property called partitioner. To match the Java producer’s behavior, it should be set to murmur2_random.
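For example, when setting up the producer, the configuration properties should be along these lines (shown here as a Python/confluent-kafka sketch; the broker address is a placeholder):

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    # Java-compatible murmur2 hashing; null keys are randomly
    # partitioned, matching the Java DefaultPartitioner's behavior
    "partitioner": "murmur2_random",
})
```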
Be sure to double-check whether the client library/version you are using passes the partitioner parameter through to librdkafka; otherwise, it will assume the default value.
The hashing functions available in librdkafka are listed on GitHub. Here is an extract:

random: random distribution
consistent: CRC32 hash of the key (empty and NULL keys are mapped to a single partition)
consistent_random: CRC32 hash of the key (empty and NULL keys are randomly partitioned); this is the default
murmur2: Java producer compatible Murmur2 hash of the key (NULL keys are mapped to a single partition)
murmur2_random: Java producer compatible Murmur2 hash of the key (NULL keys are randomly partitioned); this is functionally equivalent to the default partitioner in the Java producer
fnv1a: FNV-1a hash of the key (NULL keys are mapped to a single partition)
fnv1a_random: FNV-1a hash of the key (NULL keys are randomly partitioned)
Another option is to repartition the topic using ksqlDB. Basically, write a persistent query that reads the original table (DEMO_USER, materialized from the topic demo_user, which is partitioned with CRC32) and dumps it into another table/topic.
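A sketch of what that persistent query could look like (the target topic name in the WITH clause is an assumption):

```sql
CREATE TABLE DEMO_USER_REPARTITION
  WITH (KAFKA_TOPIC = 'demo_user_repartition')  -- assumed topic name
  AS SELECT *
  FROM DEMO_USER
  EMIT CHANGES;
```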
Since that is a CREATE TABLE … AS SELECT statement, the messages will be repartitioned using the murmur2 hashing function, as ksqlDB is Java based.
If we now merge the stream DEMO_USER_ORDERS with the table DEMO_USER_REPARTITION, we will have an exact match on the partition assignment:
One important note: If using the Apache Flink stream processing framework (Java or SQL based), none of that would necessarily be a problem. The reason is that Flink typically repartitions the messages upon ingesting them.
After delving into the causes of inconsistencies between Java and many non-Java producers, we have gained insight into the default partitioning behavior of these clients. This disparity can lead to data mismatches and real challenges for developers. Fortunately, by setting librdkafka’s partitioner to murmur2_random, or by repartitioning topics with ksqlDB, we can overcome these issues and achieve seamless integration between Java and non-Java producers, ensuring a more robust and reliable system overall.