[Webinar] Deliver enterprise-grade Apache Kafka® to your customers | Join Now

KSQL in Action: Enriching CSV Events with Data from RDBMS into AWS

Written By

Life would be simple if data lived in one place: one single solitary database to rule them all. Anything that needed to be joined to anything could be with a simple SQL JOIN command. But…back in the real world, we have myriad systems scattered across a landscape of log files, NoSQL, RDBMS, CSV, VSAM, IMS and more. What fun, what joy! Long live data silos? Well, no. Not anymore. With Apache Kafka® and KSQL we can integrate data from anywhere, in realtime, and transform it with filters, joins, and aggregations in-flight.

In this article we’ll see how we can take an event and enrich it in realtime. We’ll look up attributes relating to the events from data originating in a database. If you’re from a database or analytics background, we’re basically talking good ole’ denormalization—resolving foreign key relationships.

Our source data is CSV files with order data, and relational data about the customers who have placed these orders. We’ll join these together, and then stream the results to a datastore from where we can run some analyses against them.

Set up Customer Data in MySQL

For our customer data, we’re going to use the excellent Mockaroo to generate some data records with a simple schema.

Whilst Mockaroo can also generate the necessary CREATE TABLE DDL, I’m including it separately here. This is because you need to amend it to declare a Primary Key (PK). If you don’t, the compacted topic will cause Kafka Connect to throw an error Got error produce response with correlation id 2538 on topic-partition asgard.demo.customers-0, retrying (2147483156 attempts left). Error: CORRUPT_MESSAGE.

Run this DDL to create the table in MySQL:

CREATE DATABASE demo;
USE demo;
DROP TABLE customers;
CREATE TABLE customers (
        id INT PRIMARY KEY,
        first_name VARCHAR(50),
        last_name VARCHAR(50),
        email VARCHAR(50),
        gender VARCHAR(50),
        comments VARCHAR(90)
);

Now populate it with the Mockeroo output:

curl -s "https://api.mockaroo.com/api/94201080?count=1000&key=ff7856d0" | mysql demo -uroot

(this assumes that you’ve signed up for a Mockaroo account to get access to the API; if not just download the .sql file from Mockaroo’s web UI and pipe it into MySQL instead)

Check the data has been created in MySQL:

mysql> select * from demo.customers limit 1;
+----+------------+-----------+----------------------------+--------+-----------------------------+
| id | first_name | last_name | email                      | gender | comments                    |
+----+------------+-----------+----------------------------+--------+-----------------------------+
|  1 | Kania      | Eggleson  | keggleson0@tripadvisor.com | Female | Multi-channelled web-enable |
+----+------------+-----------+----------------------------+--------+-----------------------------+
1 row in set (0.00 sec)

mysql> select count() from demo.customers; +----------+ | count() | +----------+ | 1000 | +----------+ 1 row in set (0.00 sec)

mysql>

Stream customer data from MySQL to Kafka

We’re going to use Debezium to stream the contents of the customers table, plus any changes as they occur, from MySQL into Kafka. Instead of a normal Kafka topic with standard retention policies, we’re going to use a compacted topic. We want to work with customer information not a stream of events per se, but instead as the value (name, email, etc) per key(in this case, customer id).

Create the topic, and set it as compacted:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic asgard.demo.customers
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "asgard.demo.customers".
$
$ kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name asgard.demo.customers --alter --add-config cleanup.policy=compact
Completed Updating config for entity: topic 'asgard.demo.customers'.

(The topic naming is set by Debezium, and is based on <server>.<database>.<table>)

Now set up the Kafka Connect Debezium connector. This will stream the current contents of the table into Kafka, and then use MySQL’s binlog to track any subsequent changes to the table’s data (including deletes) and stream those to Kafka too.

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "mysql-demo-customers",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "42",
            "database.server.name": "asgard",
            "table.whitelist": "demo.customers",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory.demo" ,
            "include.schema.changes": "true",
            "transforms": "unwrap,InsertTopic,InsertSourceDetails",
            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
            "transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertTopic.topic.field":"messagetopic",
            "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertSourceDetails.static.field":"messagesource",
            "transforms.InsertSourceDetails.static.value":"Debezium CDC from MySQL on asgard"
       }
    }'

