Apache Kafka Message Compression

Producers and data compression 

Apache Kafka® supports incredibly high throughput. It's been known for feats like supporting 20 million orders per hour to get COVID tests out to US citizens during the pandemic. Kafka's approach to partitioning topics helps achieve this level of scalability.

Topic partitions are the main "unit of parallelism" in Kafka. What's a unit of parallelism? It's like having multiple cashiers in the same store instead of one. Multiple purchases can be made at once, which increases the number of purchases completed in the same amount of time (this is like throughput). In this case, the cashier is the unit of parallelism.

In Kafka, each partition leader can live on a different broker in a cluster, and a producer can send multiple messages, each with a different destination topic partition; that is, a producer can send them in parallel. While this is the main reason Kafka enables high throughput, compression can also be a tool to help improve throughput and efficiency by reducing network traffic due to smaller messages. A well-executed compression strategy also means better disk utilization in Kafka, since stored messages on disk are smaller. 

What makes data compression in a Kafka producer work?  

Kafka producers

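Kafka producers are responsible for compressing data, and they do so per batch: records headed to the same partition are grouped into a batch, and the batch is compressed as a unit. When the producer is responsible for compression and the topic's compression.type is producer, the flow typically looks like this: the producer batches and compresses the records, sends the compressed batch to the partition leader, the broker stores the batch using the producer's codec, and consumers later receive it in that same codec and decompress it.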

Why the same partition, and not at the topic level? Producers communicate with topic partition leaders. If you compress data for multiple partitions in a single batch, then that batch would have to go to multiple leaders, which would send more data over the wire, making compression not worth the effort.
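To see why compressing a whole batch pays off, here is a minimal sketch that compares gzip-compressing records one by one against compressing them together. It uses only the Python standard library; the record contents and the helper names (make_records, compressed_size_per_record, compressed_size_as_batch) are made up for illustration, and plain concatenation is only a stand-in for Kafka's actual batch format.

```python
import gzip
import json


def make_records(n):
    """Build n small JSON records that share field names (hypothetical data)."""
    return [
        json.dumps({"order_id": i, "status": "SHIPPED", "region": "us-east"}).encode("utf-8")
        for i in range(n)
    ]


def compressed_size_per_record(records):
    """Total bytes when every record is gzip-compressed on its own."""
    return sum(len(gzip.compress(r)) for r in records)


def compressed_size_as_batch(records):
    """Total bytes when all records are joined and gzip-compressed once."""
    return len(gzip.compress(b"".join(records)))


records = make_records(200)
raw_size = sum(len(r) for r in records)

print("raw bytes:        ", raw_size)
print("per-record gzip:  ", compressed_size_per_record(records))
print("single-batch gzip:", compressed_size_as_batch(records))
```

Small records compressed individually can even grow, because each one pays the codec's fixed overhead and there is little repetition to exploit. Compressed together, the repeated field names and values are encoded once.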

Brokers always perform some amount of batch decompression in order to validate data. Internally, the LogValidator class implements the validation logic, and message decompression ultimately happens in the relevant CompressionType's wrapForInput implementation. The following scenarios always require full payload decompression:

  • The producer compresses batches (i.e., compression.type isn’t none), and the topic-level compression.type specifies a different codec or is uncompressed.

  • You’re running Confluent Server brokers and have enabled Schema Validation on the topic.

If a topic is compacted, the broker will also periodically decompress any compressed batches in order to filter out records eligible for compaction.

What batch size should you configure on the producers? Keep in mind that the effective batch size is dictated by two configuration settings: batch.size, the upper bound on the size of a batch in bytes, and linger.ms, the maximum amount of time to wait for a batch to fill up. Like many pieces of configuration in Kafka, it depends on the scenario. A small batch size saves memory and decreases latency; a large batch size increases throughput but takes up more memory.

Compression performance matters to the producer, and decompression performance matters to the consumer. Let's not forget about the brokers: decompression performance matters to them too, and compression performance matters if the topic-level compression.type is different from what the producer used. In every case, performance depends in large part on the compression type.
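To make the interplay between batch.size and linger.ms more concrete, here is a deliberately simplified model of how a producer might cut batches for a single partition. It is a sketch, not the real producer logic: simulate_batches and its parameters are hypothetical, and it assumes a batch is closed either when the next record would push it past the size limit or when the linger time has elapsed since the batch was opened.

```python
def simulate_batches(records, batch_size, linger_ms):
    """Group (arrival_ms, size_bytes) records into batches.

    Simplified assumptions: records are sorted by arrival time, all go to the
    same partition, and a batch closes when it would overflow batch_size or
    when linger_ms has passed since its first record arrived.
    """
    batches = []
    current = []
    current_bytes = 0
    opened_at = None

    for arrival_ms, size in records:
        # Close the open batch if its linger time ran out before this record.
        if current and arrival_ms - opened_at >= linger_ms:
            batches.append(current)
            current, current_bytes = [], 0
        # Close the open batch if this record would not fit.
        if current and current_bytes + size > batch_size:
            batches.append(current)
            current, current_bytes = [], 0
        if not current:
            opened_at = arrival_ms
        current.append((arrival_ms, size))
        current_bytes += size

    if current:
        batches.append(current)
    return batches


# Ten 100-byte records arriving one millisecond apart (hypothetical workload).
workload = [(t, 100) for t in range(10)]

size_bound = simulate_batches(workload, batch_size=300, linger_ms=100)
time_bound = simulate_batches(workload, batch_size=16384, linger_ms=5)

print([len(b) for b in size_bound])  # batches limited by batch_size
print([len(b) for b in time_bound])  # batches limited by linger_ms
```

In the first run the size limit decides when batches close; in the second, the linger time does. Larger batches generally give the compressor more repetition to work with, which is one reason the two settings matter for compression.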

Compression type algorithms

There are five types of compression available:  

  • none

  • gzip

  • snappy

  • lz4

  • zstd

none (or, if you’re setting this piece of config from the topic, uncompressed) means that compression won’t occur. The other settings are different compression algorithms supported by Kafka. In general, lz4 is recommended for performance. gzip is not recommended due to high overhead; if you’re looking for a compression ratio similar to gzip but with less CPU overhead, give zstd a try. Keep in mind that each unique pipeline or application will require testing to determine the best type. 
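Since the best codec depends on your data, a quick offline measurement can be a useful first step before testing in a real pipeline. The sketch below times whichever codecs are installed in your Python environment against a sample payload. Only gzip ships with the standard library; lz4, zstandard, and python-snappy are optional third-party packages and are skipped if missing. The helper names and the sample payload are hypothetical, and these libraries are not guaranteed to use the same framing or settings as a Kafka client, so treat the numbers as a rough indication only.

```python
import gzip
import json
import time


def available_codecs():
    """Return a dict of codec name -> compress function for installed codecs."""
    codecs = {"gzip": gzip.compress}
    try:
        import lz4.frame  # optional third-party package "lz4"
        codecs["lz4"] = lz4.frame.compress
    except (ImportError, AttributeError):
        pass
    try:
        import zstandard  # optional third-party package "zstandard"
        codecs["zstd"] = zstandard.ZstdCompressor().compress
    except (ImportError, AttributeError):
        pass
    try:
        import snappy  # optional third-party package "python-snappy"
        codecs["snappy"] = snappy.compress
    except (ImportError, AttributeError):
        pass
    return codecs


def benchmark(payload, codecs):
    """Measure compression ratio and wall-clock time for each codec."""
    results = {}
    for name, compress in codecs.items():
        start = time.perf_counter()
        compressed = compress(payload)
        elapsed = time.perf_counter() - start
        results[name] = {"ratio": len(payload) / len(compressed), "seconds": elapsed}
    return results


# Hypothetical sample: 1,000 JSON records joined into one payload.
sample_payload = b"".join(
    json.dumps({"order_id": i, "status": "SHIPPED", "region": "us-east"}).encode("utf-8")
    for i in range(1000)
)

results = benchmark(sample_payload, available_codecs())
for name, stats in results.items():
    print(f"{name}: ratio={stats['ratio']:.1f} time={stats['seconds'] * 1000:.2f} ms")
```

For meaningful results, swap the sample payload for a batch of your own production-like messages.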

Configuring the compression.type 

An important implementation detail to bear in mind: there are two places to set compression.type—on the topic and on the producer. The topic level compression.type can be configured to defer to the producer via the aptly named producer setting. The rest of the topic-level compression types take precedence over the producer-level compression type and will override it in terms of the codec used to store messages on the broker and send them to consumers. The following table outlines guidance for the commonly employed compression type combinations, as well as scenarios in which the less common combinations are worth considering and testing.

| Topic compression.type | Producer compression.type | Compression type for producer to broker communication | Compression type for storage on broker, and broker to consumer communication | Guidance |
| --- | --- | --- | --- | --- |
| producer | none | producer's compression.type | producer's compression.type | This is the untuned default, i.e., what you get if you don't specify values. Considered fine for development but not recommended for production unless supported by performance testing. |
| producer | gzip, snappy, lz4, zstd | producer's compression.type | producer's compression.type | This is a common combination giving responsibility to the producer. |
| gzip, snappy, lz4, zstd | (something different than Topic, but not "none") | producer's compression.type | topic's compression.type | In this situation, the broker will have to recompress (using the topic's compression.type). This isn't typically a desired result, but it may apply in certain scenarios, for instance, when legacy producers use a different approach to compression and you want to enforce the newer compression type at a cluster level. |
| gzip, snappy, lz4, zstd | (same as Topic) | producer's compression.type | topic's compression.type | Not recommended because if the producer's compression.type eventually changes, the broker will override it, which typically isn't desired. Suggest using topic compression.type = producer instead. |
| gzip, snappy, lz4, zstd | none | producer's compression.type | topic's compression.type | These combinations are uncommon, but may apply in some edge cases, such as when the producer or consumer is CPU constrained but not network bottlenecked. In the first case, the producer isn't asked to compress, but messages are still compressed when stored or transmitted to the consumer. Not recommended as your initial configuration; consider it only after thorough end-to-end throughput analysis. |
| uncompressed | gzip, snappy, lz4, zstd | producer's compression.type | topic's compression.type | Same as above. |
| uncompressed | none | producer's compression.type | topic's compression.type | Not recommended because if the producer's compression.type changes, the broker will override it and store/transmit data to the consumer uncompressed, which typically isn't desired. Suggest using topic compression.type = producer instead. |
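The precedence rules in the table can be sketched as a small function. This is only a model of the guidance above, not broker code: resolve_compression and the keys of the dictionary it returns are hypothetical names chosen for this example.

```python
CODECS = {"gzip", "snappy", "lz4", "zstd"}


def resolve_compression(topic_type, producer_type):
    """Model which codec is used on each leg for a topic/producer setting pair."""
    if topic_type not in CODECS | {"producer", "uncompressed"}:
        raise ValueError(f"unknown topic compression.type: {topic_type}")
    if producer_type not in CODECS | {"none"}:
        raise ValueError(f"unknown producer compression.type: {producer_type}")

    # Producer to broker: always whatever the producer was configured with.
    on_wire = producer_type

    # Storage and broker to consumer: the topic setting wins unless it defers.
    if topic_type == "producer":
        stored = producer_type
    elif topic_type == "uncompressed":
        stored = "none"
    else:
        stored = topic_type

    return {
        "producer_to_broker": on_wire,
        "stored_and_to_consumer": stored,
        # True when the broker must decompress and/or recompress the batch.
        "broker_converts": stored != on_wire,
    }


for topic, producer in [("producer", "lz4"), ("gzip", "lz4"), ("uncompressed", "snappy")]:
    print(topic, producer, resolve_compression(topic, producer))
```

Any combination where broker_converts is true puts extra compression or decompression work on the broker, which is why the table steers you toward topic compression.type = producer.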

Kafka consumers

Compressed data must be decompressed as well! Just as compression generally improves producer throughput, it also improves consumer throughput, at the cost of decompression work that varies according to compression type. Compressed messages are identified by a special header that the consumer "recognizes". The consumer then decompresses any compressed messages it receives, and only returns decompressed messages.
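As a rough illustration of how a consumer can tell what it received: in the record batch format described in the Kafka protocol documentation, the codec is encoded in the lowest three bits of the batch's attributes field (0 for none, 1 for gzip, 2 for snappy, 3 for lz4, 4 for zstd). The sketch below decodes that value and handles the two cases the Python standard library covers. The function names are hypothetical, and the payload here is just raw bytes rather than real serialized Kafka records.

```python
import gzip

# Codec ids carried in the low three bits of the batch attributes.
CODEC_BY_ID = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}
COMPRESSION_MASK = 0x07


def codec_from_attributes(attributes):
    """Extract the codec name from a batch attributes value."""
    codec_id = attributes & COMPRESSION_MASK
    if codec_id not in CODEC_BY_ID:
        raise ValueError(f"unknown compression codec id: {codec_id}")
    return CODEC_BY_ID[codec_id]


def read_payload(attributes, payload):
    """Return decompressed bytes, whatever form the payload arrived in."""
    codec = codec_from_attributes(attributes)
    if codec == "none":
        return payload
    if codec == "gzip":
        return gzip.decompress(payload)
    # snappy, lz4, and zstd need third-party libraries, omitted in this sketch.
    raise NotImplementedError(f"codec {codec} is not handled in this sketch")


print(read_payload(0, b"plain message"))
print(read_payload(1, gzip.compress(b"compressed message")))
```

The application code calling the consumer never sees this step; it just gets the decompressed records.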

The consumer can handle both compressed and uncompressed messages. This decoupling of producer and consumer is a major upside to using Kafka: a consumer can read from a topic even when its producers send a mix of compressed and uncompressed messages. However, given that compression impacts CPU, network, and disk utilization from producers to brokers to consumers, you will want to coordinate the compression type across your producers to achieve optimal end-to-end performance.

Before you go
 

A couple of notes that might help you avoid some mistakes. 

First of all, remember that encrypted data should not be compressed: well-encrypted output looks random, so it generally doesn't compress well. On the other hand, it's OK to encrypt data that has already been compressed.
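You can get a feel for this without any encryption library. In the sketch below, pseudo-random bytes stand in for ciphertext (an assumption made to keep the example dependency-free), and a repetitive JSON payload stands in for typical plaintext messages.

```python
import gzip
import random


def compression_ratio(data):
    """Original size divided by gzip-compressed size (higher is better)."""
    return len(data) / len(gzip.compress(data))


# Stand-in for ciphertext: 10,000 pseudo-random bytes from a fixed seed.
rng = random.Random(0)
random_like = bytes(rng.getrandbits(8) for _ in range(10_000))

# Stand-in for plaintext: the same small JSON record repeated (hypothetical data).
plaintext = b'{"order_id": 1, "status": "SHIPPED"}' * 250

print(f"plaintext ratio:   {compression_ratio(plaintext):.2f}")
print(f"random-like ratio: {compression_ratio(random_like):.2f}")
```

A ratio at or below 1 means the "compressed" output is no smaller than the input, so the CPU time spent compressing was wasted.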

Next, when it comes to default configuration values, it's always good to double-check that they are what you expect in your implementation. Do the defaults in your client match those of the original distribution? On that note, the compression type in the header doesn't make it all the way to the consumer, so to check which codec was actually used you can inspect the log files on the broker with the DumpLogSegments tool. The command looks like this:

kafka-run-class kafka.tools.DumpLogSegments --files /path/to/log/file --print-data-log

The output includes the compression codec used:

compresscodec: <CODEC>
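If a log segment holds many batches, tallying the codecs can be easier than reading the dump by eye. Here is a small sketch that counts the compresscodec values in text you have captured from the tool. The count_codecs helper and the sample text are hypothetical; the sample contains only the compresscodec fragment shown above rather than full output lines, and the lowercase normalization is an assumption in case the casing differs between versions.

```python
import re
from collections import Counter

# Matches the "compresscodec: <CODEC>" fragment anywhere in a line.
CODEC_PATTERN = re.compile(r"compresscodec:\s*(\S+)")


def count_codecs(dump_output):
    """Count how often each codec appears in captured DumpLogSegments text."""
    counts = Counter()
    for line in dump_output.splitlines():
        match = CODEC_PATTERN.search(line)
        if match:
            counts[match.group(1).lower()] += 1
    return counts


# Hypothetical captured text, reduced to the fragment of interest.
sample_output = "\n".join([
    "compresscodec: lz4",
    "compresscodec: lz4",
    "compresscodec: none",
    "a line without the field",
])

print(count_codecs(sample_output))
```

A mix of codecs in one segment is a hint that producers writing to the topic are configured differently.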

Resources

If this article sparked your interest and you’re curious to learn more, here are some recommendations: 

Course: Apache Kafka 101

Learn the basics of Apache Kafka in our Confluent Developer course.

The DevX Newsletter

Get Apache Kafka and Flink news delivered to your inbox biweekly or read the latest editions on Confluent Developer!
