Apache Kafka is widely used for data-intensive operations, from collecting log data for analysis to acting as a storage layer for large-scale real-time stream processing applications. In some of the largest deployments, Kafka handles on the order of hundreds of billions of messages per day, which amounts to moving hundreds of terabytes of data. The ability to compress data is therefore paramount to Kafka. In this blog post, I outline how compression works in Kafka and how we improved its efficiency to optimize performance.
A quick overview of compression in Kafka
In Kafka, multiple messages are bundled and compressed together. The compressed bundle is then turned into a special kind of message and appended to Kafka’s log file. The reason to compress a batch of messages rather than individual messages is compression efficiency: compressors generally achieve better ratios on larger inputs because there is more redundancy to exploit.
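The effect of batching can be illustrated with a small experiment. The sketch below is not Kafka code: it uses the JDK's GZIP as a stand-in compressor, and the sample messages and all class and method names are made up for illustration. It compares the total size of messages compressed one by one against the size of the same messages compressed as a single bundle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

class BatchCompressionDemo {

    // Compress one byte array with GZIP and return the compressed size.
    static int gzipSize(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Hypothetical log-like messages that share most of their structure.
    static List<byte[]> sampleMessages(int count) {
        List<byte[]> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String m = "{\"level\":\"INFO\",\"service\":\"checkout\",\"requestId\":" + i + "}";
            messages.add(m.getBytes(StandardCharsets.UTF_8));
        }
        return messages;
    }

    // Total size when every message is compressed on its own.
    static int individualSize(List<byte[]> messages) {
        int total = 0;
        for (byte[] m : messages) {
            total += gzipSize(m);
        }
        return total;
    }

    // Size when all messages are concatenated and compressed once.
    static int batchedSize(List<byte[]> messages) {
        ByteArrayOutputStream bundle = new ByteArrayOutputStream();
        for (byte[] m : messages) {
            bundle.write(m, 0, m.length);
        }
        return gzipSize(bundle.toByteArray());
    }

    public static void main(String[] args) {
        List<byte[]> messages = sampleMessages(1000);
        System.out.println("individual: " + individualSize(messages) + " bytes");
        System.out.println("batched:    " + batchedSize(messages) + " bytes");
    }
}
```

With repetitive messages like these, the batched size should come out far smaller, because each individually compressed message pays the stream overhead on its own and cannot reuse patterns seen in its neighbors.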
More details about Kafka compression can be found in this blog post.
There are tradeoffs to consider when enabling compression. Compression, of course, saves the space needed for data storage. On the other hand, it consumes extra computing resources, namely CPU cycles and memory. Whether to use compression should therefore be decided by weighing cost against benefit. To make compression applicable to a wider range of Kafka use cases, we set out to improve Kafka’s compression support by reducing that cost.
Lots of byte copies!
The cost comes from two places: the compressor itself and the handling of compressed data. We didn’t change the compressors because we prefer to keep them as black boxes for Kafka. Instead, we tackled the handling of compressed data after noticing that a significant number of byte copies happened when compression was turned on, and that these copies put a noticeable load on Kafka servers.
Before examining each byte copy, let’s take a look at how a Kafka message is formatted.
The CRC field is a checksum of everything after the CRC field. As its name suggests, it is computed using the CRC-32 algorithm. The attributes field is a fixed-length field holding bit flags that Kafka uses internally. The message key and the message body are variable-length sequences of bytes, each prefixed with a length field.
As described earlier, the compressed messages are turned into a special message. This special message has a compression flag set in its attributes to distinguish it from regular messages. The compressed messages are stored in the message-content of the special message as a single chunk of bytes.
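To make the layout concrete, here is a minimal sketch of writing such a message into a ByteBuffer. The field widths (4-byte CRC, 1-byte attributes, 4-byte length prefixes), the position of the compression flag bit, and all names are assumptions for illustration only. The actual Kafka wire format may contain additional fields and should be taken from the Kafka documentation.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class MessageLayoutSketch {
    static final int CRC_LENGTH = 4;          // assumed width
    static final int ATTRIBUTES_LENGTH = 1;   // assumed width
    static final int LENGTH_PREFIX = 4;       // assumed width
    static final byte COMPRESSED_FLAG = 0x01; // hypothetical bit position

    // Lay out: CRC | attributes | key length | key | content length | content
    static ByteBuffer format(byte attributes, byte[] key, byte[] content) {
        int size = CRC_LENGTH + ATTRIBUTES_LENGTH
                 + LENGTH_PREFIX + key.length
                 + LENGTH_PREFIX + content.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(CRC_LENGTH);   // leave room for the CRC, which is written last
        buf.put(attributes);
        buf.putInt(key.length);
        buf.put(key);
        buf.putInt(content.length);
        buf.put(content);

        // The CRC covers everything after the CRC field itself.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), CRC_LENGTH, size - CRC_LENGTH);
        buf.putInt(0, (int) crc.getValue());
        buf.rewind();
        return buf;
    }

    // Recompute the CRC over the bytes after the CRC field and compare.
    static boolean isValid(ByteBuffer message) {
        CRC32 crc = new CRC32();
        crc.update(message.array(), CRC_LENGTH, message.limit() - CRC_LENGTH);
        return message.getInt(0) == (int) crc.getValue();
    }

    public static void main(String[] args) {
        byte[] compressedBundle = {10, 20, 30}; // placeholder for compressed messages
        ByteBuffer special = format(COMPRESSED_FLAG, new byte[0], compressedBundle);
        System.out.println("size=" + special.limit() + " valid=" + isValid(special));
    }
}
```

Note that the CRC can only be filled in after the content has been written, which is the ordering constraint that matters for the rest of this post.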
Now, let’s review why we had so many byte copies.
When formatting the special message, we must compress the bundle of messages first. Otherwise, we wouldn’t be able to write message-length, because the compressed size cannot be predicted precisely before performing the compression. Similarly, we cannot write the CRC before we know message-content (the compressed messages), because the CRC is computed over it. The compression therefore has to be done first.
In the previous implementation, we used ByteArrayOutputStream to receive and hold the compressed data from the compressor. After compression was done, we created a byte array of its content using the ByteArrayOutputStream.toByteArray method, before we formatted the message. The message was then written to an IO buffer. How many times was a byte copied from the compressor to the IO buffer?
- compressor => ByteArrayOutputStream
- ByteArrayOutputStream => a byte array
- a byte array => Message (ByteBuffer)
- Message => IO buffer (ByteBuffer)
At least four times, with the possibility of one more copy. The extra copy might come from the internal buffer management of ByteArrayOutputStream. ByteArrayOutputStream is a doubling buffer: every time it runs out of space, it creates a new byte array twice as big and copies the entire buffered content over to it. The benefit of doubling is that the amortized number of copies per byte due to buffer expansions stays bounded by a small constant, on the order of one, no matter how many expansions occur. Hence, each byte is copied four or five times in total.
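The amortized cost of doubling can be checked with a small simulation. The sketch below does not use ByteArrayOutputStream itself. It models a hypothetical buffer that starts at a given capacity, doubles when full, and counts how many bytes the expansions copy while data is appended one byte at a time.

```java
class DoublingBufferCost {

    // Count the bytes copied by expansions while appending totalBytes,
    // one byte at a time, to a buffer that doubles when it is full.
    static long expansionCopies(int initialCapacity, long totalBytes) {
        long capacity = initialCapacity;
        long size = 0;
        long copied = 0;
        for (long i = 0; i < totalBytes; i++) {
            if (size == capacity) {
                copied += size;   // existing content moves to the new array
                capacity *= 2;
            }
            size++;
        }
        return copied;
    }

    public static void main(String[] args) {
        long[] sizes = {64, 65, 1_000, 1_000_000};
        for (long n : sizes) {
            long copied = expansionCopies(32, n);
            System.out.printf("bytes=%d copied=%d ratio=%.2f%n",
                    n, copied, (double) copied / n);
        }
    }
}
```

In this model the ratio of copied bytes to written bytes depends on how close the final size is to the last doubling, but it always stays below two. That is why the expansions count as roughly one extra copy per byte rather than one copy per expansion.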
Copying data four or five times is a lot. In general, a large amount of data flows in and out of a Kafka server. Copying all of that data up to five times within a server was not ideal, and there was definitely room for improvement.
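The previous write path can be sketched as follows. This is a simplified reconstruction for illustration, not the original Kafka code: GZIP from the JDK stands in for the compressor, the message is reduced to CRC, length, and content with assumed 4-byte widths, and the class and method names are hypothetical. The comments mark where each copy happens.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.GZIPOutputStream;

class OldWritePathSketch {

    static void appendCompressed(byte[] bundledMessages, ByteBuffer ioBuffer) {
        // Copy 1: compressor => ByteArrayOutputStream
        // (plus possible expansion copies inside the stream)
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(bundledMessages);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // Copy 2: ByteArrayOutputStream => a byte array
        byte[] content = compressed.toByteArray();

        // Copy 3: a byte array => Message (ByteBuffer)
        ByteBuffer message = ByteBuffer.allocate(4 + 4 + content.length);
        message.position(4);               // CRC is filled in last
        message.putInt(content.length);    // length is known only after compression
        message.put(content);
        CRC32 crc = new CRC32();
        crc.update(message.array(), 4, message.capacity() - 4);
        message.putInt(0, (int) crc.getValue());
        message.rewind();

        // Copy 4: Message => IO buffer (ByteBuffer)
        ioBuffer.put(message);
    }

    public static void main(String[] args) {
        ByteBuffer ioBuffer = ByteBuffer.allocate(4096);
        appendCompressed(new byte[1000], ioBuffer);
        System.out.println("bytes in IO buffer: " + ioBuffer.position());
    }
}
```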
How we removed the extra byte copies
In the new implementation, we wrote a new output stream class named BufferingOutputStream, which works as a variable-length buffer like ByteArrayOutputStream. It was designed to eliminate copies in three places: formatting, buffer content access, and buffer expansion.
#1 Formatting. BufferingOutputStream can create a nested output stream through BufferingOutputStream.reserve(int). The call skips the number of bytes specified by the argument from the current position, in effect creating a hole in the byte stream. Writes issued after the reserve call continue immediately after the skipped bytes. The method returns a new output stream through which the skipped bytes can be filled in later.
The reserve method is used to allocate space for the CRC and message-length. After the compressed data is written, the CRC and message-length are written into the internal buffer of BufferingOutputStream through the output streams returned by the reserve method.
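The idea behind reserve can be sketched with a much simpler class. ReservingStream below is a hypothetical, fixed-capacity stand-in and not the actual BufferingOutputStream. Only the reserve(int) behavior described above is modeled, and the 4-byte field widths are assumptions.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;

class ReserveSketch {

    // Hypothetical fixed-capacity stream that supports reserving a hole.
    static class ReservingStream extends OutputStream {
        private final byte[] buffer;
        private int position = 0;

        ReservingStream(int capacity) {
            buffer = new byte[capacity];
        }

        @Override
        public void write(int b) {
            buffer[position++] = (byte) b;
        }

        // Skip `length` bytes and return a stream that fills them in later.
        OutputStream reserve(int length) {
            final int start = position;
            final int end = start + length;
            position = end;
            return new OutputStream() {
                private int cursor = start;

                @Override
                public void write(int b) throws IOException {
                    if (cursor >= end) {
                        throw new IOException("reserved region is full");
                    }
                    buffer[cursor++] = (byte) b;
                }
            };
        }

        int size() { return position; }

        byte[] array() { return buffer; }
    }

    // Reserve CRC and length, write the content, then fill in the holes.
    static byte[] format(byte[] compressedContent) {
        ReservingStream out = new ReservingStream(8 + compressedContent.length);
        try {
            DataOutputStream crcHole = new DataOutputStream(out.reserve(4));
            DataOutputStream lengthHole = new DataOutputStream(out.reserve(4));
            out.write(compressedContent);  // a compressor would write here directly
            lengthHole.writeInt(out.size() - 8);
            CRC32 crc = new CRC32();
            crc.update(out.array(), 4, out.size() - 4);
            crcHole.writeInt((int) crc.getValue());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.array();
    }

    public static void main(String[] args) {
        byte[] message = format(new byte[] {1, 2, 3, 4, 5});
        System.out.println("formatted " + message.length + " bytes in place");
    }
}
```

Because the header fields are written into the same buffer that already holds the content, no separate formatting step is needed to assemble the message.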
#2 Buffer content access. BufferingOutputStream has no toByteArray method. Instead, it has writeTo(ByteBuffer), which writes the buffered content directly to a ByteBuffer, so there is no need to create a byte array object just to copy the content over.
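For comparison, the snippet below shows what the toByteArray route costs with the standard ByteArrayOutputStream. Each call to toByteArray allocates a fresh array and copies the content into it, and a second copy is needed to move that array into a ByteBuffer. The class and method names are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

class ToByteArrayCopyDemo {

    // toByteArray hands out a new copy on every call, never the internal buffer.
    static boolean returnsFreshCopy() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(new byte[] {1, 2, 3}, 0, 3);
        byte[] first = out.toByteArray();
        byte[] second = out.toByteArray();
        first[0] = 99;   // changing one copy affects neither the stream nor the other copy
        return first != second && second[0] == 1;
    }

    // The two-step route: stream => byte array => ByteBuffer.
    static ByteBuffer viaByteArray(ByteArrayOutputStream out) {
        byte[] snapshot = out.toByteArray();                 // first copy
        ByteBuffer target = ByteBuffer.allocate(snapshot.length);
        target.put(snapshot);                                // second copy
        target.flip();
        return target;
    }

    public static void main(String[] args) {
        System.out.println("fresh copy per call: " + returnsFreshCopy());
    }
}
```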
#3 Buffer expansion. The internal buffer of BufferingOutputStream is not implemented as a single byte array but as a list of byte arrays. Whenever the buffer space is exhausted, it allocates a new byte array and appends it to the list, so no copying occurs. In addition to avoiding copies, this structure has a favorable effect on JVM memory management, whereas a doubling buffer sometimes causes GC issues. A doubling buffer requires contiguous free memory for each new, larger buffer, and the JVM memory manager has to run a garbage collection when it fails to find a large enough contiguous free space. This is most troublesome when many growing doubling buffers coexist in a system and compete for memory.
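A minimal sketch of such a chunked buffer is shown below, including a writeTo(ByteBuffer) method in the spirit of #2. ChunkedOutputStream is a hypothetical simplification, not the real BufferingOutputStream: the fixed chunk size and every name other than writeTo are assumptions.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class ChunkedBufferSketch {

    // Hypothetical buffer backed by a list of fixed-size chunks.
    static class ChunkedOutputStream extends OutputStream {
        private final int chunkSize;
        private final List<byte[]> chunks = new ArrayList<>();
        private int usedInLast;   // bytes used in the last chunk

        ChunkedOutputStream(int chunkSize) {
            if (chunkSize <= 0) {
                throw new IllegalArgumentException("chunkSize must be positive");
            }
            this.chunkSize = chunkSize;
            this.usedInLast = chunkSize;   // forces an allocation on the first write
        }

        // Grow by appending a chunk; existing content is never copied.
        private byte[] lastChunkWithRoom() {
            if (usedInLast == chunkSize) {
                chunks.add(new byte[chunkSize]);
                usedInLast = 0;
            }
            return chunks.get(chunks.size() - 1);
        }

        @Override
        public void write(int b) {
            lastChunkWithRoom()[usedInLast++] = (byte) b;
        }

        @Override
        public void write(byte[] src, int offset, int length) {
            while (length > 0) {
                byte[] chunk = lastChunkWithRoom();
                int n = Math.min(length, chunkSize - usedInLast);
                System.arraycopy(src, offset, chunk, usedInLast, n);
                usedInLast += n;
                offset += n;
                length -= n;
            }
        }

        int size() {
            return chunks.isEmpty() ? 0 : (chunks.size() - 1) * chunkSize + usedInLast;
        }

        int chunkCount() {
            return chunks.size();
        }

        // Copy the content straight into the target, with no intermediate array.
        void writeTo(ByteBuffer target) {
            for (int i = 0; i < chunks.size(); i++) {
                int n = (i == chunks.size() - 1) ? usedInLast : chunkSize;
                target.put(chunks.get(i), 0, n);
            }
        }
    }

    public static void main(String[] args) {
        ChunkedOutputStream out = new ChunkedOutputStream(4);
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        out.write(data, 0, data.length);
        ByteBuffer ioBuffer = ByteBuffer.allocate(out.size());
        out.writeTo(ioBuffer);
        System.out.println("chunks=" + out.chunkCount() + " bytes=" + ioBuffer.position());
    }
}
```

Each chunk is a modest, fixed-size allocation, so growing the buffer never requires finding one large contiguous block of free memory.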
Now, how many times is a byte copied from the compressor to the IO buffer?
- compressor => BufferingOutputStream
- BufferingOutputStream => IO buffer (ByteBuffer)
Each byte is copied only twice, as opposed to four or five times previously!
On my laptop, I tested the performance with a test program, kafka.TestLinearWriteSpeed, using Snappy compression. This is not an end-to-end performance test but a component benchmark that measures message writing performance. Previously the throughput was 26.65 MB/sec. With the change it is now 35.78 MB/sec, which is about 34% better throughput. A 34% improvement just from removing extra byte copies was more than I expected. One thing not covered by this benchmark is the effect on GC. In addition to the savings in CPU, we should see better GC behavior in a real production system, especially in a large deployment.
We are happy with the result and hope to see more users try the compression feature.