
How to Tune Kafka Connect Source Connectors to Optimize Throughput

Kafka Connect is an open source data integration tool that simplifies the process of streaming data between Apache Kafka® and other systems. Kafka Connect has two types of connectors: source connectors and sink connectors. Source connectors allow you to read data from various sources and write it to Kafka topics. Sink connectors send data from the topics to another endpoint. This blog post discusses how to tune your source connectors to help you get the best throughput out of your compute resources. It covers what can be tuned, how to tune connectors and producers, which JMX metrics to monitor, and a walk-through example.

What can you tune? A high-level overview 

When it comes to tuning source connectors, you need to have an understanding of how a connector works. To start, let’s walk through an example of a JDBC Source connector which has been split into three sections:

We have an endpoint/source from which we are pulling the records (a database in our example), followed by Kafka Connect, which contains the following:

  • Connector – returns the number of records it has pulled from the source 

  • Converter – converts the records to the chosen data type, e.g., Avro 

  • Transformations – applies Single Message Transforms (SMTs) that were defined in the connector configuration

  • Producer – sends the records to the Kafka topic

Finally, there’s the Kafka endpoint, which accepts the producer records.

Now that we have covered the basics of Kafka Connect, let’s discuss what can’t be tuned: 

  • Converter – converting a record to Avro/Protobuf/JSON will always take a certain amount of time.

  • Transformations – while a transformation is extremely fast, applying it will always take a certain amount of time. If you wish to avoid this delay, you can choose not to use transformations.

For what can be tuned, there are four components: 

  • Connector – we can use the connector configurations to tune further 

  • Producer – certain configurations such as batch.size can be used to further increase throughput  

  • Source endpoint – not covered as it depends on which endpoint you choose 

  • Kafka endpoint – not covered in this article, however, you can review the following white paper to learn more

Tuning connectors

Tuning a connector is dependent on what configurations the connector has exposed. If the connector does not expose any configurations for pulling more data from the endpoint, then a connector cannot be tuned past a certain throughput.

For example, the Confluent JDBC Source connector exposes the following configurations:

  • batch.max.rows – maximum number of rows to include in a single batch when polling for new data

  • poll.interval.ms – frequency in ms to poll for new data in each table

We can make the connector return more records to the producer by increasing batch.max.rows. If the connector did not expose this configuration and instead hard-coded batch.max.rows to 5, then the most records the connector could pull on each query would be 5. A connector can only be tuned according to its exposed configuration options.

Producer configurations to modify

When it comes to tuning a producer, there are several common configurations to consider:

  • batch.size – specifies a maximum batch size in bytes (default 16384)  

  • linger.ms – specifies a maximum duration to fill the batch in milliseconds (default 0)

  • buffer.memory – the total bytes of memory the producer can use to buffer records waiting to be sent to the server (default 33554432) 

  • compression.type – specifies the compression type for all data generated by the producer (default none)

The batch.size configuration should be sized to hold the records returned by the connector. For example, if the connector returns 500 records, the batch size should be set using the following equation:

batch.size = number_of_records * record_size_average_in_bytes

If, for example, the connector returns 500 records every time it pulls from the database, and each record is 1.76 KiB in size, then the equation would be:

(500*1.76) * 1024 = 901,120

(* Multiplied by 1024 to convert between KiB and bytes) 
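The same arithmetic can be wrapped in a small helper for reuse. This is a minimal sketch; the function name batch_size_bytes is hypothetical and only reproduces the equation above.

```python
def batch_size_bytes(records_per_poll: int, avg_record_kib: float) -> int:
    """Return a batch.size (in bytes) large enough to hold one full poll."""
    # 1 KiB = 1024 bytes; round to the nearest whole byte.
    return round(records_per_poll * avg_record_kib * 1024)


if __name__ == "__main__":
    # 500 records of 1.76 KiB each, as in the example above.
    print(batch_size_bytes(500, 1.76))  # 901120
```

The average record size can be read from the producer metric record-size-avg, which is described in the JMX metrics section.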

The linger.ms configuration depends on how many records the connector returns to the producer. The producer may need to wait only 5 ms when there are 10 records; however, when 100K records are being returned, linger.ms needs to be increased because the batch needs more time to fill up. If linger.ms is set too low, the result is many requests in which batches do not get sufficient time to fill up. The opposite can also occur: if the value is too high, the producer waits unnecessarily and batches get sent at a slower rate.
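One rough way to reason about a starting value for linger.ms is to estimate how long it takes for a batch worth of records to arrive at the producer. The sketch below assumes a steady arrival rate and a single partition. The function name and the 50,000 records per second rate are hypothetical, chosen only for illustration.

```python
def ms_to_fill_batch(records_per_batch: int, records_per_second: float) -> float:
    """Estimate how many milliseconds it takes to accumulate one batch."""
    if records_per_second <= 0:
        raise ValueError("records_per_second must be positive")
    return records_per_batch / records_per_second * 1000.0


if __name__ == "__main__":
    assumed_rate = 50_000  # hypothetical records per second reaching the producer
    for batch_records in (10, 500, 100_000):
        wait_ms = ms_to_fill_batch(batch_records, assumed_rate)
        print(f"{batch_records} records -> about {wait_ms:.1f} ms to fill")
```

The estimate is only a starting point. The producer metrics records-per-request-avg and batch-size-avg show whether batches are actually filling.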

The buffer.memory configuration represents the total bytes of memory the producer can use to buffer records waiting to be sent to the server. This setting should correspond roughly to the total memory the producer will use, but it is not a hard bound, since not all memory the producer uses is used for buffering. Suppose, for example, that the Kafka producer is not able to send messages (batches) to the Kafka broker because the broker is down. The producer will start accumulating the message batches in the buffer memory (default 32 MB). Once the buffer is full, it will wait for max.block.ms (default 60,000 ms) for the buffer to be cleared out. If the buffer does not get cleared out, the producer will throw an exception. If buffer.memory is set too low, it will fill almost instantly and the producer will throw an exception. The opposite can also occur: too high a value for buffer.memory can cause an OOM exception if the memory on the OS gets used up.
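To get a feel for how quickly the buffer can fill when the broker is unreachable, the following back-of-the-envelope sketch divides buffer.memory by the incoming byte rate. It is a simplification that ignores compression and the way the producer allocates memory per batch. The function name and the 50,000 records per second of 1 KiB each are hypothetical values.

```python
DEFAULT_BUFFER_MEMORY = 33_554_432  # bytes, the producer default quoted above
DEFAULT_MAX_BLOCK_MS = 60_000       # milliseconds, the producer default quoted above


def seconds_until_buffer_full(buffer_bytes: int,
                              records_per_second: float,
                              avg_record_bytes: float) -> float:
    """Estimate how long the buffer lasts if nothing can be sent."""
    bytes_per_second = records_per_second * avg_record_bytes
    if bytes_per_second <= 0:
        raise ValueError("incoming byte rate must be positive")
    return buffer_bytes / bytes_per_second


if __name__ == "__main__":
    fill_s = seconds_until_buffer_full(DEFAULT_BUFFER_MEMORY, 50_000, 1024)
    # After the buffer fills, the producer blocks for up to max.block.ms
    # before throwing an exception.
    total_s = fill_s + DEFAULT_MAX_BLOCK_MS / 1000
    print(f"buffer full after about {fill_s:.2f} s, exception after about {total_s:.2f} s")
```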

The final configuration is compression.type, which can be enabled to compress messages before they are produced. Each compression type has its own pros and cons, so you will need to research which one is best for your use case. The following KIP contains further information on the performance of each compression type. While compression is great for making your messages smaller, enabling compression does add time to message delivery since messages need to be compressed.

JMX metrics to monitor

For JMX metrics, you should divide them into three sections: connector metrics, broker metrics, and producer metrics (from Kafka Connect framework).

| Level | Metric | Description | Why it’s useful |
| --- | --- | --- | --- |
| Connector | source-record-poll-rate | Before transformations are applied, this is the average per-second number of records produced or polled by the task belonging to the named source connector in the worker. | Tells us the average per-second number of records produced prior to transformation. |
| Connector | poll-batch-avg-time-ms | Average time in milliseconds taken by this task to poll for a batch of source records. | Metric that can tell you how long it takes for a record to be returned from the endpoint. |
| Connector | source-record-write-rate | After transformations are applied, this is the average per-second number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (excludes any records filtered out by the transformations). | Useful when transformations are applied to determine the impact of connect message transformations. |
| Broker | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | Byte-in rate from clients. | Helpful to see throughput on a per-topic basis for validating throughput has increased. |
| Producer | record-size-avg | The average record size. | Metric used for calculating the batch.size. |
| Producer | batch-size-avg | The average number of bytes sent per partition per-request. | Validate batch.size has been increased via metric and the producer is getting the expected number of records per request. |
| Producer | records-per-request-avg | The average number of records per request. | Validate how many records are sent in each producer batch. |
| Producer | record-send-rate | The average number of records sent per second for a topic. | Used as indicator the connector needs to be further tuned. |

Walk-through example

This example walks you through how to increase your throughput on a Confluent JDBC Source connector connecting to a MySQL database (DB). This methodology can be applied to any source connector. The following steps need to be taken: 

  1. Determine what connector/producer configurations can be modified and set up Grafana for JMX metrics 

  2. Gather baseline throughput 

  3. Determine which configurations to change (producer vs. connector) 

  4. Increase producer configurations 

  5. Increase connector configurations for further throughput

Step 1 – Determine what connector/producer configurations can be modified and set up Grafana for JMX metrics

JDBC source connector configurations:

  • batch.max.rows – maximum number of rows to include in a single batch poll by the connector. This setting can be used to limit the amount of data buffered internally in the connector. (default 100)

  • poll.interval.ms – frequency in ms to wait before requesting new data for each table. (default 5000)

Producer configurations:

  • batch.size – specifies a maximum batch size in bytes (default 16384)

  • linger.ms – specifies a maximum duration to fill the batch in milliseconds (default 0)

  • buffer.memory – the total bytes of memory the producer can use to buffer records waiting to be sent to the server (default 33554432)

  • compression.type – specifies the compression type for all data generated by the producer (default none)

This blog post uses the kafka-docker-playground environment, where a mysql.sh script deploys a MySQL instance, inserts 10 million records into the DB, and starts up the connector. Grafana is enabled by running export ENABLE_JMX_GRAFANA=true.

Step 2 – Gather baseline throughput

First, you need a baseline for your throughput using the default connector configurations. This way, you can compare JMX metrics before and after each change to determine how to increase your throughput. Keep in mind that you want to tune with a single task. Once the baseline throughput has been determined for a single task, you can always increase tasks.max later (if the connector permits it).

The following are my connector configurations:

curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
               "tasks.max":"1",
               "connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
               "mode":"bulk",
               "producer.override.client.id": "mysql-base",
               "query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
               "topic.prefix":"mysql-base"
          }' \
     http://localhost:8083/connectors/mysql-base/config | jq .

Baseline Grafana JMX metrics:

Broker

  • BytesInPerSec – 50.9 MB/s

Connector

  • source-record-poll-rate – 53.1K per second

  • poll-batch-avg-time-ms – 0.981 ms

  • source-record-write-rate – 53.1K per second

Producer

  • record-size-avg – 1.09 KiB

  • batch-size-avg – 15.1 KiB

  • records-per-request-avg – 14.8 avg records per request

  • record-send-rate – 71.6K per second

Step 3 – Determine which configurations to change (producer vs. connector)

In order to determine which configurations to change, let’s summarize our metrics in a simple table:

| Metric | Value |
| --- | --- |
| BytesInPerSec | 50.9 MB/s |
| source-record-poll-rate | 53.1K ops/s |
| poll-batch-avg-time-ms | 0.981 ms |
| source-record-write-rate | 53.1K ops/s |
| record-size-avg | 1.09 KiB |
| batch-size-avg | 15.1 KiB |
| records-per-request-avg | 14.8 avg records per request |
| record-send-rate | 71.6K records/s |

Based on the above metrics, the producer should be tuned first. The main reason is the JMX metrics batch-size-avg and records-per-request-avg. These two values should sound an alarm: the average batch size is 15.1 KiB and the average number of records per request is 14.8. The batch-size-avg is maxed out because the default batch.size is 16384 bytes (16 KiB), and appending another record is not possible since the average record size is 1.09 KiB. Looking at records-per-request-avg, I am sending 14.8 records per request on average, yet the connector returns 100 records on each poll due to the default batch.max.rows. In other words, the connector hands 100 records to the producer, but the batch.size is maxed out with only ~15 of them.
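The "maxed out" observation can be sanity-checked with a quick calculation of how many average-sized records fit in one batch. This is a rough sketch that ignores per-record and per-batch overhead, which is an assumption. The function name is hypothetical.

```python
import math


def records_that_fit(batch_size_bytes: int, avg_record_kib: float) -> int:
    """Return how many average-sized records fit in one producer batch."""
    avg_record_bytes = avg_record_kib * 1024  # convert KiB to bytes
    return math.floor(batch_size_bytes / avg_record_bytes)


if __name__ == "__main__":
    # Default batch.size of 16384 bytes and the measured record-size-avg of 1.09 KiB.
    print(records_that_fit(16384, 1.09))  # 14
```

A result of about 14 is in line with the measured records-per-request-avg of 14.8 and is far below the 100 records returned per poll.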

I can validate the number of records returned by the connector to the producer by enabling TRACE level logging for AbstractWorkerSourceTask:

curl -s -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.AbstractWorkerSourceTask \
    -d '{"level": "TRACE"}' \
    | jq '.'

This is followed by searching in the logs for the following line:

About to send <integer> records to Kafka

Based on the producer metrics, the first component I will tune will be the producer.

With that said, one might ask, “How do I determine the connector is not the bottleneck?”

When it comes to determining if the connector is the bottleneck, one or both of the following scenarios tend to happen: 

  • Throughput does not increase even though producer configurations have been modified

  • Producer record-send-rate stays flat or does not increase when modifying the producer

When one or both of the above scenarios happen, this is a good indicator that the connector needs to be modified. The JMX metric record-send-rate tells us what the producer is currently doing. If record-send-rate does not increase or stays relatively the same, this indicates that the producer is waiting on records from the connector. In other words, the connector is the bottleneck and the producer is in “waiting” mode.

Step 4 – Increasing producer configurations

In order to tune the producer, the batch.size needs to be increased. The following equation can be used to calculate the batch size:

batch.size = number_of_records * record_size_average_in_bytes

Since our current batch.size fills up before it can hold the 100 records coming from the connector, in this exercise I will increase both batch.size and batch.max.rows to further increase the throughput. Tuning batch.max.rows at the same time is more of a personal preference, to avoid recalculating the batch.size a second time. I know I want to pull more data from the endpoint, which is why I’m choosing to do both at once. This modification is also intended to reduce the chance of the connector being the bottleneck, since we are now pulling more records per batch. The batch.max.rows will be set to 500 records, and we can use the above equation to determine the batch.size. Note that record-size-avg is reported in KiB, so it needs to be converted to bytes by multiplying by 1024:

batch.size = batch.max.rows * record-size-avg (in KiB) * 1024
500 * 1.09 * 1024 = 558,080

Now the connector configuration will be updated; notice that batch.max.rows and batch.size have both increased. Again, the reason is the batch-size-avg JMX metric, which showed the batch filling up before it could hold the 100 records returned by the connector:

curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
               "tasks.max":"1",
               "connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
               "mode":"bulk",
               "batch.max.rows": 500,
               "producer.override.client.id": "mysql-batch",
               "producer.override.batch.size": 558080,
               "query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
               "topic.prefix":"mysql-batch"
          }' \
     http://localhost:8083/connectors/mysql-batch/config | jq .
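The configuration above mixes connector-level keys (batch.max.rows) with producer settings that carry the producer.override. prefix. When several variants are tested, it can help to generate the payload instead of editing JSON by hand. The helper below is a hypothetical sketch, not part of Kafka Connect. It only assembles the dictionary that would be sent as the request body.

```python
import json

# Prefix used for per-connector producer settings,
# as seen in the "producer.override.batch.size" key above.
PRODUCER_OVERRIDE_PREFIX = "producer.override."


def build_connector_config(base, connector_settings, producer_settings):
    """Merge settings into one flat config dict with string values."""
    config = {key: str(value) for key, value in base.items()}
    for key, value in connector_settings.items():
        config[key] = str(value)
    for key, value in producer_settings.items():
        config[PRODUCER_OVERRIDE_PREFIX + key] = str(value)
    return config


if __name__ == "__main__":
    base = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "mode": "bulk",
    }
    config = build_connector_config(
        base,
        connector_settings={"batch.max.rows": 500},
        producer_settings={"batch.size": 558080},
    )
    print(json.dumps(config, indent=2))
```

Keeping connector and producer settings in separate arguments makes it easier to see which side of the pipeline a given test run changed.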

After the connector has started up, the following JMX metrics summary is the end result:

| Metric | Value |
| --- | --- |
| BytesInPerSec | 61.0 MB/s |
| source-record-poll-rate | 61.5K ops/s |
| poll-batch-avg-time-ms | 4.41 ms |
| source-record-write-rate | 61.4K ops/s |
| record-size-avg | 1.09 KiB |
| batch-size-avg | 95.3 KiB |
| records-per-request-avg | 95.9 avg records per request |
| record-send-rate | 60.8K records/s |

Based on the above JMX metrics, notice that records-per-request-avg (95.9) is not reaching the 500 records that batch.max.rows is set to. This is an indicator that the producer needs to wait a bit longer to fill up the batch. When this scenario occurs, linger.ms is the configuration that needs to be increased. The right linger.ms value varies depending on your environment; for this example I will choose a small value of 10 ms. The following is the new connector configuration:

curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
               "tasks.max":"1",
               "connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
               "mode":"bulk",
               "batch.max.rows": 500,
               "producer.override.client.id": "mysql-batch-linger",
               "producer.override.batch.size": 558080,
               "producer.override.linger.ms": 10,
               "query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
               "topic.prefix":"mysql-batch-linger"
          }' \
     http://localhost:8083/connectors/mysql-batch-linger/config | jq .

Updated JMX metrics summary:

| Metric | Value |
| --- | --- |
| BytesInPerSec | 61.0 MB/s |
| source-record-poll-rate | 60.8K ops/s |
| poll-batch-avg-time-ms | 4.53 ms |
| source-record-write-rate | 60.6K ops/s |
| record-size-avg | 1.09 KiB |
| batch-size-avg | 543 KiB |
| records-per-request-avg | 534 avg records per request |
| record-send-rate | 58.0K records/s |

At this point, the target average number of records per request is met; however, BytesInPerSec and record-send-rate have not increased. Those two metrics indicate that the connector is not handing records to the producer fast enough. The producer can send more records, but it is not receiving them quickly enough.
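The "flat after a producer change" check can be written down as a simple rule of thumb. The sketch below is only an illustration: the function name and the 5% minimum gain threshold are hypothetical and should be adapted to the noise level of your own metrics.

```python
def connector_is_bottleneck(rate_before: float,
                            rate_after: float,
                            min_gain: float = 0.05) -> bool:
    """Return True if a producer change did not raise the rate meaningfully."""
    if rate_before <= 0:
        raise ValueError("rate_before must be positive")
    relative_gain = (rate_after - rate_before) / rate_before
    # A flat or falling rate suggests the producer is waiting on the connector.
    return relative_gain < min_gain


if __name__ == "__main__":
    # record-send-rate (K records/s) before and after adding linger.ms.
    print(connector_is_bottleneck(60.8, 58.0))
    # BytesInPerSec (MB/s) before and after adding linger.ms.
    print(connector_is_bottleneck(61.0, 61.0))
```

Both calls use the values from the two metrics summaries in this step, and both report that the producer change did not move throughput.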

Step 5 – Increasing connector configurations for further throughput

In Step 4, we tuned the producer and determined that the connector is the current bottleneck, as throughput has not increased and record-send-rate remained relatively flat. There are only two configurations that can help increase throughput from the connector side:

  • batch.max.rows – maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector. 

  • poll.interval.ms – frequency in ms to poll for new data in each table.

Since batch.max.rows was already modified and throughput did not increase, poll.interval.ms is left. poll.interval.ms determines how often the connector polls each table for new data, and its current value is the default of 5000. This means the connector is pulling data from the table only every 5 seconds.

The following is the connector’s new configuration, where poll.interval.ms is set to 1:

curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
               "tasks.max":"1",
               "connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
               "mode":"bulk",
               "poll.interval.ms": 1,
               "batch.max.rows": 500,
               "producer.override.client.id": "mysql-batch-linger-poll-interval-1",
               "producer.override.batch.size": 558080,
               "producer.override.linger.ms": 10,
               "query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
               "topic.prefix":"mysql-batch-linger-poll-interval-1"
          }' \
     http://localhost:8083/connectors/mysql-batch-linger-poll-interval-1/config | jq .

Updated JMX metrics summary:

| Metric | Value |
| --- | --- |
| BytesInPerSec | 88.2 MB/s |
| source-record-poll-rate | 88.1K ops/s |
| poll-batch-avg-time-ms | 19.3 ms |
| source-record-write-rate | 87.7K ops/s |
| record-size-avg | 1.09 KiB |
| batch-size-avg | 543 KiB |
| records-per-request-avg | 534 avg records per request |
| record-send-rate | 80.3K records/s |

At this point, our throughput has increased by about 73%, from the original 50.9 MB/s to 88.2 MB/s. The main bottleneck for this scenario was the connector pulling records from the endpoint too slowly.
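The relative change between any two throughput readings can be computed with a one-line helper. This is a small sketch; the function name is hypothetical, and the inputs are the BytesInPerSec values reported in each step.

```python
def percent_increase(before: float, after: float) -> float:
    """Return the relative change from before to after, in percent."""
    if before <= 0:
        raise ValueError("before must be positive")
    return (after - before) / before * 100.0


if __name__ == "__main__":
    # BytesInPerSec (MB/s) at the baseline and after each tuning step.
    readings = [50.9, 61.0, 61.0, 88.2]
    for previous, current in zip(readings, readings[1:]):
        print(f"{previous} -> {current} MB/s: {percent_increase(previous, current):.1f}%")
    print(f"overall: {percent_increase(readings[0], readings[-1]):.1f}%")
```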

Conclusion

This example went from 50.9 MB/s to 88.2 MB/s, which is roughly a 73% increase in throughput. With that said, this testing scenario is a “perfect world” scenario; in reality, production environments are not as consistent because:

  • Most database tables do not insert 10 million rows with exactly 1.09 KiB in size 

  • The testing was done in one EC2 instance with all Docker containers running locally with almost no latency 

  • The demo DB was not tuned for its full potential

While most scenarios are not as isolated as presented in this example environment, the troubleshooting methodology remains the same: 

  • Determine if the connector or producer is the bottleneck 

  • Tune the producer to ensure batches get filled up 

  • Tune the connector to send more messages at a faster rate to the producer

I started off my career as an IBM Software Engineer, where I eventually ended up being a Solution Architect/Lead Software Engineer on two different projects. After almost 2.5 years, I decided to join a new startup (Confluent) to learn all about data streaming. This is where I first started getting my exposure to Kafka and Connect. I'm currently the SME for Kafka Connect within the organization and work closely with our PMs/Engineer Leads to help improve the product.