Check the status of the connector:

$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
mysql-connector  |  RUNNING  |  RUNNING

If it’s not RUNNING then check the Connect stdout for errors.

Now let’s check that the topic has been populated:

$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic asgard.demo.customers \
--from-beginning --max-messages 1| jq '.'
{
  "id": 1,
  "first_name": {
    "string": "Kania"
  },
  "last_name": {
    "string": "Eggleson"
  },
  "email": {
    "string": "keggleson0@tripadvisor.com"
  },
  "gender": {
    "string": "Female"
  },
  "comments": {
    "string": "Multi-channelled web-enabled ability"
  },
  "messagetopic": {
    "string": "asgard.demo.customers"
  },
  "messagesource": {
    "string": "Debezium CDC from MySQL on asgard"
  }
}

We can also see the Change-Data-Capture (CDC) in action. Run the kafka-avro-console-consumer command from above, but without the --max-messages 1—you’ll get the contents of the topic, and then it will sit there, waiting for new messages. In a separate terminal, connect to MySQL and make a change to the data:

mysql> update demo.customers set first_name='Bob2' where id = 1;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql>

You should see almost instantaneously the changed record appear in the Kafka topic.

Streaming CSV data into Kafka

Data comes in all sorts of shapes and sizes, from a few bytes of Avro to hundreds of megabytes of XML files, and more. And for better or worse, CSV files are still used as a common interchange format of data, despite a lack of declared schema and difficult to parse field separators. But the great news about Apache Kafka is that with Kafka Connect you can integrate data from lots of sources—even CSV!

To stream CSV data into Kafka we can use the kafka-connect-spooldir plugin. This monitors a folder for a given pattern of file, and processes them, streaming the rows into Kafka and applying the supplied schema. It supports CSV, TSV, and JSON data. Note that it is only suitable for files that are complete; if the file is still being written to by another process then it wouldn’t be appropriate to use this.

To get the plugin simply clone the GitHub repository and build the code. This is done using Maven—download it first if you don’t have it.

git clone https://github.com/jcustenborder/kafka-connect-spooldir.git
cd kafka-connect-spooldir
mvn clean package

Having built the plugin, add it to the Kafka Connect worker’s configuration. Locate the plugin.path configuration entry and add /full/path/to/kafka-connect-spooldir/target/kafka-connect-target/usr/share/kafka-connect/ to the list of paths.

Create three folders: one for the source data, one where the files will be placed once processed, and one for any files in error. Here I’m using /tmp—for real-world use you would amend this to your own path.

mkdir /tmp/source
mkdir /tmp/finished
mkdir /tmp/error

We’re using dummy data, again courtesy of Mockeroo. Download some sample CSV files, call them orders<x>.csv (where <x> is a varying suffix) and place them in the source folder. The folders and contents should look like this:

$ ls -l /tmp/source/ /tmp/finished/ /tmp/error/
/tmp/error/:

/tmp/finished/:

/tmp/source/: total 5224 -rw-r--r--@ 1 Robin staff 440644 9 Feb 22:55 orders.csv -rw-r--r--@ 1 Robin staff 440644 9 Feb 23:37 orders1.csv -rw-r--r--@ 1 Robin staff 440644 9 Feb 23:39 orders2.csv

The great thing about the SpoolDir connector is that is applies a schema to the data, ensuring life is happy for those downstream in Kafka wanting to make use of the data. You can enter the schema by hand (and you do know the schema, right?), or you can use a utility that ships with the plugin to automagically determine it. To do this, create a temporary config file (e.g. /tmp/spool_conf.tmp) as follows:

