Kafka Connect is an open source data integration tool that simplifies the process of streaming data between Apache Kafka® and other systems. Kafka Connect has two types of connectors: source connectors and sink connectors. Source connectors allow you to read data from various sources and write it to Kafka topics. Sink connectors send data from the topics to another endpoint. This blog post discusses how to tune your source connectors to help you get the best throughput out of your compute resources. This article focuses on the following topics:
When it comes to tuning source connectors, you need to understand how a connector works. To start, let’s walk through an example of a JDBC Source connector, which can be split into three sections:
We have an endpoint/source from which we are pulling the records (a database in our example), followed by Kafka Connect, which contains the following:
Connector – returns the records it has pulled from the source
Converter – converts the records to the chosen data type, e.g., Avro
Transformations – applies the simple message transformations (SMTs) defined in the connector configuration
Producer – sends the records to the Kafka topic
Finally, there’s the Kafka endpoint, which accepts the producer records.
Now that we have covered the basics of Kafka Connect, let’s discuss what can’t be tuned:
Converter – converting a record to Avro/Protobuf/JSON will always take a certain amount of time.
Transformations – while transformations are extremely fast, applying one always takes a certain amount of time. If you wish to avoid this delay, you can choose not to use transformations.
For what can be tuned, there are four components:
Connector – the configurations the connector exposes can be used to tune it further
Producer – certain configurations such as batch.size can be used to further increase throughput
Source endpoint – not covered, as it depends on which endpoint you choose
Kafka endpoint – not covered in this article; however, you can review the following white paper to learn more
Tuning a connector is dependent on what configurations the connector has exposed. If the connector does not expose any configurations for pulling more data from the endpoint, then a connector cannot be tuned past a certain throughput.
For example, the Confluent JDBC Source connector exposes the following configurations:
batch.max.rows – maximum number of rows to include in a single batch when polling for new data
poll.interval.ms – frequency in ms to poll for new data in each table
We can make the connector return more records to the producer by increasing batch.max.rows. If the connector did not expose this configuration and instead had batch.max.rows fixed at 5 records, then the largest number of records the connector could pull on each query would be only 5. A connector can only be tuned according to its exposed configuration options.
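To see how much that cap matters, the number of poll batches needed to hand a fixed number of rows to the producer can be computed for a few caps. This is a minimal sketch with a hypothetical helper name; it uses the 10 million rows of the test dataset described later in this post and ignores poll.interval.ms waits and query overhead.

```python
def polls_needed(total_rows: int, batch_max_rows: int) -> int:
    """Minimum number of poll batches needed to hand total_rows to the producer
    when each batch is capped at batch_max_rows (waits and overhead ignored)."""
    if batch_max_rows <= 0:
        raise ValueError("batch_max_rows must be positive")
    return -(-total_rows // batch_max_rows)  # ceiling division on integers


for cap in (5, 100, 500):
    print(cap, polls_needed(10_000_000, cap))
```

A fixed cap of 5 means 2,000,000 batches for the same data that 20,000 batches of 500 would carry.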
When it comes to tuning a producer, there are several common configurations to consider:
batch.size – specifies a maximum batch size in bytes (default 16384)
linger.ms – specifies a maximum duration to fill the batch in milliseconds (default 0)
buffer.memory – the total bytes of memory the producer can use to buffer records waiting to be sent to the server (default 33554432)
compression.type – specifies the compression type for all data generated by the producer (default none)
The batch.size configuration should be large enough to hold the number of records returned by the connector. The batch size can be estimated with the following equation:
batch.size = number of records returned by the connector × average record size (KiB) × 1024
If, for example, the connector returns 500 records every time it pulls from the database, and each record is 1.76 KiB in size, then the equation would be:
batch.size = 500 × 1.76 × 1024 = 901,120 bytes
(The factor of 1024 converts KiB to bytes.)
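The same calculation can be written as a small helper. This is a minimal sketch with a hypothetical function name; it ignores per-batch overhead (record batch headers) and compression, so treat the result as a starting point rather than an exact value.

```python
def batch_size_bytes(records_per_poll: int, avg_record_kib: float) -> int:
    """Estimate a producer batch.size (bytes) large enough for one poll's worth of records.

    Assumes every record is roughly avg_record_kib in size; batch overhead
    and compression are ignored.
    """
    if records_per_poll <= 0 or avg_record_kib <= 0:
        raise ValueError("records_per_poll and avg_record_kib must be positive")
    # Multiply by 1024 to convert KiB to bytes, rounded to the nearest whole byte.
    return round(records_per_poll * avg_record_kib * 1024)


# The example above: 500 records of ~1.76 KiB each.
print(batch_size_bytes(500, 1.76))  # 901120
```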
The linger.ms configuration depends on how many records the connector returns to the producer. The producer may need to wait only 5 ms when there are 10 records; however, when 100K records are being returned, linger.ms would need to be increased, because the batch needs more time to fill up. If linger.ms is set too low, the producer sends many requests whose batches have not had enough time to fill up. Conversely, if linger.ms is set too high, the producer waits unnecessarily and batches are sent at a slower rate.
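One rough way to reason about a starting value is to estimate how long a batch takes to accumulate, assuming records reach the producer at a steady rate. This is a heuristic sketch, not a formula used by Kafka; the function name and the example rate are hypothetical.

```python
def estimated_fill_time_ms(records_per_batch: int, records_per_second: float) -> float:
    """Rough time (ms) for the producer to accumulate records_per_batch records,
    assuming records arrive at a steady records_per_second."""
    if records_per_second <= 0:
        raise ValueError("records_per_second must be positive")
    return records_per_batch / records_per_second * 1000.0


# Hypothetical inputs: 500 records per batch arriving at ~60K records/s.
print(round(estimated_fill_time_ms(500, 60_000), 2))  # 8.33
```

A linger.ms somewhat above this estimate gives a batch time to fill without making the producer wait far longer than needed; real arrival rates are bursty, so verify with the JMX metrics.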
The buffer.memory configuration represents the total bytes of memory the producer can use to buffer records waiting to be sent to the server. This setting should correspond roughly to the total memory the producer will use, but it is not a hard bound, since not all memory the producer uses is used for buffering. For example, if the Kafka producer is not able to send messages (batches) to the Kafka broker (say, because the broker is down), the producer starts accumulating message batches in buffer memory (default 32 MiB). Once the buffer is full, the producer waits up to max.block.ms (default 60,000 ms) for space in the buffer to be freed. If the buffer does not get cleared in that time, the producer throws an exception. If buffer.memory is set too low, it fills almost immediately, leading to blocked sends and exceptions. Conversely, setting buffer.memory too high can cause an out-of-memory (OOM) error if the available memory gets used up.
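To get a feel for how quickly the buffer fills when nothing can be sent, divide buffer.memory by the rate at which records arrive. This is a minimal sketch assuming a steady incoming byte rate; the function name and the 50 MiB/s figure are hypothetical.

```python
DEFAULT_BUFFER_MEMORY = 33_554_432  # producer default buffer.memory (32 MiB)
DEFAULT_MAX_BLOCK_MS = 60_000       # producer default max.block.ms


def seconds_until_buffer_full(incoming_bytes_per_sec: float,
                              buffer_memory: int = DEFAULT_BUFFER_MEMORY) -> float:
    """Seconds until the producer buffer fills if no batches can be sent,
    assuming records keep arriving at incoming_bytes_per_sec."""
    if incoming_bytes_per_sec <= 0:
        raise ValueError("incoming_bytes_per_sec must be positive")
    return buffer_memory / incoming_bytes_per_sec


# Hypothetical rate: 50 MiB/s of records arriving while the broker is unreachable.
t = seconds_until_buffer_full(50 * 1024 * 1024)
print(f"{t:.2f} s to fill, then sends block for up to {DEFAULT_MAX_BLOCK_MS / 1000:.0f} s")
```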
The final configuration is compression.type, which can be enabled to compress messages before they are produced. Each compression type has its own pros and cons, so you will need to research which one is best for your use case. The following KIP contains further information on the performance of each compression type. While compression is great for making your messages smaller, enabling compression does add time to message delivery, since messages need to be compressed.
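The size-versus-time trade-off can be observed with a small experiment. This sketch uses Python's standard gzip module as a stand-in for the producer's gzip codec (Kafka also supports snappy, lz4, and zstd, which are not in Python's standard library). The synthetic records are hypothetical and far more compressible than typical data, so the ratio and timing are illustrative only.

```python
import gzip
import time


def gzip_tradeoff(payload: bytes, level: int = 6) -> tuple[float, float]:
    """Return (compression ratio, seconds spent compressing) for payload."""
    start = time.perf_counter()
    compressed = gzip.compress(payload, compresslevel=level)
    elapsed = time.perf_counter() - start
    return len(payload) / len(compressed), elapsed


# Hypothetical batch: 500 JSON-like records of roughly 1.1 KiB each.
record = b'{"id": 1, "name": "example", "payload": "' + b"x" * 1050 + b'"}'
batch = record * 500
ratio, secs = gzip_tradeoff(batch)
print(f"ratio {ratio:.1f}x, {secs * 1000:.2f} ms to compress")
```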
When working with JMX metrics, it helps to divide them into three groups: connector metrics, broker metrics, and producer metrics (from the Kafka Connect framework).
Level | Metric | Description | Why it’s useful |
---|---|---|---|
Connector | source-record-poll-rate | Before transformations are applied, this is the average per-second number of records produced or polled by the task belonging to the named source connector in the worker. | Tells us the average per-second number of records produced prior to transformation. |
Connector | poll-batch-avg-time-ms | Average time in milliseconds taken by this task to poll for a batch of source records. | Tells you how long it takes for a batch of records to be returned from the endpoint. |
Connector | source-record-write-rate | After transformations are applied, this is the average per-second number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (excludes any records filtered out by the transformations). | Useful when transformations are applied, to determine the impact of Connect message transformations. |
Broker | BytesInPerSec | Byte-in rate from clients. | Helpful for validating, on a per-topic basis, that throughput has increased. |
Producer | record-size-avg | The average record size. | Used for calculating the batch.size. |
Producer | batch-size-avg | The average number of bytes sent per partition per-request. | Validates whether batches are filling up to the configured batch.size. |
Producer | records-per-request-avg | The average number of records per request. | Validates how many records are sent in each producer batch. |
Producer | record-send-rate | The average number of records sent per second for a topic. | Used as an indicator that the connector needs further tuning. |
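To locate these metrics in a JMX browser or exporter configuration, the MBean object names can be assembled from the connector name, task id, producer client id, and topic. This sketch follows the MBean naming in Apache Kafka's monitoring documentation; the helper name and the argument values are hypothetical placeholders, and tag values containing special characters would need quoting.

```python
def jmx_object_names(connector: str, task: int, client_id: str, topic: str) -> dict[str, str]:
    """Map each metric from the table to the JMX MBean that exposes it.

    For the Kafka client metrics, the metric name is an attribute of the MBean;
    for BytesInPerSec, read a rate attribute such as OneMinuteRate.
    """
    source_task = f"kafka.connect:type=source-task-metrics,connector={connector},task={task}"
    producer = f"kafka.producer:type=producer-metrics,client-id={client_id}"
    producer_topic = (f"kafka.producer:type=producer-topic-metrics,"
                      f"client-id={client_id},topic={topic}")
    return {
        "source-record-poll-rate": source_task,
        "poll-batch-avg-time-ms": source_task,
        "source-record-write-rate": source_task,
        "BytesInPerSec": f"kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={topic}",
        "record-size-avg": producer,
        "batch-size-avg": producer,
        "records-per-request-avg": producer,
        "record-send-rate": producer_topic,  # per-topic send rate, as in the table
    }


# Hypothetical names for a single-task MySQL source connector.
names = jmx_object_names("mysql-source", 0, "connector-producer-mysql-source-0", "mysql-topic")
print(names["batch-size-avg"])
```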
This example walks you through how to increase your throughput on a Confluent JDBC Source connector connecting to a MySQL database (DB). This methodology can be applied to any source connector. The following steps need to be taken:
1. Determine which connector/producer configurations can be modified, and set up Grafana for JMX metrics
2. Gather baseline throughput
3. Determine which configurations to change (producer vs. connector)
4. Increase producer configurations
5. Increase connector configurations for further throughput
JDBC source connector configurations:
batch.max.rows – maximum number of rows to include in a single batch polled by the connector. This setting can be used to limit the amount of data buffered internally in the connector. (default 100)
poll.interval.ms – frequency in ms to wait before requesting new data for each table. (default 5000)
Producer configurations:
batch.size – specifies a maximum batch size in bytes (default 16384)
linger.ms – specifies a maximum duration to fill the batch in milliseconds (default 0)
buffer.memory – the total bytes of memory the producer can use to buffer records waiting to be sent to the server (default 33554432)
compression.type – specifies the compression type for all data generated by the producer (default none)
This blog post uses the kafka-docker-playground environment, where a mysql.sh script deploys a MySQL instance, inserts 10 million records into the DB, and starts up the connector. Grafana is enabled with export ENABLE_JMX_GRAFANA=true.
First, you need a baseline for your throughput using the default connector configurations. This way, you can compare JMX metrics before and after each change to determine how to increase your throughput. Keep in mind that you want to do your tuning with a single task. Once the baseline throughput has been determined for a single task, you can always increase tasks.max later (if the connector permits it).
The following are my connector configurations:
Baseline Grafana JMX metrics:
Broker
BytesInPerSec - 50.9 MB/s
Connector
source-record-poll-rate - 53.1K per second
poll-batch-avg-time-ms - 0.981 ms
source-record-write-rate - 53.1K per second
Producer
record-size-avg - 1.09 KiB
batch-size-avg - 15.1 KiB
records-per-request-avg - 14.8 avg records per request
record-send-rate - 71.6K per second
In order to determine which configurations to change, let’s summarize our metrics in a simple table:
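Metric | Value |
---|---|
BytesInPerSec | 50.9 MB/s |
source-record-poll-rate | 53.1K ops/s |
poll-batch-avg-time-ms | 0.981 ms |
source-record-write-rate | 53.1K ops/s |
record-size-avg | 1.09 KiB |
batch-size-avg | 15.1 KiB |
records-per-request-avg | 14.8 records per request |
record-send-rate | 71.6K ops/s |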
Based on the above metrics, the producer-level metrics should be considered first. The main reason for tuning the producer first is the JMX metrics batch-size-avg and records-per-request-avg. These two values should sound an alarm: the average batch size is 15.1 KiB and the average number of records per request is 14.8. batch-size-avg is getting maxed out, since the default batch.size is 16384 bytes (16 KiB) and appending another record would not be possible with an average record size of 1.09 KiB. Looking at the records-per-request-avg metric, I am sending on average 14.8 records per request, yet the connector is returning 100 records on each poll due to the batch.max.rows configuration. This indicates that the connector is returning 100 records, but batch.size is being maxed out with only ~15 of them.
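The arithmetic behind that conclusion is easy to check. This is a minimal sketch with hypothetical helper names; per-batch overhead and compression are ignored, and real record sizes vary around the average, so the result is only expected to line up roughly with the ~15 records per request observed.

```python
def records_per_batch(batch_size_bytes: int, avg_record_kib: float) -> int:
    """Whole records of avg_record_kib that fit into one batch of batch_size_bytes."""
    return int(batch_size_bytes // (avg_record_kib * 1024))


def bytes_needed(records: int, avg_record_kib: float) -> int:
    """Bytes required to hold `records` records of avg_record_kib each."""
    return round(records * avg_record_kib * 1024)


# Default batch.size (16384 bytes) with the baseline record-size-avg of 1.09 KiB.
print(records_per_batch(16384, 1.09))  # 14
# Space needed for the 100 records the connector returns per poll.
print(bytes_needed(100, 1.09))  # 111616
```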
I can validate the number of records returned by the connector to the producer by enabling TRACE-level logging for AbstractWorkerSourceTask.
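One way to turn on TRACE logging without restarting the worker is the Kafka Connect REST endpoint /admin/loggers/{logger} (available since Apache Kafka 2.4). The sketch below only builds the PUT request; the worker URL http://localhost:8083 and the helper name are assumptions, and the logger name assumes AbstractWorkerSourceTask lives in org.apache.kafka.connect.runtime, as it does in recent Apache Kafka versions. Levels set this way are not persisted across worker restarts.

```python
import json
import urllib.request


def trace_logging_request(
    connect_url: str,
    logger: str = "org.apache.kafka.connect.runtime.AbstractWorkerSourceTask",
    level: str = "TRACE",
) -> urllib.request.Request:
    """Build a PUT request that sets `logger` to `level` on a Connect worker."""
    body = json.dumps({"level": level}).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/admin/loggers/{logger}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = trace_logging_request("http://localhost:8083")
print(req.get_method(), req.full_url)
# To actually send it against a running worker: urllib.request.urlopen(req)
```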
Then search the logs for the following line:
Based on the producer metrics, the first component I will tune will be the producer.
With that said, one might ask, “How do I determine the connector is not the bottleneck?”
When it comes to determining if the connector is the bottleneck, one or both of the following scenarios tend to happen:
Throughput does not increase even though producer configurations have been modified
Producer record-send-rate stays flat or does not increase when modifying the producer
When one or both of the above scenarios happen, it is a good indicator that the connector needs to be modified. The JMX metric record-send-rate tells us what the producer is currently doing. If record-send-rate does not increase or stays relatively the same, the producer is waiting on records from the connector. In other words, the connector is the bottleneck, as the producer is in “waiting” mode.
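The two scenarios above can be expressed as a simple check that compares metrics before and after a producer change. This is a heuristic sketch: the helper name and the 5% growth threshold are hypothetical, and the example uses the BytesInPerSec and record-send-rate readings from before and after the linger.ms change in Step 4 of the walkthrough that follows.

```python
def bottleneck_signals(before: dict[str, float], after: dict[str, float],
                       tolerance: float = 0.05) -> list[str]:
    """Return which connector-bottleneck signals fired after a producer change.

    A signal fires when the metric did not grow by more than `tolerance`
    (a hypothetical 5% threshold) relative to its value before the change.
    """
    signals = []
    for metric in ("BytesInPerSec", "record-send-rate"):
        if after[metric] <= before[metric] * (1 + tolerance):
            signals.append(metric)
    return signals


# MB/s and thousands of records/s, before and after raising linger.ms.
before_linger = {"BytesInPerSec": 61.0, "record-send-rate": 60.8}
after_linger = {"BytesInPerSec": 61.0, "record-send-rate": 58.0}
print(bottleneck_signals(before_linger, after_linger))
```

When both signals fire, the producer is most likely waiting on the connector.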
To tune the producer, batch.size needs to be increased. The following equation can be used to calculate the batch size:
batch.size = number of records returned by the connector (batch.max.rows) × record-size-avg (KiB) × 1024
Since our current batch.size fills up with only ~15 of the 100 records coming from the connector, in this exercise I will increase both batch.size and batch.max.rows to further increase the throughput. Tuning batch.max.rows as well is more of a personal preference: it avoids recalculating batch.size a second time. I know I want to pull more data from the endpoint, which is why I’m choosing to do both at once. This modification is also meant to keep the connector from being the bottleneck, since it now pulls more records per poll. batch.max.rows will be set to 500 records, and we can use the above equation to determine batch.size. Note that record-size-avg is reported in KiB, so it needs to be converted to bytes by multiplying by 1024:
batch.size = 500 × 1.09 × 1024 = 558,080 bytes
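Kafka Connect lets a connector override producer settings using the producer.override. prefix, provided the worker's connector.client.config.override.policy allows it. The exact configuration used in this post is not reproduced here, so this is a sketch under that assumption: it returns a copy of a connector configuration with batch.max.rows raised and the producer batch.size sized with the equation above. The helper name and the base configuration are hypothetical.

```python
import json


def with_tuned_batching(base: dict[str, str], batch_max_rows: int,
                        avg_record_kib: float) -> dict[str, str]:
    """Return a copy of a connector config with batch.max.rows raised and
    the producer batch.size sized to hold one poll's worth of records."""
    # batch.size = batch.max.rows x record-size-avg (KiB) x 1024, rounded to whole bytes.
    batch_size = round(batch_max_rows * avg_record_kib * 1024)
    tuned = dict(base)  # leave the caller's dict untouched
    tuned["batch.max.rows"] = str(batch_max_rows)
    # Per-connector producer override; requires the worker's override policy to permit it.
    tuned["producer.override.batch.size"] = str(batch_size)
    return tuned


# Hypothetical starting point; a real JDBC source config also needs connection settings.
base_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
}
print(json.dumps(with_tuned_batching(base_config, 500, 1.09), indent=2))
```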
Now our connector configuration is updated; notice that batch.max.rows and batch.size have both increased. Again, this is driven by the batch-size-avg JMX metric, which showed the batch filling up before it could hold the connector's 100 records:
After the connector has started up, the following JMX metrics summary is the end result:
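Metric | Value |
---|---|
BytesInPerSec | 61.0 MB/s |
source-record-poll-rate | 61.5K ops/s |
poll-batch-avg-time-ms | 4.41 ms |
source-record-write-rate | 61.4K ops/s |
record-size-avg | 1.09 KiB |
batch-size-avg | 95.3 KiB |
records-per-request-avg | 95.9 records per request |
record-send-rate | 60.8K ops/s |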
Based on the above JMX metrics, notice that records-per-request-avg is not reaching the value batch.max.rows is set to. When records-per-request-avg falls short like this, it indicates that the producer needs to wait a bit longer to fill up the batch. In this scenario, linger.ms is the configuration that needs to be increased. The right linger.ms value can vary depending on your environment; for this example, I will choose a small value of 10. The following are the new connector configurations:
Updated JMX metrics summary:
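Metric | Value |
---|---|
BytesInPerSec | 61.0 MB/s |
source-record-poll-rate | 60.8K ops/s |
poll-batch-avg-time-ms | 4.53 ms |
source-record-write-rate | 60.6K ops/s |
record-size-avg | 1.09 KiB |
batch-size-avg | 543 KiB |
records-per-request-avg | 534 records per request |
record-send-rate | 58.0K ops/s |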
At this point, the average number of records per request meets the target; however, BytesInPerSec and record-send-rate have not increased. Those two metrics indicate that the connector is not sending records to the producer fast enough. The producer could send more records, but it is not receiving them fast enough.
In Step 4, we tuned the producer and determined that the connector is the current bottleneck, as throughput has not increased and record-send-rate remained relatively flat. There are only two configurations that can help increase the throughput on the connector side:
batch.max.rows – maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
poll.interval.ms – frequency in ms to poll for new data in each table.
Since batch.max.rows was already modified and throughput did not increase, poll.interval.ms is the remaining option. poll.interval.ms determines how often the connector polls each table for new data, and its current value is 5000 (the default). This means the connector pulls data from the table only every 5 seconds.
The following are the connector's new configurations, with poll.interval.ms set to 1:
Updated JMX metrics summary:
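Metric | Value |
---|---|
BytesInPerSec | 88.2 MB/s |
source-record-poll-rate | 88.1K ops/s |
poll-batch-avg-time-ms | 19.3 ms |
source-record-write-rate | 87.7K ops/s |
record-size-avg | 1.09 KiB |
batch-size-avg | 543 KiB |
records-per-request-avg | 534 records per request |
record-send-rate | 80.3K ops/s |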
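The relative throughput increase is measured against the baseline BytesInPerSec reading. A minimal sketch of that calculation (the function name is hypothetical):

```python
def percent_increase(before: float, after: float) -> float:
    """Relative change from `before` to `after`, as a percentage of `before`."""
    if before == 0:
        raise ValueError("before must be non-zero")
    return (after - before) / before * 100


# Baseline and final BytesInPerSec readings (MB/s).
print(f"{percent_increase(50.9, 88.2):.1f}%")  # 73.3%
```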
At this point, our throughput has increased roughly 73%, from the original 50.9 MB/s to 88.2 MB/s. The main bottleneck in this scenario was the connector pulling records from the endpoint too slowly.
This example went from 50.9 MB/s to 88.2 MB/s, a roughly 73% increase in throughput. That said, this testing scenario is a “perfect world” scenario; in reality, production environments are not as consistent because:
Most database tables do not receive 10 million rows that are each exactly 1.09 KiB in size
The testing was done in one EC2 instance with all Docker containers running locally with almost no latency
The demo DB was not tuned for its full potential
While most scenarios are not as isolated as presented in this example environment, the troubleshooting methodology remains the same:
Determine if the connector or producer is the bottleneck
Tune the producer to ensure batches get filled up
Tune the connector to send more messages at a faster rate to the producer