Introducing Connector Private Networking: Join The Upcoming Webinar!

The Importance of Standardized Hashing Across Producers

The Confluent Developer Newsletter

Get Apache Kafka and Flink news delivered to your inbox biweekly or read the latest editions on Confluent Developer!

Course: Building Apache Flink Applications in Java

Learn the basics of building Apache Flink applications in Java in our course on Confluent Developer.

Written By

Streaming applications have gained immense popularity due to their ability to process real-time data efficiently. However, when developing such applications, one crucial aspect that often goes unnoticed is the default partitioning behavior of Java and non-Java producers. This disparity can result in data mismatches and inconsistencies, posing challenges for developers. This blog post delves into the causes of these inconsistencies and explores available options to overcome them, enabling seamless integration between Java and non-Java producers.

In Apache Kafka®’s Java producer library, the default partitioning strategy uses a hashing function called DefaultPartitioner to determine the topic partition to which a message should be assigned. The DefaultPartitioner uses a 32-bit hash called murmur2, the name is derived from its inner loop, which employs two fundamental operations: multiplication and rotation. It computes the partition for a record (with the key defined); for example, let’s say we have a topic with six (6) partitions and we need to determine in what partition a given message needs to be assigned to:

 

Message Key

Murmur2 Hash

Partition Assigned

