Streaming applications have gained immense popularity due to their ability to process real-time data efficiently. However, when developing such applications, one crucial aspect that often goes unnoticed is that Java and non-Java producers partition data differently by default. This disparity can result in data mismatches and inconsistencies, posing challenges for developers. This blog post delves into the causes of these inconsistencies and explores available options to overcome them, enabling seamless integration between Java and non-Java producers.
In Apache Kafka®’s Java producer library, the default partitioning strategy is implemented by a class called DefaultPartitioner, which hashes the record key to determine the topic partition to which a message should be assigned. The DefaultPartitioner uses a 32-bit hash function called murmur2; the name is derived from its inner loop, which employs two fundamental operations: multiplication and rotation. For a record with a non-null key, the partition is the hash value (made positive) modulo the number of partitions. For example, let’s say we have a topic with six (6) partitions and we need to determine to which partition a given message should be assigned:
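This partitioning strategy is deterministic: as long as the number of partitions does not change, messages with the same key are consistently assigned to the same partition, preserving the order of related messages within that partition. If the key is null or not provided, nothing is hashed; older Java clients distributed such messages round-robin across the available partitions, while Apache Kafka 2.4 and later use a "sticky" strategy that fills a batch for one partition before moving on to another, which still spreads messages across partitions over time.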
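To make the keyed case concrete, here is a minimal Python sketch of the calculation. It assumes a faithful port of `Utils.murmur2` and `Utils.toPositive` from Kafka’s Java client (the helpers the DefaultPartitioner relies on) and assumes keys are serialized as UTF-8, as Kafka’s `StringSerializer` does. The helper `java_default_partition` and the sample keys are hypothetical, so treat this as an illustration rather than a drop-in replacement for the Java client.

```python
# Minimal sketch of the Java producer's keyed partitioning:
#   toPositive(murmur2(keyBytes)) % numPartitions
# Assumption: this port mirrors Kafka's Java Utils.murmur2 / Utils.toPositive.

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Return the murmur2 hash of `data` as a signed 32-bit int, like Java."""
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24

    h = (seed ^ length) & MASK32
    # Mix the input four bytes at a time (little-endian words).
    for i in range(length // 4):
        i4 = i * 4
        k = (data[i4]
             | (data[i4 + 1] << 8)
             | (data[i4 + 2] << 16)
             | (data[i4 + 3] << 24))
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Mix in the 1-3 trailing bytes (Java uses a fall-through switch here).
    tail = length & ~3
    remaining = length % 4
    if remaining == 3:
        h ^= data[tail + 2] << 16
    if remaining >= 2:
        h ^= data[tail + 1] << 8
    if remaining >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15

    # Reinterpret the unsigned 32-bit value as Java's signed int.
    return h - (1 << 32) if h & 0x80000000 else h


def java_default_partition(key: str, num_partitions: int) -> int:
    """Hypothetical helper: partition for a non-null String key."""
    key_bytes = key.encode("utf-8")  # what StringSerializer produces
    # toPositive() clears the sign bit, then take the modulo.
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    # Hypothetical keys on a topic with six partitions.
    for key in ["user_1", "user_2", "user_3", "user_4"]:
        print(key, "->", java_default_partition(key, 6))
```

Because the result depends only on the key bytes and the partition count, the same key always lands on the same partition while the partition count stays fixed.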
Kafka provides flexibility for implementing custom partitioners. By implementing the org.apache.kafka.clients.producer.Partitioner interface and registering the class through the producer’s partitioner.class setting, developers can define their own partitioning logic, allowing them to use alternative hashing functions or consider other message attributes to determine the target partition.
However, everything would be perfect if there weren’t an elephant in the room:
That is a very good question, my tusk-tastic friend!
Many non-Java clients rely on a C/C++ library called librdkafka. If you are not familiar with it, the library provides a high-performance, lightweight, and feature-rich implementation of the Apache Kafka wire protocol. It serves as a client library for interacting with Kafka clusters.
Ermmm, so, what is wrong with librdkafka?
Absolutely nothing!
That library is widely used in production environments and powers numerous Kafka client implementations in languages such as Python, Go, and C#/.NET. However, librdkafka’s default partitioner (consistent_random) hashes keys with a different function: CRC32 (Cyclic Redundancy Check, 32-bit). Like murmur2, it generates a fixed-size 32-bit value for a given input. As you may have guessed already, different hash functions will very likely map the same key to different values, and therefore to different partitions.
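For comparison, here is a minimal sketch of CRC32-based partitioning. It assumes that librdkafka’s consistent partitioner computes the standard CRC-32 of the key bytes (the same checksum Python’s `zlib.crc32` returns) and takes it modulo the partition count. The helper name `librdkafka_crc32_partition` is hypothetical, and null or empty keys, which `consistent_random` assigns to a random partition, are not modeled.

```python
import zlib


def librdkafka_crc32_partition(key: str, num_partitions: int) -> int:
    """Hypothetical helper: CRC32(key bytes) % partition count.

    Assumes librdkafka's default `consistent_random` partitioner uses the
    standard CRC-32 that zlib implements; null/empty keys are not handled.
    """
    checksum = zlib.crc32(key.encode("utf-8"))  # unsigned 32-bit value in Python 3
    return checksum % num_partitions


if __name__ == "__main__":
    # "123456789" is the standard CRC-32 check string.
    print(hex(zlib.crc32(b"123456789")))               # prints 0xcbf43926
    print(librdkafka_crc32_partition("123456789", 6))  # prints 2
```

Unlike the Java client’s murmur2 value, `zlib.crc32` already returns an unsigned number, so no sign-bit masking is needed before the modulo.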
Because of that, messages with the same key produced by Java clients and by many non-Java clients are likely to be assigned to different partitions. For example, if a Python producer and a (Java-based) source connector are producing data to different topics in Kafka, then joining these topics using the Kafka Streams API or ksqlDB would not work properly: such joins expect both topics to be co-partitioned, but records sharing a key may sit in different partitions of each topic. If we consider the same table as previously, but this time hashing the keys using CRC32, we can see the mismatches:
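This kind of comparison can be reproduced with a short, self-contained Python sketch that computes both assignments for the same keys on a six-partition topic. It assumes a faithful port of the Java client’s murmur2/toPositive logic, assumes librdkafka uses the standard CRC-32 that `zlib.crc32` implements, and assumes UTF-8 key bytes. The key values and function names are hypothetical.

```python
import zlib

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Port of Kafka's Java murmur2 (assumed faithful); returns a signed 32-bit int."""
    length = len(data)
    m, r = 0x5BD1E995, 24
    h = (0x9747B28C ^ length) & MASK32
    # Four-byte little-endian blocks.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    # Trailing 1-3 bytes.
    tail, remaining = length & ~3, length % 4
    if remaining == 3:
        h ^= data[tail + 2] << 16
    if remaining >= 2:
        h ^= data[tail + 1] << 8
    if remaining >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche, then reinterpret as a signed Java int.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h - (1 << 32) if h & 0x80000000 else h


def java_partition(key: str, num_partitions: int) -> int:
    """Java producer default for keyed records: toPositive(murmur2) % n."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


def librdkafka_partition(key: str, num_partitions: int) -> int:
    """librdkafka consistent partitioner (assumed): CRC32 % n."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def compare_partitions(keys, num_partitions):
    """Return (key, java_partition, librdkafka_partition) for each key."""
    return [(k, java_partition(k, num_partitions), librdkafka_partition(k, num_partitions))
            for k in keys]


if __name__ == "__main__":
    keys = [f"user_{i}" for i in range(1, 7)]  # hypothetical user_id values
    for key, p_java, p_crc in compare_partitions(keys, 6):
        status = "match" if p_java == p_crc else "MISMATCH"
        print(f"{key}: murmur2 -> {p_java}, CRC32 -> {p_crc} ({status})")
```

Any key reported as MISMATCH would land in different partitions depending on which client produced it, which is what breaks a join that assumes co-partitioned topics.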
Let’s now run a practical example where data is produced by two different producers, one using the murmur2 hashing function and the other CRC32:
The first producer writes data to the topic demo_user and uses the CRC32 hashing function (the names used in this data sample are purely fictional and do not represent any real individuals).
The second producer is a ksqlDB stream, where data is written directly to the stream/topic demo_user_orders using INSERT SQL statements. ksqlDB is built on the Kafka Streams API, which is written in Java, so by default it uses the murmur2 hashing function.
Both topics will have a common key (user_id).
The details about this demo can be found on GitHub.
As we can see, there were mismatches in the partition assignment: