The Connect API in Kafka Cassandra Sink: The Perfect Match

Andrew Stevenson

This guest post is written by Andrew Stevenson, CTO at DataMountaineer. Andrew focuses on providing real-time data pipelines allowing reactive decision making, streaming analytics and big data integration. He has extensive experience with Kafka, Flume, Spark, Impala, HBase and Cassandra backed by many years of Data Warehousing experience.

With his team at DataMountaineer, he helps build out the Stream Processing ecosystem, developing multiple connectors and tooling around Apache Kafka.

The Connect API in Kafka is a scalable and robust framework for streaming data into and out of Apache Kafka, the engine powering modern streaming platforms.

At DataMountaineer, we have worked on many big data and fast data projects across many industries, from finance (high-frequency traders and banks, both retail and investment) to utilities and media. In each case, significant time is spent loading data and making these pipelines solid, reliable and robust. While this sounds straightforward, it is often overlooked. Data scientists can’t work their magic if they have no data.

These pipelines often ended up using multiple components, such as Flume, Sqoop, Spark JDBC import/export or standalone Java or Scala applications, to handle the multitude of data sources, both push and pull based. They grew even more complex as they were extended to handle Lambda architectures (although we believe in Kappa), to coordinate with workflow engines and to meet functional requirements. They evolved beyond the simple loading of data and started dealing with business logic.

The Connect API in Kafka

The Connect API in Kafka simplifies all this and allows us to separate out these concerns. Connect API sources and sinks act as sensors on the edge of our analytics platform, loading and unloading events in real time as they happen. Under the hood they are Kafka consumers and producers with a simple and elegant API that allows developers to focus on moving data to and from Kafka.
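To make that simple API concrete, here is a minimal Python sketch of the sink-side lifecycle. Connect's Java `SinkTask` exposes `start`, `put`, `flush` and `stop`; the class below borrows those method names only to illustrate the contract, assuming an in-memory list in place of a real store. `Record` and `InMemorySinkTask` are hypothetical and are not part of the Connect API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Record:
    """Hypothetical stand-in for a Connect SinkRecord."""
    topic: str
    partition: int
    offset: int
    value: Dict[str, Any]


@dataclass
class InMemorySinkTask:
    """Mirrors the SinkTask method names, writing to an in-memory list."""
    config: Dict[str, str] = field(default_factory=dict)
    buffer: List[Record] = field(default_factory=list)
    store: List[Dict[str, Any]] = field(default_factory=list)
    running: bool = False

    def start(self, config: Dict[str, str]) -> None:
        # Called once with the task configuration; a real sink opens connections here.
        self.config = dict(config)
        self.running = True

    def put(self, records: List[Record]) -> None:
        # The framework's consumer hands the task batches of records to write.
        if not self.running:
            raise RuntimeError("put() called before start()")
        self.buffer.extend(records)

    def flush(self) -> int:
        # Persist everything buffered so far; returns how many records were written.
        written = len(self.buffer)
        self.store.extend(r.value for r in self.buffer)
        self.buffer.clear()
        return written

    def stop(self) -> None:
        # Release resources; a real sink closes its database session here.
        self.running = False
```

The framework owns the Kafka consumer, offset tracking and rebalancing; a connector author only fills in these few methods.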

Using the Connect API in Kafka, we can keep the ingest/egress layers clean and unpolluted by functional requirements. We leave the business logic, the `T` in `ETL`, to the processing layer, which keeps the Connect API decoupled from the applications that consume or produce the data it loads. The `T` is handled by stream processing engines, most notably the Streams API in Kafka, Apache Flink or Spark Streaming.

Apache Cassandra

NoSQL stores are now an indispensable part of any architecture, and the SMACK stack (Spark, Mesos, Akka, Cassandra and Kafka) is becoming increasingly popular. Cassandra’s proven scalability and high write throughput make it the perfect partner for handling the flood of time series, IoT, industrial-scale or financial data streams. This made Apache Cassandra an obvious choice as a Connect API in Kafka sink.

Joshua Ratha, product owner of the Data Hub and trader at Eneco Energy Trade, says:

“As a utility and trading company we deal with lots of time series. Think of gas and power prices, steering of assets, forecasting the energy supply and demand of our assets and client portfolio at any given moment of the day. We want to interact with this information stream, to be responsive to what happens in the energy markets and analyze it historically to see patterns, learn and improve. This is why an architecture with both Confluent and Cassandra serves us well. At Eneco we are proud of our contribution to the Cassandra connector in collaboration with Datamountaineer.”

Connect API in Kafka Cassandra Sink

The DataStax Certified Connector, developed by DataMountaineer, simplifies writing data from Kafka into Cassandra. The connector converts the value of each Kafka Connect SinkRecord to JSON. A fail-fast thread pool is then used to insert the records into Cassandra asynchronously.
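The write path described above can be sketched in Python as follows. Two assumptions: the record value arrives as a plain dict, and the write uses Cassandra's `INSERT ... JSON ?` form (Cassandra 2.2+), which matches the statement visible in an error reported in the comments. `execute` is a hypothetical callable standing in for the Cassandra driver; this is an illustration of the fail-fast pattern, not the connector's own code.

```python
import json
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Tuple


def to_json_insert(keyspace: str, table: str, value: Dict[str, Any]) -> Tuple[str, str]:
    """Turn one record value into a CQL JSON insert plus its single bound parameter."""
    return f"INSERT INTO {keyspace}.{table} JSON ?", json.dumps(value)


def insert_all_fail_fast(
    execute: Callable[[str, str], None],
    keyspace: str,
    table: str,
    values: List[Dict[str, Any]],
    max_workers: int = 4,
) -> int:
    """Run inserts concurrently and surface the first failure instead of waiting for all.

    `execute` is a hypothetical stand-in for a driver call that runs `cql` with one parameter.
    Returns the number of inserts submitted when all succeed.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(execute, *to_json_insert(keyspace, table, v)) for v in values]
        # Returns as soon as any future raises, or when all have completed.
        done, pending = wait(futures, return_when=FIRST_EXCEPTION)
        for f in pending:
            f.cancel()  # only not-yet-started inserts can be cancelled
        for f in done:
            exc = f.exception()
            if exc is not None:
                raise exc
    return len(futures)
```

Passing the whole value as a single JSON parameter means the writer does not need to know the table's columns up front; Cassandra maps JSON keys to columns by name.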

Below is a use case with Confluent Platform, Cassandra Sink and Cassandra all playing a pivotal role in a client’s streaming analytics platform.


DataMountaineer connectors can be downloaded from here.

The sink supports the following features:

Field selection – Kafka topic payload field selection is supported, allowing you to choose a subset of fields, or all fields, to write to Cassandra.
Topic to table routing.
Error policies for handling failures – on error, the Sink throws a RetriableException, telling Connect to redeliver the messages on the next poll.
Secure connections via SSL.
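The error policy above relies on Connect redelivering a batch when a task signals a retriable failure. The sketch below models that loop in plain Python under stated assumptions: `RetriableError` stands in for `org.apache.kafka.connect.errors.RetriableException`, `deliver_with_retries` plays the role of the framework, and `max_retries` is a hypothetical cap (the real connector exposes its own settings, not shown here).

```python
import time
from typing import Callable, Sequence


class RetriableError(Exception):
    """Stand-in for Connect's RetriableException: 'try this batch again later'."""


def deliver_with_retries(
    put: Callable[[Sequence[dict]], None],
    batch: Sequence[dict],
    max_retries: int = 3,
    backoff_s: float = 0.0,
) -> int:
    """Redeliver the same batch while put() signals a retriable failure.

    Returns the number of attempts used; re-raises once max_retries is exhausted.
    Non-retriable exceptions propagate immediately.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            put(batch)
            return attempt
        except RetriableError:
            if attempt > max_retries:
                raise
            time.sleep(backoff_s)  # hypothetical pause before redelivery
```

Only retriable failures loop; any other exception escapes immediately, which in Connect would fail the task.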

Field selection and topic routing are handled by the Kafka Connect Query Language (KCQL), which can be found here. It allows for routing and mapping using an SQL-like syntax, consolidating those configuration details into a single option.
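To illustrate how one SQL-like string can carry both routing and field mapping, here is a toy Python parser for the subset of KCQL used in this post: `INSERT INTO <table> SELECT <fields> FROM <topic>`, with optional `AS` renames and several statements separated by `;`. It is a hypothetical sketch, not the real KCQL grammar, which supports more clauses.

```python
import re
from dataclasses import dataclass
from typing import Dict, List, Optional

# One statement of the toy subset: INSERT INTO <table> SELECT <fields> FROM <topic>
_STMT = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<table>[\w.]+)\s+SELECT\s+(?P<fields>.+?)\s+FROM\s+(?P<topic>[\w.\-]+)\s*$",
    re.IGNORECASE,
)


@dataclass
class Route:
    topic: str
    table: str
    # None means SELECT *; otherwise maps source field -> target column.
    fields: Optional[Dict[str, str]]


def parse_kcql(kcql: str) -> List[Route]:
    routes = []
    for stmt in filter(None, (s.strip() for s in kcql.split(";"))):
        m = _STMT.match(stmt)
        if not m:
            raise ValueError(f"unsupported statement: {stmt!r}")
        raw = m.group("fields").strip()
        fields = None
        if raw != "*":
            fields = {}
            for item in raw.split(","):
                # "price AS cost" -> ["price", "cost"]; a bare "id" keeps its name.
                parts = re.split(r"\s+AS\s+", item.strip(), flags=re.IGNORECASE)
                src = parts[0].strip()
                fields[src] = parts[1].strip() if len(parts) == 2 else src
        routes.append(Route(m.group("topic"), m.group("table"), fields))
    return routes
```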

The Cassandra Sink supports the following model for saving data from the Kafka topic:


While working on our Sinks and Sources, we ended up producing quite complex configuration in order to support the required functionality. Imagine a Sink that sources from different topics, where from each topic you want to cherry-pick payload fields or even rename them. Furthermore, you might want the storage structure to be created automatically and/or to evolve, or you might add support for features such as bucketing. Imagine a Sink with a table that needs to be linked to two different topics, with the fields in each aligned to the table’s column names, and all the complex configuration that involves… or you can just express it in a single KCQL statement.
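As an illustration of the two-topics-into-one-table case, assume two hypothetical topics, `trades-eu` and `trades-us`, whose payloads name the same data differently. In KCQL this could be written as `INSERT INTO orders SELECT tradeId AS id, ts AS created, px AS price FROM trades-eu;INSERT INTO orders SELECT id, created, price FROM trades-us`. The Python sketch below shows the selection-and-rename step such a statement implies; the topic and field names are invented for the example.

```python
from typing import Any, Dict, Optional, Tuple


def project(value: Dict[str, Any], mapping: Optional[Dict[str, str]]) -> Dict[str, Any]:
    """Apply a field selection: None keeps everything (SELECT *); otherwise keep and rename."""
    if mapping is None:
        return dict(value)
    missing = [f for f in mapping if f not in value]
    if missing:
        raise KeyError(f"fields not in payload: {missing}")
    return {column: value[src] for src, column in mapping.items()}


# Hypothetical routing: two topics with different field names feeding one table.
ROUTES: Dict[str, Tuple[str, Dict[str, str]]] = {
    "trades-eu": ("orders", {"tradeId": "id", "ts": "created", "px": "price"}),
    "trades-us": ("orders", {"id": "id", "created": "created", "price": "price"}),
}


def route(topic: str, value: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return the target table and the row aligned to its column names."""
    table, mapping = ROUTES[topic]
    return table, project(value, mapping)
```

Fields not named in the selection (such as `desk` in the test data) are simply dropped before the write.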

Using the Cassandra Sink

Ok, enough preaching, let’s use the Cassandra Sink to write some fictional trade data.


Connect API in Kafka Sources and Sinks require configuration. For the Cassandra Sink a typical configuration looks like this:

Create a file with these contents; we’ll need it later to tell the Connect API to run the Sink.

The configuration options are hopefully obvious. We define the standard configurations required by the Connect API in Kafka, such as the connector class, tasks and topics. Next we provide the KCQL statement, `connect.cassandra.export.route.query` (renamed `connect.cassandra.sink.kcql` in later releases). This tells the Sink to select all fields from the payload of the `orders-topic` topic and insert them into the table `orders` in the keyspace `demo`. We could have remapped or dropped fields from the payload here.
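For reference, a configuration along the lines described above can be generated with a short Python script. The standard Connect keys (`name`, `connector.class`, `tasks.max`, `topics`) and the KCQL option come from this post; the connector name, the connector class name and the `connect.cassandra.*` connection keys are assumptions to check against the Stream Reactor documentation for your release, since option names have changed between versions.

```python
from pathlib import Path
from typing import Dict

CASSANDRA_SINK: Dict[str, str] = {
    "name": "cassandra-sink-orders",  # hypothetical connector name
    # Assumed class name; confirm it against the Stream Reactor jar you install.
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "orders-topic",
    # KCQL option as named in this post.
    "connect.cassandra.export.route.query": "INSERT INTO orders SELECT * FROM orders-topic",
    "connect.cassandra.contact.points": "localhost",  # assumed key
    "connect.cassandra.port": "9042",  # assumed key; 9042 is Cassandra's default CQL port
    "connect.cassandra.key.space": "demo",  # assumed key
}


def to_properties(config: Dict[str, str]) -> str:
    """Render a flat config as key=value lines (no escaping; these values need none)."""
    return "".join(f"{k}={v}\n" for k, v in config.items())


def write_config(path: Path, config: Dict[str, str] = CASSANDRA_SINK) -> Path:
    """Write the properties file that is later submitted to Connect."""
    path.write_text(to_properties(config), encoding="utf-8")
    return path
```

For example, `write_config(Path("cassandra-sink-orders.properties"))` produces the file; the file name is arbitrary.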

First, download Confluent Platform and follow the Quickstart Guide here to get Apache Kafka, ZooKeeper and the Schema Registry up and running.

Additionally, we need to start the Connect API in Kafka. We will do this in distributed mode, which is straightforward, but we need the Cassandra Sink on the CLASSPATH. Download Stream Reactor, unpack the archive and add the Sink to the CLASSPATH. Be sure to add the full path.

To start Connect, run the following from the location where you installed Confluent.
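If you prefer to script the startup, the Python sketch below puts the Sink jar on the CLASSPATH using its absolute path and launches the distributed worker. The `bin/connect-distributed` script and the `etc/schema-registry/connect-avro-distributed.properties` worker file follow the Confluent Platform 3.x layout and are assumptions; adjust them to your install.

```python
import os
import subprocess
from pathlib import Path
from typing import Dict, List, Tuple


def build_launch(confluent_home: str, sink_jar: str) -> Tuple[List[str], Dict[str, str]]:
    """Return the command and environment to start Connect in distributed mode.

    Script and worker-properties paths are assumed (Confluent 3.x layout).
    """
    home = Path(confluent_home)
    cmd = [
        str(home / "bin" / "connect-distributed"),
        str(home / "etc" / "schema-registry" / "connect-avro-distributed.properties"),
    ]
    env = dict(os.environ)
    jar = str(Path(sink_jar).resolve())  # Connect needs the full path on the CLASSPATH
    existing = env.get("CLASSPATH")
    env["CLASSPATH"] = jar if not existing else f"{jar}{os.pathsep}{existing}"
    return cmd, env


if __name__ == "__main__":
    import sys

    # Usage: python start_connect.py <confluent-home> <path-to-sink-jar>
    command, environment = build_launch(sys.argv[1], sys.argv[2])
    subprocess.run(command, env=environment, check=True)
```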

We can use Kafka Connect’s REST API to confirm that our Sink class is available.
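The check can be scripted against `GET /connector-plugins`, which returns a JSON list of objects with a `class` field. A minimal Python sketch using only the standard library, assuming the worker listens on `localhost:8083` and that the Sink's class name is as shown (an assumption; confirm it against the jar you installed).

```python
import json
import urllib.request
from typing import List

SINK_CLASS = "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"  # assumed


def plugin_classes(payload: str) -> List[str]:
    """Extract class names from a GET /connector-plugins response body."""
    return [p["class"] for p in json.loads(payload)]


def sink_available(base_url: str = "http://localhost:8083", cls: str = SINK_CLASS) -> bool:
    """Ask a running worker whether the given connector class is on its classpath."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=5) as resp:
        return cls in plugin_classes(resp.read().decode("utf-8"))
```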

We also need Cassandra, so download and install it from here, or follow these instructions:

Start the CQL shell and set up the keyspace and table; `cqlsh` can be found in the `bin` folder of the Cassandra install.
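If you would rather script this step, the statements can also be run through the DataStax Python driver (`pip install cassandra-driver`). The column list mirrors the sample record used in this post (`id`, `created`, `product`, `price`); the column types, the `(id, created)` primary key and the single-node `SimpleStrategy` replication are assumptions suited to a local demo. The same session can later run `SELECT * FROM demo.orders` to verify what the Sink wrote.

```python
from typing import List

KEYSPACE = "demo"

# Assumed schema for a local, single-node demo.
SETUP_CQL: List[str] = [
    f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE} "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",
    f"CREATE TABLE IF NOT EXISTS {KEYSPACE}.orders ("
    "id int, created text, product text, price double, "
    "PRIMARY KEY (id, created))",
]


def run_setup(host: str = "127.0.0.1") -> None:
    """Execute SETUP_CQL with the DataStax Python driver."""
    # Imported lazily so this module loads even where the driver is not installed.
    from cassandra.cluster import Cluster

    cluster = Cluster([host])
    try:
        session = cluster.connect()
        for stmt in SETUP_CQL:
            session.execute(stmt)
    finally:
        cluster.shutdown()
```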

Writing Data

Now we are ready to post our Cassandra Sink configuration. To submit tasks to the Connect API in distributed mode, you POST your config as JSON to the REST endpoint, which the Connect API exposes on port 8083 by default. To make this easier, we developed a small command line tool to help interact with the Connect API. It can be found here. We also have a CLI for the Schema Registry. To post the config, download the Connect CLI and run the following, passing in the configuration file we created earlier.
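The same submission can also be made with a plain HTTP request: read the properties file and POST it to `/connectors` as `{"name": ..., "config": {...}}`. The Python sketch below builds that request with the standard library; `parse_properties` is a deliberately minimal parser (no line continuations or escapes), and the default URL assumes a local worker on port 8083.

```python
import json
import urllib.request
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value parser: skips blank lines and # or ! comments."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def build_create_request(props: Dict[str, str], base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build POST /connectors with the body {"name": ..., "config": {...}}."""
    config = dict(props)
    body = json.dumps({"name": config["name"], "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending it is then `urllib.request.urlopen(build_create_request(props))`; Connect answers `201 Created` when the connector is accepted.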

The Sink should start; you can check the logs in the terminal where you started the Connect API in Kafka:

The CLI also lets you check for active connectors and their status. The full set of supported commands is `ps|get|rm|create|run|status`.

And you should see some nice ASCII art. Another feature 🙂
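Connector and task state is also available directly from the REST API at `GET /connectors/<name>/status`, which returns the connector's state and a list of task states. A small Python sketch that condenses that response, assuming a local worker on port 8083; the connector name in the test data is hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, List


def summarize_status(status: Dict[str, Any]) -> str:
    """Condense a GET /connectors/<name>/status response into one line."""
    tasks: List[Dict[str, Any]] = status.get("tasks", [])
    task_states = ", ".join(f"task {t['id']}={t['state']}" for t in tasks) or "no tasks"
    return f"{status['name']}: connector={status['connector']['state']}; {task_states}"


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> str:
    """Query a running worker and summarize the named connector's status."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=5) as resp:
        return summarize_status(json.loads(resp.read()))
```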

Next we need to push in some data! We can use the `kafka-avro-console-producer` tool from the Confluent Platform for this purpose. When writing data, we are good citizens and use Avro, so we pass in the Avro schema with `--property value.schema`.
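The schema passed via `value.schema` must describe the records you paste in. Below is a Python sketch of an Avro schema matching the sample record used in this post (`id`, `created`, `product`, `price`), together with the producer arguments and a shallow type check. The record name `orders`, the broker address and the `bin/` path are assumptions; `--broker-list`, `--topic` and `--property` are the console producer's flags in Confluent 3.x.

```python
import json
from typing import Any, Dict, List

# Avro record schema for the sample orders; the record name is an assumption.
ORDER_SCHEMA: Dict[str, Any] = {
    "type": "record",
    "name": "orders",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "created", "type": "string"},
        {"name": "product", "type": "string"},
        {"name": "price", "type": "double"},
    ],
}

_AVRO_TO_PY = {"int": int, "string": str, "double": float}


def producer_argv(topic: str = "orders-topic", broker: str = "localhost:9092") -> List[str]:
    """Arguments for kafka-avro-console-producer, run from the Confluent install directory."""
    return [
        "bin/kafka-avro-console-producer",
        "--broker-list", broker,
        "--topic", topic,
        "--property", "value.schema=" + json.dumps(ORDER_SCHEMA),
    ]


def matches_schema(record: Dict[str, Any]) -> bool:
    """Shallow check: exactly the schema's fields, each with the expected Python type."""
    fields = ORDER_SCHEMA["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    return all(isinstance(record[f["name"]], _AVRO_TO_PY[f["type"]]) for f in fields)
```

Passing the arguments as a list (for example to `subprocess.run`) avoids shell-quoting the JSON schema. A line to paste into the producer looks like `{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}`.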

Now paste the following into the terminal:

Check the logs of the Connect API in Kafka:

Now if we check Cassandra we should see our rows:


The combination of Apache Kafka, the Streams API in Kafka, the Connect API in Kafka and Apache Cassandra provides a powerful real-time streaming and analytics platform. The Cassandra Sink was initially developed for a trade data source at Eneco, but has since been successfully deployed to sink data from Twitter, Reuters and more into Cassandra, feeding multiple APIs and data scientists.

To see a live demo, check out a quick overview below:


  1. Hi, I have a couple of questions regarding the connectors.

    1. Do we have to create a Connector config for each Cassandra table we want to update? For instance, let’s say I have 1,000 tables. Each table is dedicated to a different type of widget. Each widget has similar characteristics, but slightly different data. Do we need to create a connector for each table? If so, how is this managed and how does this scale?

    2. In Cassandra, we often need to model column families based on the business need. We may have 3 tables representing user information. 1 by username, 1 by email and 1 by last name. Would we need 3 connector configs and deploy 3 separate Sink tasks to push data to each table?


    1. I think both questions come down to the same thing: can the sink handle multiple topics?

      Yes. One sink instance, and so one configuration, can write to multiple tables. This is set in the KCQL statement `connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic;INSERT INTO positions SELECT * FROM positions`, but at present the tables need to be in the same Cassandra keyspace. This routes events from the `orders-topic` topic to a Cassandra table called `orders`, and events from the `positions` topic to a table called `positions`. You can also select specific fields and rename them, e.g. `SELECT columnA AS columnB`.

      You may want more than one sink instance for separation of concerns, i.e. isolating the write of a group of topics from other unrelated topics.

      You can scale with the number of tasks the connector is allowed to run, each task starts a Writer for all the target tables.

      We have a support channel of our own for more direct communication.

  2. Hi,

    in order to work with the Cassandra sink, must I use the Schema Registry and Avro as the message format?
    In my case I have messages that are serialized as protocol buffers; would I still be able to use the kafka-connect-cassandra-sink?


    1. Hi Igor,
      For the current Cassandra Sink version that is the requirement. However we are soon going to upgrade it to support:
      – messages whose schema is set to STRING and whose payload is a JSON string
      – messages without a schema whose payload is a JSON string (this is to handle systems already in production where the Schema Registry is not used). For this to work, you need to make sure your Kafka Connect settings are correct. You need these 3 settings:

      You can make it work with your messages serialized with protobuf if:
      – you write a converter to translate your protobuf messages into either a Connect STRUCT or at least a JSON string. You can see the in for reference
      – once you have your class, you set the configuration to use it (see the JsonConverter settings above)

      We have a support channel of our own for more direct communication.

  3. Hi,

    I am getting the following error while trying to read from a kafka topic and writing it to cassandra using the sink connector ERROR Task cassandra-sink-customers-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
    org.apache.kafka.connect.errors.ConnectException: No tables found in Cassandra for topics CUSTOMERS. The keyspace does have CUSTOMERS table in it. What could be the issue then? HELP please


  4. Hi,

    I would like to know if there is an option to update specific column in Cassandra table using Kafka-Cassandra sink.


    1. Hi Siva,

      Cassandra updates based on the primary key column. You can achieve this in the Sink by putting a new message in the Kafka topic with the same value for the key column as an existing entry in Cassandra. The Sink isn’t aware it’s an update, but Cassandra will overwrite the existing row for that key.

      For supported types, you will need to check Cassandra’s docs, as we use the JSON insert functionality.

      We have a slack channel which makes communication easier.



  5. Hello,
    I am getting the following error while trying the example listed above.
    1. I had to rename the attribute connect.cassandra.export.route.query to connect.cassandra.sink.kcql, as the Cassandra Sink could not start.
    2. After this change I am getting the following error:
    “Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:29 mismatched input ‘?’ expecting ‘)’ (INSERT INTO demo.orders JSON [?])
    at com.datastax.driver.core.Responses$Error.asException(
    at com.datastax.driver.core.SessionManager$4.apply(
    at com.datastax.driver.core.SessionManager$4.apply(
    at com.datastax.driver.core.Connection$Future.onSet(
    at com.datastax.driver.core.Connection$Future.onSet(
    at com.datastax.driver.core.RequestHandler.setFinalResult(
    at com.datastax.driver.core.RequestHandler.access$2500(
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(
    at io.netty.handler.timeout.IdleStateHandler.channelRead(
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(
    at io.netty.util.concurrent.SingleThreadEventExecutor$”

    I am connected to the following DataStax version of Cassandra as a Cluster:
    [cqlsh 5.0.1 | Cassandra | DSE 4.8.11 | CQL spec 3.2.1 | Native protocol v3]

    I am using the kafka-connect-cli-0.5-all.jar to start the Sink Connector.

    Please advise me where I am going wrong.

    1. Hi Siva,

      We have changed the config option names since this post. Our latest documentation can be found at

      For the JSON insert, I believe your Cassandra version doesn’t support it, as you’re on 2.1.17 and it was only introduced in 2.2.6. We are planning on providing a standard insert, which should cover earlier releases.



      1. Hello Andrew,
        I have tried out the example provided in this Blog and if you look at the Data sent to the Topic it is a JSON:
        {“id”: 1, “created”: “2016-05-06 13:53:00”, “product”: “OP-DAX-P-20150201-95.7”, “price”: 94.2}

        So can I conclude that Cassandra 2.2 is a prerequisite for trying the example in this blog? Is there any support, or are there binaries, for Cassandra 2.1.x available from DataMountaineer?

        Please advise.

        Rakesh V
