The Simplest Useful Kafka Connect Data Pipeline In The World … or thereabouts—Part 2

Robin Moffatt

In the previous article in this blog series I showed how easy it is to stream data out of a database into Apache Kafka®, using the Kafka Connect API. I used MySQL in my example, but it’s equally applicable to any other database that supports JDBC—which is pretty much all of them! Now we’ll take a look at how we can stream data, such as that brought in from a database, out of Kafka and into Elasticsearch.

Kafka Connect

Again, we’re using Kafka Connect to do this in a scalable, fault-tolerant way, and all with just some simple configuration files!

Here I’m assuming you’ve followed the previous article for the general setup and installation steps. If you haven’t already, start Elasticsearch:

$ elasticsearch

[...]

[2017-08-01T15:22:40,132][INFO ][o.e.h.n.Netty4HttpServerTransport] [-1Kfx0p] publish_address {127.0.0.1:9201}, bound_addresses {[fe80::1]:9201}, {[::1]:9201}, {127.0.0.1:9201}, {127.94.0.2:9201}, {127.94.0.1:9201}

[2017-08-01T15:22:40,132][INFO ][o.e.n.Node               ] [-1Kfx0p] started

Now, without further ado, let’s dive in!

Whilst Kafka Connect is part of Apache Kafka itself, if you want to stream data from Kafka to Elasticsearch you’ll want the Confluent Platform (or at least, the Elasticsearch connector).
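
If you’re not running the full Confluent Platform, one way to get hold of just the connector is via Confluent Hub—this is a minimal sketch, assuming you have the confluent-hub client installed:

confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

Restart the Kafka Connect worker afterwards so that it picks up the newly installed plugin.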

The configuration is pretty simple. As before, see the inline comments for details.
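
As a rough sketch (with the inline comments omitted), /tmp/kafka-connect-elasticsearch-sink.json could look something like the following. The hostnames, ports, and exact property values here (the type name in particular) are illustrative assumptions for a default local install, so adjust them to match your environment:

{
  "name": "es-sink-mysql-foobar-01",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "topics": "mysql-foobar",
    "key.ignore": "true"
  }
}

Note that key.ignore=true matters here: the JDBC source from Part 1 produces messages with no key, and without this setting the Elasticsearch sink has nothing to use as the document id and will fail.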

It’s worth noting that if you’re using the same converter throughout your pipelines (Avro, in this case), you’d actually put this in the Connect worker config itself rather than repeating it for each connector configuration.

Load the connector using the Confluent CLI:

./bin/confluent load es-sink-mysql-foobar-01 -d /tmp/kafka-connect-elasticsearch-sink.json
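
You can check that the connector and its task have come up by querying the Kafka Connect REST API (this assumes the Connect worker is listening on its default port, 8083):

$ curl -s "http://localhost:8083/connectors/es-sink-mysql-foobar-01/status" | jq '.'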

As with the File sink above, as soon as the connector is created and starts running (give it a few seconds to spin up; wait until the Task status is RUNNING), it will load the existing contents of the topic into the specified Elasticsearch index. In the Elasticsearch console you’ll see:

[2017-07-10T13:43:27,164][INFO ][o.e.c.m.MetaDataCreateIndexService] [lGXYRzd] [mysql-foobar] creating index, cause [api], templates [], shards [5]/[1], mappings []  
[2017-07-10T13:43:27,276][INFO ][o.e.c.m.MetaDataMappingService] [lGXYRzd] [mysql-foo/7lXMoHxWT6yIvmxIBlMB9A] create_mapping [type.name=kafka-connect]

Querying the data using the Elasticsearch REST API shows that the data is being streamed to it from Kafka:

$ curl -s "http://localhost:9200/mysql-foobar/_search"|jq '.hits'
{
  "total": 5,
  "max_score": 1,
  "hits": [
    {
      "_index": "mysql-foobar",
      "_type": "type.name=kafka-connect",
      "_id": "mysql-foobar+0+4",
      "_score": 1,
      "_source": {
        "c1": 4,
        "c2": "bar",
        "create_ts": 1501797647000,
        "update_ts": 1501797647000
      }
    },
[...]

The eagle-eyed amongst you will notice that the _id of the document has been set by Kafka Connect to the topic/partition/offset, which gives us exactly-once delivery courtesy of Elasticsearch’s idempotent writes. Now, this is very useful if we just have a stream of events, but in some cases we want to declare our own key—more on this later.
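
To see the pipeline flowing continuously rather than as a one-off load, you could insert another row into the source table from Part 1 and then search for it in Elasticsearch a few seconds later. The database name (demo) and the column values below are assumptions based on the previous article’s setup, so substitute your own:

$ mysql demo -e "INSERT INTO foobar (c1, c2) VALUES (6, 'qux');"
$ curl -s "http://localhost:9200/mysql-foobar/_search?q=c2:qux" | jq '.hits.total'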

We can also see that a mapping has been created, using the schema of the source MySQL table – another great reason for using the Confluent Schema Registry:

$ curl -s "http://localhost:9200/mysql-foobar/_mappings"|jq '.'
{
  "mysql-foobar": {
    "mappings": {
      "type.name=kafka-connect": {
        "properties": {
          "c1": {
            "type": "integer"
          },
          "c2": {
            "type": "text"
          },
          "create_ts": {
            "type": "date"
          },
          "update_ts": {
            "type": "date"
          }
        }
      }
    }
  }
}

Using Kibana we can easily see the actual data that’s flowing through from Kafka, with the datatypes preserved and thus the timestamps available for selecting and aggregating our data as we want.

Recap

With a few simple REST calls, we’ve built a scalable data pipeline, streaming data from a relational database through to Elasticsearch, and a flat file. With the Kafka Connect ecosystem we could extend and modify that pipeline to land data to HDFS, BigQuery, S3, Couchbase, MongoDB … the list goes on and on!

Stay tuned for more posts in this series that will take a look at some of the additional cool features available to us in Apache Kafka and Confluent Platform.

Other Posts in this Series:

Part 1: The Simplest Useful Kafka Connect Data Pipeline In The World … or Thereabouts (Part 1)
Part 3: The Simplest Useful Kafka Connect Data Pipeline In The World … or Thereabouts (Part 3)