input.path=/tmp/source
finished.path=/tmp/finished
csv.first.row.as.header=true

Run the utility, specifying the sample input file, and also the field name(s) that is to be the key:

# Run this from the folder in which you built kafka-connect-spooldir
$ export CLASSPATH="$(find target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-spooldir/ -type f -name '*.jar' | tr '\n' ':')"
$ kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t csv -f /tmp/source/orders.csv -c /tmp/spool_conf.tmp -i order_id
#Configuration was dynamically generated. Please verify before submitting.
#Wed Mar 14 16:01:08 GMT 2018
csv.first.row.as.header=true
value.schema={"name"\:"com.github.jcustenborder.kafka.connect.model.Value","type"\:"STRUCT","isOptional"\:false,"fieldSchemas"\:{"order_id"\:{"type"\:"STRING","isOptional"\:true},"customer_id"\:{"type"\:"STRING","isOptional"\:true},"order_ts"\:{"type"\:"STRING","isOptional"\:true},"product"\:{"type"\:"STRING","isOptional"\:true},"order_total_usd"\:{"type"\:"STRING","isOptional"\:true}}}
input.path=/tmp/source
key.schema={"name"\:"com.github.jcustenborder.kafka.connect.model.Key","type"\:"STRUCT","isOptional"\:false,"fieldSchemas"\:{"order_id"\:{"type"\:"STRING","isOptional"\:true}}}
finished.path=/tmp/finished

Now define the connector. The configuration items are fairly obvious, requiring only a couple of notes:

  • The input.file.pattern is a regex based on the filenames within the specified `input.path` (not the entire pathname as you may assume).
  • The key.schema and value.schema come from the output of the utility above.

    Make sure to escape the quotation marks in the schema correctly in the JSON you send with curl (replace \: with :and replace " with \") (which you can do by piping the above kafka-run-class statement through sedsed 's/\\:/:/g'|sed 's/\"/\\\"/g').

    I’ve also manually changed the STRING to INT64 for the ID columns, and marked all the fields as isOptional=false:

$ curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
  "name": "csv-source-orders",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "input.file.pattern": "^orders.*.csv$",
    "input.path": "/tmp/source",
    "finished.path": "/tmp/finished",
    "error.path": "/tmp/error",
    "halt.on.error": "false",
    "topic": "orders",
    "value.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false},\"customer_id\":{\"type\":\"INT64\",\"isOptional\":false},\"order_ts\":{\"type\":\"STRING\",\"isOptional\":false},\"product\":{\"type\":\"STRING\",\"isOptional\":false},\"order_total_usd\":{\"type\":\"STRING\",\"isOptional\":false}}}",
    "key.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false}}}",
    "csv.first.row.as.header": "true"
  }
}'

(N.B. this doesn’t handle the timestamp column parsing, instead bringing it in as a VARCHAR)

Check that the connector’s running:

$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
csv-source-orders     |  RUNNING  |  RUNNING
mysql-demo-customers  |  RUNNING  |  RUNNING

Assuming all of that has worked, you should see that all the source files are now in finished:

ls -l /tmp/source/ /tmp/finished/ /tmp/error/
/tmp/error/:

/tmp/finished/: total 168 -rw-r--r-- 1 Robin wheel 27834 14 Mar 16:00 orders.csv -rw-r--r-- 1 Robin wheel 27758 14 Mar 16:01 orders1.csv -rw-r--r-- 1 Robin wheel 27894 14 Mar 16:01 orders2.csv

/tmp/source/:

And then the proof is in the pudding: do we have messages on our Kafka topic?

$ kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --property print.key=true \
    --topic orders \
    --from-beginning \
    --max-messages 1|jq '.'

{ "order_id": 1 } { "order_id": 1, "customer_id": 827, "order_ts": "2018-02-28T04:40:50Z", "product": "Macaroons - Homestyle Two Bit", "order_total_usd": "3.30" }

Yes, we do! \o/

Using KSQL to Enrich CSV data in Kafka