Apache Kafka® supports incredibly high throughput. It’s been known for feats like supporting 20 million orders per hour to get COVID tests out to US citizens during the pandemic. Kafka's approach to partitioning topics helps achieve this level of scalability.
Topic partitions are the main "unit of parallelism" in Kafka. What’s a unit of parallelism? It’s like having multiple cashiers in the same store instead of one. Multiple purchases can be made at once, which increases the number of purchases completed in the same amount of time (this is like throughput). In this case, the cashier is the unit of parallelism.
In Kafka, each partition leader can live on a different broker in a cluster, and a producer can send multiple messages, each with a different destination topic partition; that is, a producer can send them in parallel. While this is the main reason Kafka enables high throughput, compression can also be a tool to help improve throughput and efficiency by reducing network traffic due to smaller messages. A well-executed compression strategy also means better disk utilization in Kafka, since stored messages on disk are smaller.
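Kafka producers are responsible for compressing data, and they do so by batching records going to the same partition. Typically, when the producer is responsible for compression and the topic’s compression.type is producer, the producer compresses each batch and sends it to the partition leader, the broker stores the batch in compressed form, and consumers decompress it when they read it.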
Why the same partition, and not at the topic level? Producers communicate with topic partition leaders. If you compress data for multiple partitions in a single batch, then that batch would have to go to multiple leaders, which would send more data over the wire, making compression not worth the effort.
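To see why batching matters for compression, here is a minimal sketch that compares compressing small messages one at a time with compressing them together. It uses Python's standard gzip module as a stand-in for the producer's codec; the record layout and the helper names (make_records and the two size functions) are made up for illustration and are not part of any Kafka client.

```python
import gzip
import json


def make_records(n: int) -> list[bytes]:
    """Build n small JSON messages that share field names, like typical events."""
    return [
        json.dumps({"order_id": i, "status": "shipped", "region": "us-east"}).encode()
        for i in range(n)
    ]


def compressed_size_individually(records: list[bytes]) -> int:
    """Total bytes if every record is compressed on its own."""
    return sum(len(gzip.compress(r)) for r in records)


def compressed_size_batched(records: list[bytes]) -> int:
    """Bytes if the records are joined and compressed as one batch."""
    return len(gzip.compress(b"\n".join(records)))


records = make_records(500)
raw = sum(len(r) for r in records)
print("raw bytes:          ", raw)
print("compressed singly:  ", compressed_size_individually(records))
print("compressed as batch:", compressed_size_batched(records))
```

The batch compresses far better because the codec can reuse the repeated field names and values across records, whereas each tiny message on its own carries the codec's fixed overhead and offers little redundancy to exploit.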
Brokers always perform some amount of batch decompression in order to validate data. Internally, the LogValidator class implements the validation logic, and message decompression ultimately happens in the relevant CompressionType's wrapForInput implementation. The following scenarios always require full payload decompression:
The producer compresses batches (i.e., compression.type isn’t none), and the topic-level compression.type specifies a different codec or is uncompressed.
You’re running Confluent Server brokers and have enabled Schema Validation on the topic.
If a topic is compacted, the broker will also periodically decompress any compressed batches in order to filter out records eligible for compaction.
Now what size batch should be configured on the producers? (Keep in mind that effective batch size is dictated by two configuration settings: the batch size upper bound batch.size, and the maximum amount of time to allow a batch to fill up linger.ms.) Like many pieces of configuration in Kafka, it depends on the scenario. The advantage of a small batch size is that it saves memory and decreases latency; on the other hand, a large batch size increases throughput but takes up more memory. The compression performance is important from a producer perspective, and the decompression performance is important from a consumer perspective. Let's not forget about the brokers: decompression performance is important to the brokers, and compression performance matters if the broker's topic-level compression.type is different from what the producer used. The performance depends in large part on the compression type.
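The interaction between batch.size and linger.ms can be sketched with a toy accumulator for a single partition. This is a simplified model written for illustration, not the Kafka client's implementation: the class name BatchAccumulator, its methods, and the explicit now_ms timestamps are all assumptions, and the model only sends a batch when the next record would overflow it or when the linger time has passed.

```python
from dataclasses import dataclass, field


@dataclass
class BatchAccumulator:
    """Toy model of one partition's batch; not the Kafka client implementation."""
    batch_size: int   # upper bound in bytes, playing the role of batch.size
    linger_ms: int    # maximum wait in milliseconds, playing the role of linger.ms
    _records: list = field(default_factory=list)
    _bytes: int = 0
    _opened_at_ms: int = 0

    def append(self, record: bytes, now_ms: int) -> list:
        """Add a record; return any batches that became ready to send."""
        ready = []
        # Send the current batch first if the new record would overflow it.
        if self._records and self._bytes + len(record) > self.batch_size:
            ready.append(self._drain())
        if not self._records:
            self._opened_at_ms = now_ms  # a new batch starts its linger clock
        self._records.append(record)
        self._bytes += len(record)
        return ready

    def poll(self, now_ms: int) -> list:
        """Return the open batch if it has waited at least linger_ms."""
        if self._records and now_ms - self._opened_at_ms >= self.linger_ms:
            return [self._drain()]
        return []

    def _drain(self) -> list:
        batch, self._records, self._bytes = self._records, [], 0
        return batch


acc = BatchAccumulator(batch_size=30, linger_ms=5)
acc.append(b"a" * 10, now_ms=0)
acc.append(b"b" * 10, now_ms=1)
print(acc.append(b"c" * 20, now_ms=2))  # the first two records leave as one batch
print(acc.poll(now_ms=4))               # the new batch is still lingering
print(acc.poll(now_ms=7))               # linger time reached, the batch is sent
```

A larger batch_size lets more records share one compressed batch but holds more memory, while a larger linger_ms gives batches more time to fill at the cost of latency.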
There are five types of compression available:
none
gzip
snappy
lz4
zstd
none (or, if you’re setting this piece of config from the topic, uncompressed) means that compression won’t occur. The other settings are different compression algorithms supported by Kafka. In general, lz4 is recommended for performance. gzip is not recommended due to high overhead; if you’re looking for a compression ratio similar to gzip but with less CPU overhead, give zstd a try. Keep in mind that each unique pipeline or application will require testing to determine the best type.
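One way to approach that testing is a small harness that measures compression ratio and elapsed time on a sample of your own data. The sketch below is only a starting point under stated assumptions: the measure function, the sample payload, and the candidate names are invented for illustration, and only gzip (at two levels) is wired in because it ships with Python. snappy, lz4, and zstd require third-party packages, so their compress functions would have to be added to the dictionary before they can be compared.

```python
import gzip
import time
from typing import Callable, Dict


def measure(codecs: Dict[str, Callable[[bytes], bytes]], payload: bytes) -> Dict[str, dict]:
    """Compress payload with each codec; report compression ratio and seconds taken."""
    results = {}
    for name, compress in codecs.items():
        start = time.perf_counter()
        compressed = compress(payload)
        elapsed = time.perf_counter() - start
        results[name] = {
            "ratio": len(payload) / len(compressed),  # higher means smaller output
            "seconds": elapsed,
        }
    return results


# Stand-in payload; replace it with a batch of real messages from your pipeline.
sample = b'{"order_id": 12345, "status": "shipped", "region": "us-east"}\n' * 2000

# Only gzip is available in the standard library, shown at two effort levels.
candidates = {
    "gzip-fast": lambda data: gzip.compress(data, compresslevel=1),
    "gzip-best": lambda data: gzip.compress(data, compresslevel=9),
}

for name, stats in measure(candidates, sample).items():
    print(f"{name}: ratio={stats['ratio']:.1f} seconds={stats['seconds']:.4f}")
```

Timings from a harness like this depend heavily on the machine and the payload, so treat them as a relative comparison and confirm the choice with an end-to-end test against a real cluster.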
compression.type
An important implementation detail to bear in mind: there are two places to set compression.type—on the topic and on the producer. The topic level compression.type can be configured to defer to the producer via the aptly named producer setting. The rest of the topic-level compression types take precedence over the producer-level compression type and will override it in terms of the codec used to store messages on the broker and send them to consumers. The following table outlines guidance for the commonly employed compression type combinations, as well as scenarios in which the less common combinations are worth considering and testing.
| Topic | Producer | Compression type for producer to broker communication | Compression type for storage on broker, and broker to consumer communication | Guidance |
|---|---|---|---|---|
| producer | none | producer's compression.type | producer's compression.type | This is the untuned default, i.e., what you get if you don't specify values. Considered fine for development but not recommended for production unless supported by performance testing. |
| producer | gzip, snappy, lz4, or zstd | producer's compression.type | producer's compression.type | This is a common combination giving responsibility to the producer. |
| gzip, snappy, lz4, or zstd | (something different than Topic, but not "none") | producer's compression.type | topic's compression.type | In this situation, the broker will have to recompress (using the topic's compression.type). This isn't typically a desired result, but it may apply in certain scenarios, for instance, when legacy producers use a different approach to compression and you want to enforce the newer compression type at a cluster level. |
| gzip, snappy, lz4, or zstd | (same as Topic) | producer's compression.type | topic's compression.type | Not recommended because if the producer's compression.type eventually changes, the broker will override it, which typically isn't desired. Suggest using topic compression.type = producer instead. |
| gzip, snappy, lz4, or zstd | none | producer's compression.type | topic's compression.type | These combinations are uncommon, but may apply in some edge cases, such as when the producer or consumer is CPU constrained but not network bottlenecked. In the first case, the producer isn't asked to compress, but topics are still compressed when stored or transmitted to the consumer. Not recommended as your initial configuration, but may be considered only after thorough end-to-end throughput analysis. |
| uncompressed | gzip, snappy, lz4, or zstd | producer's compression.type | topic's compression.type | Same as above. |
| uncompressed | none | producer's compression.type | topic's compression.type | Not recommended because if the producer's compression.type changes, the broker will override it and store/transmit data to the consumer uncompressed, which typically isn't desired. Suggest using topic compression.type = producer instead. |
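The precedence rule behind the table can be written down as a small function. This is a sketch of the rule as described in this article, not code from Kafka itself; the names resolve, Outcome, and broker_rewrites are hypothetical and exist only for this example.

```python
from typing import NamedTuple

CODECS = {"gzip", "snappy", "lz4", "zstd"}


class Outcome(NamedTuple):
    wire_codec: str        # what travels from the producer to the broker
    stored_codec: str      # what is stored and sent on to consumers
    broker_rewrites: bool  # True if the broker must re-encode the batch


def resolve(topic: str, producer: str) -> Outcome:
    """Apply the rule that any topic value other than 'producer' wins."""
    if producer not in CODECS | {"none"}:
        raise ValueError(f"unknown producer compression.type: {producer}")
    if topic == "producer":
        # The topic defers to whatever the producer sent.
        return Outcome(producer, producer, False)
    if topic == "uncompressed":
        # Compressed batches are decompressed before they are stored.
        return Outcome(producer, "none", producer != "none")
    if topic in CODECS:
        # A mismatch forces the broker to recompress with the topic's codec.
        return Outcome(producer, topic, producer != topic)
    raise ValueError(f"unknown topic compression.type: {topic}")


for topic, producer in [("producer", "lz4"), ("gzip", "lz4"), ("uncompressed", "gzip")]:
    print(topic, producer, resolve(topic, producer))
```

Any combination where broker_rewrites comes back True adds CPU work on the broker, which is why the table steers most setups toward a topic-level value of producer.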
Compressed data must be decompressed as well! Just as compression generally improves producer throughput, it also improves consumer throughput, at the cost of decompression, which varies according to compression type. Compressed messages are identified by a special header that the consumer "recognizes". It then decompresses any compressed messages it receives, and only returns decompressed messages.
Now, the consumer can handle both compressed and uncompressed messages coming in. The decoupling of the producer and consumer is a major upside to using Kafka. Since the consumer supports this decoupling by handling both types of messages, it can handle messages from producers that send both compressed and uncompressed messages. However, given that compression impacts CPU, network, and disk utilization from producers to brokers to consumers, you will want to coordinate the compression type across your producers to achieve optimal end-to-end performance.
A couple of notes that might help you avoid some mistakes.
First of all, remember that data should not be compressed after it has been encrypted: good encryption produces output that looks random, so the result generally doesn’t compress well. On the other hand, it’s OK to encrypt data that has been compressed already.
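The effect is easy to observe with a quick experiment. The sketch below compresses repetitive text and then a block of pseudo-random bytes of the same length. Note the assumption: no real encryption is performed, and the random bytes merely stand in for ciphertext, which looks similarly random to a compressor. zlib is used because it ships with Python.

```python
import random
import zlib

# Repetitive, structured text, similar in spirit to JSON or log messages.
plaintext = b'{"order_id": 12345, "status": "shipped"}\n' * 250

# Pseudo-random bytes of the same length stand in for ciphertext here.
# This is an assumption for illustration: no real encryption is performed.
rng = random.Random(0)
ciphertext_like = bytes(rng.getrandbits(8) for _ in range(len(plaintext)))


def ratio(data: bytes) -> float:
    """Compressed size divided by original size (lower is better)."""
    return len(zlib.compress(data)) / len(data)


print(f"plaintext:       {ratio(plaintext):.2f}")
print(f"ciphertext-like: {ratio(ciphertext_like):.2f}")
```

The structured text shrinks to a small fraction of its size, while the random-looking bytes do not shrink at all, so compressing after encryption spends CPU for no benefit.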
Next, when it comes to default pieces of configuration, it’s always good to double-check that they are as expected in your implementations. Do they match between the client and the original distribution? On that note, the compression type in the header doesn’t go all the way to the consumer, so you can use the DumpLogSegments tool to inspect the log segment files on the broker. Its output includes the compression codec used.
The size of the messages that can be compressed within Kafka depends on the compression codec employed and on the Kafka configuration settings. In general, Kafka allows compressed messages of up to one gigabyte in total size. However, practical constraints such as network bandwidth, disk space, and the effect on performance mean that the recommended sizes are much smaller; by default, the broker's message.max.bytes setting is about 1 MB. Common compression codecs are gzip, snappy, and lz4, which are effective at reducing message sizes in Kafka clusters and therefore reduce storage use and transfer time. Compression settings should be tuned to the application's actual message sizes, the processing overhead it can tolerate, and the system resources available to Kafka.
Compressed and uncompressed messages can be told apart by the attributes field of Kafka messages. Compressed messages carry a non-zero compression type in that field, which specifies the codec that was used, such as gzip or snappy. Uncompressed messages carry no codec there, so they are read as regular plain messages. Consumers rely on this distinction to decode messages correctly, whatever the compression status of the data in a Kafka topic.
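As a rough illustration of that check, the following sketch reads the codec from an attributes value. It assumes the current record batch format (message format v2), in which the lowest three bits of the attributes field hold the codec id; the function names are hypothetical, and a real consumer client does this for you.

```python
# Compression codec ids stored in the lowest three bits of a record batch's
# attributes field (assumes message format v2).
CODEC_BY_ID = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}
COMPRESSION_MASK = 0x07


def codec_from_attributes(attributes: int) -> str:
    """Return the codec name encoded in a batch's attributes value."""
    codec_id = attributes & COMPRESSION_MASK
    if codec_id not in CODEC_BY_ID:
        raise ValueError(f"unknown compression codec id: {codec_id}")
    return CODEC_BY_ID[codec_id]


def is_compressed(attributes: int) -> bool:
    """True when the attributes value names any codec other than 'none'."""
    return codec_from_attributes(attributes) != "none"


print(codec_from_attributes(0b0000_0011))  # codec bits only
print(is_compressed(0b0000_1000))          # a higher flag bit set, no codec
```

Masking with 0x07 matters because the higher bits of the same field carry unrelated flags, so comparing the whole value against zero would misclassify some uncompressed batches.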
The number of messages Kafka can deliver per second depends on several factors, including the configuration of the Kafka cluster, the capacity of the host hardware, the size of the messages, and the message traffic. Kafka can sustain throughput ranging from thousands to millions of messages per second, depending on how the cluster is managed and scaled. Kafka is designed to scale horizontally across multiple brokers, which lets it provide throughput appropriate for a range of applications, from real-time data processing to data streaming.
If this article sparked your interest and you’re curious to learn more, here are some recommendations:
A Client’s Request: There and Back Again: Learn about producer and consumer configuration in this Kafka Summit London talk from Danica Fine
batch.size and linger.ms: The documentation gives the official run-down of these settings
compression.type: Documentation for the producer
Optimizing and Tuning Confluent Cloud Clients: A guide to optimizing your Confluent Cloud clients, including managing throughput
How To Tune The Apache Kafka Producer Client: Use the producer-perf-test tool to view the throughput and other metrics