The Simplest Useful Kafka Connect Data Pipeline In The World … or thereabouts—Part 3

Robin Moffatt

We saw in the earlier articles (part 1, part 2) in this series how to use the Kafka Connect API to build out a very simple, but powerful and scalable, streaming data pipeline. The example we built streamed data from a database such as MySQL into Apache Kafka® and then from Apache Kafka downstream to sinks such as flat file and Elasticsearch. Now we will take a look at one of the very awesome features recently added to Kafka Connect — Single Message Transforms.

Single Message Transforms (SMT) is a functionality within Kafka Connect that enables the transformation … of single messages. Clever naming, right?! Anything more complex, such as aggregating or joining streams of data, should be done with Kafka Streams — but simple transformations can be done within Kafka Connect itself, without needing a single line of code.

SMTs are applied to messages as they flow through Kafka Connect. On a source connector, the SMT modifies the message before it hits Kafka; on a sink connector, the message in Kafka remains untouched but the data landed downstream is modified.

The full syntax for SMTs can be found in the Kafka Connect documentation.

Setting the Key for Data from JDBC Source Connector

This is a nice simple, but powerful, example of SMT in action. Where data is coming from the JDBC Source Connector, as in the previous example, it will have a null key by default (regardless of any keys defined in the source database). It can be useful to apply a key, for example to support a designed partitioning scheme in Kafka, or to ensure that downstream the logical key of the data is persisted in the target store (for example, Elasticsearch).

Let’s remind ourselves what the messages look like that are flowing into Kafka from the Kafka Connect JDBC connector:

./bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --property print.key=true \
    --from-beginning \
    --topic mysql-foobar

null    {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null    {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}

Those nulls at the beginning of the line are the keys of the messages. We want our keys to contain the integer value of the c1 column, so we will chain two SMTs together. The first will copy the message value’s c1 field into the message key, and the second will extract just the integer portion of that field. SMTs are always set up per connector, so for our connector we amend the config file (or create a new version):
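A sketch of the transforms stanza to add to the connector configuration, using the ValueToKey and ExtractField transforms that ship with Apache Kafka (the aliases createKey and extractInt are illustrative names):

```json
{
  "transforms": "createKey,extractInt",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "c1",
  "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractInt.field": "c1"
}
```

ValueToKey copies the c1 field from the message value into the key (as a struct), and ExtractField$Key then replaces that struct with just the integer it contains.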

Apply this configuration change:

./bin/confluent config jdbc_source_mysql_foobar_01 -d /tmp/kafka-connect-jdbc-source-with-smt.json

The connector will automagically switch to the new configuration, and if you then insert some new rows in the foobar MySQL table (here using a direct pipe instead of interactive session):

echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=rmoff --password=pw demo

you’ll see in the avro console consumer:

100     {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}

Note that the key (the first value on the line) matches the value of c1, which is what we’ve defined in the transforms stanza of the configuration as the column from which to take the message key.

We can use the fact that we now have a key on the messages in Kafka to maintain a static view of our source table in Elasticsearch. This is because Elasticsearch writes in an idempotent manner based on the key of the row (“document”, in Elasticsearch terminology). All that’s necessary for this is to use the SMT to ensure there’s a key on the Kafka message, and in the Elasticsearch sink connector set "key.ignore": "false" (or simply remove the configuration line, since false is the default).
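As a sketch (the connector name and the settings other than key.ignore are illustrative), the Elasticsearch sink configuration might look like this:

```json
{
  "name": "es-sink-mysql-foobar",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mysql-foobar",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "key.ignore": "false"
  }
}
```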

Note that if you’ve got null-keyed messages on your Kafka topic, which you will have if you’ve followed all of the previous examples, you’ll want to restart the source connector with a different topic as its target. If you try to write to Elasticsearch from a topic that contains null-keyed messages and the connector is not set to "key.ignore": "true", then the sink task will fail with the pretty self-explanatory error: ConnectException: Key is used as document id and can not be null.

Add Metadata to the Apache Kafka Message for Data Lineage

Here are two more SMTs that can be useful.

  • The first (InsertTopic) embeds the topic name into the message itself—very useful if you have complex pipelines and you want to preserve the lineage of a message and its source topic
  • The second (InsertSourceDetails) adds custom static information as a field into the message. In this example, it’s the name of the source database. Other uses could be environment (dev/test/prod) names, geographies of distributed systems, and so on
In the connector configuration, these two transforms look like this:

"transforms": "InsertTopic,InsertSourceDetails",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "messagetopic",
"transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field": "messagesource",
"transforms.InsertSourceDetails.static.value": "MySQL demo on asgard"

Note that thanks to the wonders of schema evolution and the Confluent Schema Registry, these new columns are automagically added into the schema, and reflected downstream in Elasticsearch—without a single change having to be made. Literally, the pipeline was running the whole time that I made these changes, and the columns just got added to the Elasticsearch mapping. Pretty smart!

Kafka Connect loading Elasticsearch with Schema Evolution

Field Masking and Whitelist/Blacklists

SMTs offer the capability to mask out the value of a field, or drop it entirely.

Here’s an example of a field masking configuration:
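A minimal sketch of such a configuration (the alias maskC2 is illustrative), using the MaskField transform that ships with Apache Kafka:

```json
"transforms": "maskC2",
"transforms.maskC2.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskC2.fields": "c2"
```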


This masks field c2, replacing its original contents with an empty value when the message passes through the transform.
We can also drop the field entirely:
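A sketch of a field-dropping configuration (alias illustrative), using the ReplaceField transform with its blacklist option:

```json
"transforms": "dropC2",
"transforms.dropC2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropC2.blacklist": "c2"
```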


The result is the same message with the c2 field removed entirely.
N.B. If you’re using org.apache.kafka.connect.transforms.ReplaceField make sure you include the $Key or $Value suffix, otherwise you’ll get the error:

org.apache.kafka.common.config.ConfigException: Invalid value class 
org.apache.kafka.connect.transforms.ReplaceField for configuration [...] 
Error getting config definition from Transformation: null

Message Routing with Kafka Connect SMTs

It’s pretty nifty being able to modify and enrich data inbound through Kafka Connect with a few extra lines of configuration. Here are some useful examples where SMTs help make the outbound section of the data pipeline more flexible, by routing messages to different topics.

Topic Routing with Regular Expressions

Often Kafka Topics are (a) used to define the target object name in a data store (such as Elasticsearch), and (b) have various prefixes and suffixes to denote things like data centers, environments, and so on. This means that unless you want your Elasticsearch index (or whatever else) to have a name DC1-TEST-FOO instead of FOO, you have to manually remap it in the configuration. And why do things manually, if they can be done automagically?


So using the example from before, our topic is called mysql-foobar, and let’s say we want to drop the mysql- prefix. With the Elasticsearch connector we could remap each topic to a differently named index in the connector’s own configuration, but this requires us to manually specify every before/after topic permutation, which is going to be pretty tedious (and error prone).
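RegexRouter applies a Java regular expression with a $1-style replacement to the topic name. The effect of a prefix-dropping pattern can be previewed at the command line (sed’s capture-group syntax is close enough for this simple case):

```shell
# Preview what a RegexRouter regex of "mysql-(.*)" with replacement "$1"
# would do to the topic name: drop the "mysql-" prefix.
echo "mysql-foobar" | sed -E 's/^mysql-(.*)$/\1/'
# foobar
```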

Use this updated configuration for the Elasticsearch connector, with the SMT appended:
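A sketch of the appended SMT (alias illustrative; adjust the regex to your actual topic naming):

```json
"transforms": "dropTopicPrefix",
"transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex": "mysql-(.*)",
"transforms.dropTopicPrefix.replacement": "$1"
```

Note that the verification output below shows the source topic as mysql-smt-foobar; with a prefix like that, the regex would be mysql-smt-(.*) instead.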


From here we can verify that in Elasticsearch the data is now routing to the foobar index:

curl -s "http://localhost:9200/foobar/_search" | jq '.hits.hits[]'
{
  "_index": "foobar",
  "_type": "",
  "_id": "100",
  "_score": 1,
  "_source": {
    "c1": 100,
    "c2": "bar",
    "create_ts": 1501799535000,
    "update_ts": 1501799535000,
    "messagetopic": "mysql-smt-foobar",
    "messagesource": "MySQL demo on asgard"
  }
}
Note that with the Elasticsearch sink you may hit ConnectException: Cannot create mapping – see this GitHub issue for details. If you hit this then as a workaround you can prime Elasticsearch by explicitly creating the index first:

curl -XPUT 'http://localhost:9200/foobar'

Timestamp Topic Routing

As well as remapping a topic based on a regex pattern, SMT can be used to route messages to topics (and thus downstream objects such as Elasticsearch indices) based on a timestamp.

"transforms": "ts-to-topic",  
"transforms.ts-to-topic.type": "org.apache.kafka.connect.transforms.TimestampRouter"

There are two optional configuration options – topic.format and timestamp.format, letting you specify the topic pattern and timestamp format to use. For example:
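The following values are the documented defaults for TimestampRouter; a custom timestamp.format such as yyyyMM would route by month instead of by day:

```json
"transforms.ts-to-topic.topic.format": "${topic}-${timestamp}",
"transforms.ts-to-topic.timestamp.format": "yyyyMMdd"
```

With topic mysql-foobar, this would yield topics such as mysql-foobar-20170803.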



Both routing transforms can be combined, giving:
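A sketch of the combined configuration (aliases illustrative):

```json
"transforms": "dropTopicPrefix,ts-to-topic",
"transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex": "mysql-(.*)",
"transforms.dropTopicPrefix.replacement": "$1",
"transforms.ts-to-topic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.ts-to-topic.topic.format": "${topic}-${timestamp}",
"transforms.ts-to-topic.timestamp.format": "yyyyMMdd"
```

Transforms are applied in the order listed, so the prefix is dropped first and the timestamp suffix is then added, giving topics such as foobar-20170803.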



In this series so far we’ve seen how to build realtime, scalable, fault-tolerant, distributed data pipelines with Kafka and Kafka Connect. The first post looked at streaming data from a database into Kafka; the second covered streaming data from Kafka out to Elasticsearch.

In this post we dug a little deeper into Kafka Connect, seeing how Single Message Transforms provide a powerful way to manipulate messages as they flow through Kafka Connect. Masking data, adding lineage, and routing topics are all possible with SMTs. Something we didn’t even cover here is that it’s an open API and so they’re completely extensible too! For more detail on when SMTs are appropriate, have a look at this excellent presentation from Kafka Summit in New York earlier this year.

Other Posts in this Series:

Part 1: The Simplest Useful Kafka Connect Data Pipeline In The World … or Thereabouts (Part 1)
Part 2: The Simplest Useful Kafka Connect Data Pipeline In The World … or Thereabouts (Part 2)

Comments

  1. First, @Robin, thank you so much for putting this article together. I can’t express how frustrated I’ve been with what I thought was a very vanilla use case of Database –> Kafka –> Kafka Streams –> Kafka.

    The documentation on the JDBC Source connector should be much more explicit about the fact that you’re getting records without any defined keys. This wasn’t pointed out anywhere, and if not for my StackOverflow question I would have not found this article! That said, I’ve done what’s described in this article and it was tremendously helpful, but I’m still getting a type mismatch when trying to use my topic data in Kafka Streams. I’ve even used another SMT to force cast the Key to an Int64 (Java long) and it doesn’t look like it has any real impact. Here’s a link to a series of screenshots I put together that shows what I’ve done…if you have any tips, I’d be grateful!

      1. Thank you Robin,

        I’ll give that article a read as well. For what it’s worth, some of my pain with the type mismatch has to do with how Avro stores longs behind the scenes…
        “int and long values are written using variable-length zig-zag coding”

        Thank you again, I’ll reply back and let you know if that other article gets me further down the path.

        And yes, I found the Slack group and those folks have been very helpful. Hopefully I’ll put this relatively simple use case to rest very soon!

  2. Excellent article Robin! It complements the rather sparse Kafka documentation for Transformations well, by providing real and commonly required use cases where they can be applied.

    I’m sure it will be very useful for our projects. Thanks!

  3. Hi Robin, thanks for posting.

    I’m looking to see if it’s possible to provide a dynamic value (e.g. current host name of the connect consumer) to a message using the InsertInValue transform?


      1. "transforms":"InsertTopic,InsertSourceDetails",
        "transforms.InsertSourceDetails.static.value":"MySQL demo on asgard"

        Is there a way to have dynamic value for insource?

        Means lets say i have 5 databases in my database server. So can i have dynamic database value?

  4. Hi Robin
    I’m trying to whitelist a set of fields for Elasticsearch, but I’m having trouble with the structure of the message.

    The message looks like this:


    I want to whitelist Field1 from the Data struct but I can’t work out how.
    I can use Flatten to get Data.Field1, but I also get Field2 and Field3 as well.
    I’ve tried using whitelist with Field1, and with Data.Field1 but this gives me no data at all in my index.

    transforms=Flatten, Replace
    #or transforms.Replace.whitelist=Data.Field1

    Am I missing something?

    1. The ReplaceField Single Message Transform doesn’t support nested data.

      One option would be to use KSQL to filter the topic written from Kafka Connect to extract the fields that you require into a new topic, for consumption downstream.

  5. Elasticsearch is able to index for a single field, but not for multiple fields, when the request is sent through Kafka Connect.

    Getting exception “Key is used as document id and can not be null” in elastic search.

    My Connector Configurations:

    "name": "test-connector33",
    "config": {
    "tasks.max": "1",
    "topics": "test-connector33",
    "connection.url": "http://localhost:9200",
    "": "aggregator",
    "schema.ignore": "true",
    "topic.schema.ignore": "true",
    "topic.key.ignore": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "name": "test-connector33",
    "transforms": "InsertKey,extractKey",

    I am submitting the below data to topic:

    echo "{ \"customerId\" : \"Jishnu1534465795885\", \"city\" : \"fremont\", \"name\" : \"Jishnu\", \"age\" : 31, \"address\" : [ { \"addressId\" : \"21534465795884\", \"city\" : \"Dallas\", \"state\" : \"TX\" }, { \"addressId\" : \"11534465795884\", \"city\" : \"Detroit\", \"state\" : \"MI\" } ] }"| ./kafka-console-producer --broker-list localhost:9092 --topic test-connector33

    Any idea how to resolve this?

    1. Since you have "key.ignore":"false", Kafka Connect will try to use the Kafka message’s key as the Elasticsearch document id. The error you’re getting indicates that there is not a valid key to use.

      At a guess, the SMT you have isn’t working – I’m pretty sure you can’t have multiple columns in ValueToKey.

      You could validate this by (a) using a single column in the SMT instead of two and also (b) setting a key in the echo that you’re using for testing, and not an SMT in Kafka Connect – at least then you can see if the lack of a valid key is the issue. kafkacat is a similar tool to kafka-console-producer but I prefer it for its additional functionality and power. Using it you can define the key in a message you send to a topic.

      It’s worth noting that you can also use KSQL to create composite keys for topics if required and the SMT is not able to do it.

      Check out the Confluent Community Slack group for any more help – there are #ksql and #connect channels both to help!

      1. Robin,

        Thanks a lot. Sorry for late response.
        Sure, I will try all the options you mention.

        Once again thanks a lot.


        1. Hi Robin,

          I am able to resolve the elastic search issue. I am using the kafka message key as the document id. Thanks a lot.

          I am facing another issue when I try to insert record into oracle database. I thought of posting question to slack but it seems it requires approval to include my workspace. So, i am posting my question here:

          The below kafka connect configuration works fine:

          "name": "teststream",
          "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "topics": "TESTSTREAM",
          "connection.url": "jdbc:oracle:thin:@ipaddress:1521:devbptm",
          "auto.create": "false",
          "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
          "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable" : "false",
          "value.converter.schemas.enable" : "true",
          "" : "TESTSTREAM",
          "pk.mode" : "record_value",
          "pk.fields" : "id",
          "insert.mode" : "insert",
          "auto.evolve" : "false",
          "name": "teststream"

          Here is my sample data:

          echo "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"type\": \"string\", \"optional\": false, \"field\": \"id\", \"default\": \"default\" }, { \"type\": \"string\", \"optional\": true, \"field\": \"msg\" } ], \"optional\": false, \"name\": \"some\" }, \"payload\": { \"id\": \"default\", \"msg\": \"hi\" } }"| ./kafka-console-producer --broker-list localhost:9092 --topic TESTSTREAM

          What configuration change do I need to do in order to insert kafka message key as the primary key in the table?
          Can you please do let me know the sample data to send to topic?

  6. Hi Robin,
    Great article. I have a usecase where data comes in from an IBM system, and I need to validate the datatypes for all the values and throw an error if any of the fields fail the datatype validation. Would I be able to do that with SMTs?
    Would appreciate your response.


    1. In principle, you could use a Single Message Transform to do this. I’m not sure if you could use it to route bad messages, or just fail a pipeline if errors were found. An alternative pattern to consider would be using a Kafka Streams application to do the validation post-ingest and write validated messages to a new topic from which consuming applications could read.

      1. Thank you for the great article. I am trying to use RegexRouter to convert the resulting dropped topic name to lowercase with \L, as it is created with uppercase such as "ogg-MY_TABLE-avro". I have tried various variations of regex / replacement, no luck so far. Any advice?

        "transforms": "dropPrefix,routeTS",
        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",


  7. Thanks for the article, Robin!

    We are using the IBM MQ connector, which imposes a schema, and the payload is represented as a string (even though we are posting JSON). What is the best way to convert the actual payload back to JSON before applying a transformation like ReplaceField?
    The scenario we have is to remove certain fields from the message before publishing it to the topic.

  8. Hey Robin. Thanks for the article.
    The only thing I cannot understand and find nowhere in documentation – is there a way to apply a specific SMT (let’s say “my_smt”) to messages from some specified topics only? Not to messages from all topics, but from specified topics only?
    Thank you.

    1. SMTs are configured per-connector, and generally one connector will be handling one topic only. If you need finer control than this then you’d probably be looking at building a stream processing app with Kafka Streams or KSQL.