(Hash MOD% #Partitions)

my_key_1

1322862588

0

my_key_2

201614428

4

my_key_3

273244397

5

my_key_4

1336656080

2

This partitioning strategy is deterministic, it ensures that messages with the same key are consistently assigned to the same partition, preserving the order of related messages within the partition. If the key is null or not provided, a round-robin mechanism distributes the messages evenly across available partitions.

Kafka provides flexibility for implementing custom partitioners. By extending the org.apache.kafka.clients.producer.Partitioner interface, developers can define their own partitioning logic, allowing them to use alternative hashing functions or consider other message attributes to determine the target partition.

However, everything would be perfect if there wasn’t an elephant in the room:

That is a very good question, my tusk-tastic friend!

Many non-Java clients rely on a C/C++ library called librdkafka. If you are not familiar with it, the library provides a high-performance, lightweight, and feature-rich implementation of the Apache Kafka wire protocol. It serves as a client library for interacting with Kafka clusters.

Ermmm, so, what is wrong with librdkafka?

Absolutely nothing!

That library is widely used in production environments and is utilized by numerous Kafka client implementations in different programming languages such as Python, Go, .NET, C#, etc. However librdkafka, by default, uses a different default partitioner called CRC32 (Cyclic Redundancy Check 32-bit). Like murmur2, it also generates a fixed-size 32-bit hash value for a given input data. As one can figure out already, different hashing functions will very likely yield different results.

Because of that, messages with the same key produced by Java and many non-Java clients are expected to be assigned to different partitions. That means, for example, if a Python producer and a source connector (Java based) are producing data to different topics in Kafka, then a merge of these topics using Kafka Streams API or ksqlDB would not work properly as there would be a partition mismatch. If we consider the same table as previously, but this time hashing the keys using CRC32, we can see the mismatches:

 

Message Key

Murmur2 Hash

Partition Assigned

CRC32

Hash

Partition Assigned

my_key_1

1322862588

0

2133006026

2

my_key_2

201614428

4

3861530480

2

my_key_3

273244397

5

2435676134

2

my_key_4

1336656080

2

256500293

5

Let’s now run a practical example where data is produced by two different producers, one using the murmur2 and the other CRC32 hashing functions:

  • The first producer writes data to the topic demo_user and uses the CRC32 hashing function (the names used in this data sample are purely fictional and do not represent any real individuals)

  • The second producer is a ksqlDB stream where data is written directly to the stream/topic demo_user_orders using INSERT SQL statements, and as we now know, ksqlDB is part of the Kafka Streams API which is written in Java and by default uses the murmur2 hashing function

  • Both topics will have a common key (user_id)

  • The details about this demo can be found on GitHub

Python Producer (CRC32)

Key (user_id)

Value

Partition

4a313eba

{"name": "Destiney Bernal", "age": 24}

2

4e99c1dd

{"name": "Maximo Manning", "age": 53}

2

8862a117

{"name": "Jonathon Montoya", "age": 61}

0

1f827762

{"name": "Leo Robertson", "age": 56}

0

24085bdc

{"name": "Sonny Ewing", "age": 48}

2

ksqlDB Producer (murmur2)

Key (user_id)

Value

Partition

4a313eba

{"ts": 1677844113565, "product_id": 1249, "qty": 8, "unit_price": 0.13, "channel": "partner"}

0

4e99c1dd

{"ts": 1677844113850, "product_id": 1997, "qty": 5, "unit_price": 78.85, "channel": "partner"}

4

8862a117

{"ts": 1677844114148, "product_id": 1584, "qty": 1, "unit_price": 9.08, "channel": "store"}

3

1f827762

{"ts": 1677844114448, "product_id": 1497, "qty": 4, "unit_price": 93.78, "channel": "partner"}

0

24085bdc

{"ts": 1677844114745, "product_id": 1585, "qty": 9, "unit_price": 89.31, "channel": "store"}

1

As we can see there were mismatches with the partition assignment:

Key (user_id)

CRC32

murmur2

4a313eba

2

0

4e99c1dd

2

4

8862a117

0

3

1f827762

0

0

24085bdc

2

1

If we then try to merge both topics in ksqlDB by their keys (user_id) we will find several mismatches, but first, let’s see the queries used in this example:

Table DEMO_USER: Directly derived from the topic demo_user

CREATE TABLE IF NOT EXISTS DEMO_USER (
    user_id VARCHAR PRIMARY KEY,
    name VARCHAR,
    age BIGINT
) WITH (
    KAFKA_TOPIC = 'demo_user',
    VALUE_FORMAT = 'JSON'
);

Stream DEMO_USER_ORDERS: Fed directly through ksqlDB's REST API

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS (
    user_id VARCHAR KEY,
    ts BIGINT,
    product_id BIGINT,
    qty BIGINT,
    unit_price DOUBLE,
    channel VARCHAR
) WITH (
    KAFKA_TOPIC = 'demo_user_orders',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'ts'
);

Stream DEMO_USER_ORDERS_MERGED: Merge demo_user_orders stream (murmur2) with table demo_user (CRC32)

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS_MERGED AS
    SELECT
        DEMO_USER_ORDERS.user_id AS user_id,
        DEMO_USER.name,
        DEMO_USER.age,
        product_id,
        qty,
        unit_price,
        channel,
        ts
    FROM DEMO_USER_ORDERS
    LEFT JOIN DEMO_USER ON DEMO_USER_ORDERS.user_id = DEMO_USER.user_id
EMIT CHANGES;

The merged stream looks like this:

USER_ID

NAME

AGE

PRODUCT_ID

QTY

UNIT_PRICE

CHANNEL

4a313eba

???

???

1249

8

0.13

partner

4e99c1dd

???

???

1997

5

78.85

partner

8862a117

???

???

1584

1

9.08

store

1f827762

Leo Robertson

56

1497

4

93.78

partner

24085bdc

???

???

1585

9

89.31

store

How to address the partition assignment mismatches

Ideally, all producers, regardless of the client language, should have the same hashing function and partitioner. Luckily, for librdkafka-based languages there is a configuration property called partitioner. To match it with Java’s it should be set to murmur2_random. For example, when setting the producer the configuration properties should be:

{
	"bootstrap.servers": "...",
	"partitioner": "murmur2_random",
	...
}

Be sure to double-check whether the client library/version you are using exposes the partitioner parameter back to librdkafka, otherwise, it will assume the default value.

The hashing functions available in librdkafka are listed on GitHub. Below is an extract of it:

Property

Default

Description

partitioner

consistent_random

random: random distribution


consistent: CRC32 hash of key (Empty and NULL keys are mapped to a single partition)


consistent_random: CRC32 hash of key (Empty and NULL keys are randomly partitioned)


murmur2: Java producer compatible Murmur2 hash of key (NULL keys are mapped to a single partition)


murmur2_random: Java producer compatible Murmur2 hash of key (NULL keys are randomly partitioned; this is functionally equivalent to the default partitioner in the Java producer)


fnv1a: FNV-1a hash of key (NULL keys are mapped to single partition)


fnv1a_random: FNV-1a hash of key (NULL keys are randomly partitioned)


Type: string

Another option is to repartition the topic using ksqlDB. Basically, write a permanent query to read the original table (DEMO_USER, materialized from the topic demo_user) partitioned with CRC32 and dump it to another table/topic. For example:

CREATE TABLE IF NOT EXISTS DEMO_USER_REPARTITION
WITH (
    KAFKA_TOPIC = 'demo_user_repartition',
    VALUE_FORMAT = 'JSON'
)
AS
SELECT
    user_id, name, age
FROM DEMO_USER
EMIT CHANGES;

Since that is a CREATE TABLE … AS SELECT statement, the messages will be repartitioned using the murmur2 hashing function as ksqlDB is Java based.

If we now merge the stream DEMO_USER_ORDERS with the table DEMO_USER_REPARTITION we will have an exact match on the partition assignment:

CREATE STREAM IF NOT EXISTS DEMO_USER_ORDERS_MERGED_REPARTITION AS
    SELECT
        DEMO_USER_ORDERS.user_id AS user_id,
        DEMO_USER_REPARTITION.name,
        DEMO_USER_REPARTITION.age,
        product_id,
        qty,
        unit_price,
        channel,
        ts
    FROM DEMO_USER_ORDERS
    LEFT JOIN DEMO_USER_REPARTITION ON DEMO_USER_ORDERS.user_id = DEMO_USER_REPARTITION.user_id
EMIT CHANGES;

USER_ID

NAME

AGE

PRODUCT_ID

QTY

UNIT_PRICE

CHANNEL

4a313eba

Destiney Bernal

24

1249

8

0.13

partner

4e99c1dd

Maximo Manning

53

1997

5

78.85

partner

8862a117

Jonathon Montoya

61

1584

1

9.08

store

1f827762

Leo Robertson

56

1497

4

93.78

partner

24085bdc

Sonny Ewing

48

1585

9

89.31

store

One important note: If using the Apache Flink stream processing framework (Java or SQL based), none of that would necessarily be a problem. The reason is that Flink typically repartitions the messages upon ingesting them.

Conclusion

After delving into the causes of inconsistencies between Java and many non-Java producers, we have gained insight into the default partitioning behavior of these applications. It is evident that this disparity can lead to data mismatches and challenges for developers, despite the immense popularity of streaming applications for processing real-time data efficiently. Fortunately, by exploring available options, we can overcome these issues and achieve seamless integration between Java and many non-Java producers, ensuring a more robust and reliable system overall.

  • Italo Nesi is a solutions engineer at Confluent, bringing a wealth of experience in various roles such as software engineer, solutions engineer/architect, pre-sales engineer, full stack developer, IoT developer/architect, and a passionate home automation hobbyist. He possesses a strong penchant for building innovative solutions rather than starting from scratch, leveraging existing tools and technologies to deliver efficient and effective results for the core business. His expertise lies in combining his technical prowess with a practical approach, ensuring optimal outcomes while avoiding unnecessary reinvention of the wheel.

The Confluent Developer Newsletter

Get Apache Kafka and Flink news delivered to your inbox biweekly or read the latest editions on Confluent Developer!

Course: Building Apache Flink Applications in Java

Learn the basics of building Apache Flink applications in Java in our course on Confluent Developer.

Did you like this blog post? Share it now