Live Demo: Build Scalable Event-Driven Microservices with Confluent | Register Now

The Importance of Standardized Hashing Across Producers

Written By

Streaming applications have gained immense popularity due to their ability to process real-time data efficiently. However, when developing such applications, one crucial aspect that often goes unnoticed is the default partitioning behavior of Java and non-Java producers. This disparity can result in data mismatches and inconsistencies, posing challenges for developers. This blog post delves into the causes of these inconsistencies and explores available options to overcome them, enabling seamless integration between Java and non-Java producers.

In Apache Kafka®’s Java producer library, the default partitioning strategy uses a hashing function called DefaultPartitioner to determine the topic partition to which a message should be assigned. The DefaultPartitioner uses a 32-bit hash called murmur2, the name is derived from its inner loop, which employs two fundamental operations: multiplication and rotation. It computes the partition for a record (with the key defined); for example, let’s say we have a topic with six (6) partitions and we need to determine in what partition a given message needs to be assigned to:

This partitioning strategy is deterministic, it ensures that messages with the same key are consistently assigned to the same partition, preserving the order of related messages within the partition. If the key is null or not provided, a round-robin mechanism distributes the messages evenly across available partitions.

Kafka provides flexibility for implementing custom partitioners. By extending the org.apache.kafka.clients.producer.Partitioner interface, developers can define their own partitioning logic, allowing them to use alternative hashing functions or consider other message attributes to determine the target partition.

However, everything would be perfect if there wasn’t an elephant in the room:

That is a very good question, my tusk-tastic friend!

Many non-Java clients rely on a C/C++ library called librdkafka. If you are not familiar with it, the library provides a high-performance, lightweight, and feature-rich implementation of the Apache Kafka wire protocol. It serves as a client library for interacting with Kafka clusters.

Ermmm, so, what is wrong with librdkafka?

Absolutely nothing!

That library is widely used in production environments and is utilized by numerous Kafka client implementations in different programming languages such as Python, Go, .NET, C#, etc. However librdkafka, by default, uses a different default partitioner called CRC32 (Cyclic Redundancy Check 32-bit). Like murmur2, it also generates a fixed-size 32-bit hash value for a given input data. As one can figure out already, different hashing functions will very likely yield different results.

Because of that, messages with the same key produced by Java and many non-Java clients are expected to be assigned to different partitions. That means, for example, if a Python producer and a source connector (Java based) are producing data to different topics in Kafka, then a merge of these topics using Kafka Streams API or ksqlDB would not work properly as there would be a partition mismatch. If we consider the same table as previously, but this time hashing the keys using CRC32, we can see the mismatches:

Let’s now run a practical example where data is produced by two different producers, one using the murmur2 and the other CRC32 hashing functions:

  • The first producer writes data to the topic demo_user and uses the CRC32 hashing function (the names used in this data sample are purely fictional and do not represent any real individuals)

  • The second producer is a ksqlDB stream where data is written directly to the stream/topic demo_user_orders using INSERT SQL statements, and as we now know, ksqlDB is part of the Kafka Streams API which is written in Java and by default uses the murmur2 hashing function

  • Both topics will have a common key (user_id)

  • The details about this demo can be found on GitHub

As we can see there were mismatches with the partition assignment:

If we then try to merge both topics in ksqlDB by their keys (user_id) we will find several mismatches, but first, let’s see the queries used in this example:

Table DEMO_USER: Directly derived from the topic demo_user

CREATE TABLE IF NOT EXISTS DEMO_USER (
    user_id VARCHAR PRIMARY KEY,
    name VARCHAR,
    age BIGINT
) WITH (
    KAFKA_TOPIC = 'demo_user',
    VALUE_FORMAT = 'JSON'
);

Stream DEMO_USER_ORDERS: Fed directly through ksqlDB's REST API

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS (
    user_id VARCHAR KEY,
    ts BIGINT,
    product_id BIGINT,
    qty BIGINT,
    unit_price DOUBLE,
    channel VARCHAR
) WITH (
    KAFKA_TOPIC = 'demo_user_orders',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'ts'
);

Stream DEMO_USER_ORDERS_MERGED: Merge demo_user_orders stream (murmur2) with table demo_user (CRC32)

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS_MERGED AS
    SELECT
        DEMO_USER_ORDERS.user_id AS user_id,
        DEMO_USER.name,
        DEMO_USER.age,
        product_id,
        qty,
        unit_price,
        channel,
        ts
    FROM DEMO_USER_ORDERS
    LEFT JOIN DEMO_USER ON DEMO_USER_ORDERS.user_id = DEMO_USER.user_id
EMIT CHANGES;

The merged stream looks like this:

How to address the partition assignment mismatches

Ideally, all producers, regardless of the client language, should have the same hashing function and partitioner. Luckily, for librdkafka-based languages there is a configuration property called partitioner. To match it with Java’s it should be set to murmur2_random. For example, when setting the producer the configuration properties should be:

{
	"bootstrap.servers": "...",
	"partitioner": "murmur2_random",
	...
}

Be sure to double-check whether the client library/version you are using exposes the partitioner parameter back to librdkafka, otherwise, it will assume the default value.

The hashing functions available in librdkafka are listed on GitHub. Below is an extract of it:

Another option is to repartition the topic using ksqlDB. Basically, write a permanent query to read the original table (DEMO_USER, materialized from the topic demo_user) partitioned with CRC32 and dump it to another table/topic. For example:

CREATE TABLE IF NOT EXISTS DEMO_USER_REPARTITION
WITH (
    KAFKA_TOPIC = 'demo_user_repartition',
    VALUE_FORMAT = 'JSON'
)
AS
SELECT
    user_id, name, age
FROM DEMO_USER
EMIT CHANGES;

Since that is a CREATE TABLE … AS SELECT statement, the messages will be repartitioned using the murmur2 hashing function as ksqlDB is Java based.

If we now merge the stream DEMO_USER_ORDERS with the table DEMO_USER_REPARTITION we will have an exact match on the partition assignment:

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS_MERGED_REPARTITION AS
    SELECT
        DEMO_USER_ORDERS.user_id AS user_id,
        DEMO_USER_REPARTITION.name,
        DEMO_USER_REPARTITION.age,
        product_id,
        qty,
        unit_price,
        channel,
        ts
    FROM DEMO_USER_ORDERS
    LEFT JOIN DEMO_USER_REPARTITION ON DEMO_USER_ORDERS.user_id = DEMO_USER_REPARTITION.user_id
EMIT CHANGES;

One important note: If using the Apache Flink stream processing framework (Java or SQL based), none of that would necessarily be a problem. The reason is that Flink typically repartitions the messages upon ingesting them.

Conclusion

After delving into the causes of inconsistencies between Java and many non-Java producers, we have gained insight into the default partitioning behavior of these applications. It is evident that this disparity can lead to data mismatches and challenges for developers, despite the immense popularity of streaming applications for processing real-time data efficiently. Fortunately, by exploring available options, we can overcome these issues and achieve seamless integration between Java and many non-Java producers, ensuring a more robust and reliable system overall.

  • Italo Nesi is a Sr. Solutions Engineer at Confluent, bringing a wealth of over 30 years of experience in various roles such as software engineer, solutions engineer/architect, pre-sales engineer, full stack developer, IoT developer/architect, and a passionate home automation hobbyist. He possesses a strong penchant for building innovative solutions rather than starting from scratch, leveraging existing tools and technologies to deliver efficient and effective results for the core business. His expertise lies in combining his technical prowess with a practical approach, ensuring optimal outcomes while avoiding unnecessary reinvention of the wheel. He holds a bachelor’s degree in electronics engineering from the Federal University of Rio Grande do Norte/Brazil, an MBA from the Federal University of Rio de Janeiro/Brazil (COPPEAD), and an executive master’s degree in International Supply Chain Management from Université Catholique de Louvain/Belgium.

Did you like this blog post? Share it now