
Kafka Connect Cassandra Sink: The Perfect Match

By Andrew Stevenson

This guest post is written by Andrew Stevenson, CTO at DataMountaineer. Andrew focuses on providing real-time data pipelines allowing reactive decision making, streaming analytics and big data integration. He has extensive experience with Kafka, Flume, Spark, Impala, HBase and Cassandra backed by many years of Data Warehousing experience.

With his team at DataMountaineer, he helps in building out the Stream Processing ecosystem, developing multiple connectors and tooling around Apache Kafka.

Kafka Connect is a scalable and robust framework for streaming data into and out of Apache Kafka, the engine powering modern streaming platforms.

At DataMountaineer, we have worked on many big data and fast data projects across many industries, from financial services, including high-frequency traders and both retail and investment banks, to utilities and media. In each case, significant time is spent loading data and making these pipelines solid, reliable and robust. While this sounds straightforward, it is often overlooked. Data scientists can’t work their magic if they have no data.

Often these pipelines ended up using multiple components such as Flume, Sqoop, Spark JDBC import/export or standalone Java or Scala applications to handle the multitude of data sources, both push and pull based. They grew even more complex to handle Lambda architectures (although we believe in Kappa) and to coordinate with workflow engines. They evolved beyond simply loading data and started dealing with business logic.

Kafka Connect

Kafka Connect simplifies all this and allows us to separate out these concerns. Kafka Connect Sources and Sinks act as sensors on the edge of our analytics platform, loading and unloading events as they happen, in real time. Under the hood they are Kafka consumers and producers with a simple and elegant API that allows developers to focus on moving data to and from Kafka.

Using Kafka Connect, we can keep the ingest/egress layers clean and unpolluted by functional requirements. We leave the business logic, the `T` from `ETL`, to the processing layer leaving Connect decoupled from the applications that consume or produce the data it loads. The `T` is handled by stream processing engines, most notably Kafka Streams, Apache Flink or Spark Streaming.

Apache Cassandra

NoSQL stores are now an indispensable part of any architecture, and the SMACK stack (Spark, Mesos, Akka, Cassandra and Kafka) is becoming increasingly popular. Cassandra’s demonstrated scalability and high write throughput make it the perfect partner for handling the flood of time series, IoT, industrial-scale and financial data streams. This made Apache Cassandra an obvious choice as a Kafka Connect sink.

Joshua Ratha, product owner of the Data Hub and trader at Eneco Energy Trade, says:

“As a utility and trading company we deal with lots of time series. Think of gas and power prices, steering of assets, forecasting the energy supply and demand of our assets and client portfolio at any given moment of the day. We want to interact with this information stream, to be responsive to what happens in the energy markets and analyze it historically to see patterns, learn and improve. This is why an architecture with both Confluent and Cassandra serves us well. At Eneco we are proud of our contribution to the Cassandra connector in collaboration with DataMountaineer.”

Kafka Connect Cassandra Sink

The DataStax Certified Connector, developed by DataMountaineer, simplifies writing data from Kafka into Cassandra. The connector converts the value from the Kafka Connect SinkRecords to JSON. A fail-fast thread pool is then used to insert the records asynchronously into Cassandra.

Below is a use case with Confluent Platform, Cassandra Sink and Cassandra all playing a pivotal role in a client’s streaming analytics platform.

[Architecture diagram: Confluent Platform, the Cassandra Sink and Cassandra in a client’s streaming analytics platform]

DataMountaineer connectors can be downloaded from here.

The sink supports the following features:

Field selection – Kafka topic payload field selection is supported, allowing you to choose a selection of fields, or all fields, to be written to Cassandra.
Topic to table routing.
Error policies for handling failures – On Error the Sink will throw a RetriableException telling Connect to redeliver the messages on the next poll.
Secure connections via SSL.

Field selection and topic routing are handled by the Kafka Connect Query Language (KCQL), which can be found here.
It allows for routing and mapping using a SQL-like syntax, consolidating those configuration details into a single option.

The Cassandra Sink supports the following model for saving data from the Kafka topic:
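In essence, each mapping is a KCQL statement of the form `INSERT INTO <table> SELECT <fields> FROM <topic>`. A couple of illustrative examples (the second set of field names is invented for the sake of the example):

```
INSERT INTO orders SELECT * FROM orders-topic
INSERT INTO orders SELECT id, created, price AS trade_price FROM orders-topic
```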

Why KCQL?

While working on our Sinks and Sources, we ended up producing quite complex configuration in order to support the functionality required. Imagine a Sink where you source from different topics and from each topic you want to cherry-pick the payload fields or even rename them. Furthermore, you might want the storage structure to be created automatically, or even to evolve, or you might add new support for the likes of bucketing. Imagine a Sink with a table that needs to be linked to two different topics, with the fields aligned to the table column names, and all the complex configuration that would involve… or you can just write this:
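A hedged sketch of that two-topics-into-one-table case; the topic and field names here are invented for illustration, and multiple mappings are separated with a semicolon when placed in the connector’s KCQL option:

```
INSERT INTO trades SELECT id AS trade_id, symbol, price FROM fx-trades-topic;
INSERT INTO trades SELECT dealId AS trade_id, ticker AS symbol, px AS price FROM equity-trades-topic
```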

Using the Cassandra Sink

Ok, enough preaching, let’s use the Cassandra Sink to write some fictional trade data.

Preparation

Kafka Connect Sources and Sinks require configuration. For the Cassandra Sink a typical configuration looks like this:
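A sketch of what that properties file might look like. The connector class and the `connect.cassandra.*` connection settings (contact points, port, keyspace) are assumptions based on the connector’s documented options and may differ between versions; `connect.cassandra.export.route.query` is the KCQL option discussed below:

```
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
# KCQL: write every payload field from orders-topic into the orders table
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
# connection settings (property names assumed; check your connector version's docs)
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo
```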

Create a file with these contents; we’ll need it later to tell Kafka Connect to run the Sink.

The configuration options are hopefully obvious. We define the standard configurations required by Kafka Connect such as the connector class, tasks and topics. Next we provide the KCQL statement, `connect.cassandra.export.route.query`. This tells the Sink to select all fields from the payload of the `orders-topic` and insert them into the table `orders` in the keyspace `demo`. We could have remapped or dropped fields from the payload here.

First we need the Confluent Platform, so download it and follow the Quickstart Guide here to get Apache Kafka, ZooKeeper and the Schema Registry up and running.

Additionally, we need to start Kafka Connect. We will do this in distributed mode, which is straightforward, but we need the Cassandra Sink on the CLASSPATH. Download the Stream Reactor, unpack the archive and add the Sink to the CLASSPATH. Be sure to add the full path.
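For example (the path and jar name are placeholders for wherever you unpacked the Stream Reactor release):

```
# adjust the path and version to match the unpacked Stream Reactor archive
export CLASSPATH=/opt/stream-reactor/kafka-connect-cassandra-<version>-all.jar
```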

To start Connect run the following from the location you installed Confluent.
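With the Confluent Platform quickstart layout, that is roughly the following; the properties file name may vary between Confluent versions:

```
./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties
```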

We can use Kafka Connect’s Rest API to confirm that our Sink class is available.
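For example, assuming the worker is running on the default port 8083, the `connector-plugins` endpoint lists the connector classes the worker can see:

```
curl localhost:8083/connector-plugins | grep -i cassandra
```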

We also need Cassandra, so download and install it from here or follow these instructions.

Start the CQL shell and set up the keyspace and table; `cqlsh` can be found in the `bin` folder of the Cassandra install.
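A minimal sketch of the keyspace and table used in this walkthrough; the column names and types are illustrative and need to line up with the Avro schema used further down:

```
CREATE KEYSPACE demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE demo.orders (
    id int,
    created varchar,
    product varchar,
    qty int,
    price double,
    PRIMARY KEY (id, created)
);
```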

Writing Data

Now we are ready to post in our Cassandra Sink configuration. To submit tasks to Kafka Connect in distributed mode, you post your config as JSON to the REST endpoint, which Kafka Connect exposes by default on port 8083. To make this easier we developed a small command line tool to help interaction with Kafka Connect. It can be found here. We also have a CLI for the Schema Registry. To post in the config, download the Connect CLI and run the following, passing in the configuration file we created earlier.
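Something along these lines; the exact invocation depends on the CLI release you downloaded, and the connector name and properties file name are just the ones used in this example:

```
# invocation is illustrative; substitute the CLI jar/binary you actually downloaded
./connect-cli create cassandra-sink-orders < cassandra-sink-orders.properties
```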

The Sink should start; you can then check the logs in the terminal in which you started Kafka Connect.

The CLI also lets you check for active connectors and their status. The full set of supported commands is `ps|get|rm|create|run|status`.
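For example, using the illustrative connector name from above (the CLI binary name is again a placeholder):

```
./connect-cli ps
./connect-cli status cassandra-sink-orders
```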

And you should see some nice ASCII art. Another feature 🙂

Next we need to push in some data! We can use the `kafka-avro-console-producer` tool from the Confluent Platform for this purpose. When writing data, we are good citizens and use Avro, so we pass in the Avro schema with `--property value.schema`.
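A sketch of the producer invocation; the Avro schema here is made up to match the illustrative `orders` table above:

```
./bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"order","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"qty","type":"int"},{"name":"price","type":"double"}]}'
```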

Now paste the following into the terminal:
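A few made-up trade records matching that schema, one JSON document per line:

```
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "qty": 100, "price": 94.2}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "qty": 100, "price": 99.5}
{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "qty": 500, "price": 10000.0}
```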

Check the logs of Kafka Connect to confirm the Sink has picked up and written the records.

Now if we check Cassandra we should see our rows:
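For example, from `cqlsh`:

```
SELECT * FROM demo.orders;
```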

Conclusion

The combination of Apache Kafka, Kafka Streams, Kafka Connect and Apache Cassandra provides a powerful real time streaming and analytics platform. Initially, the Cassandra Sink was developed for a trade data source at Eneco, but it has now been successfully deployed to sink Twitter, Reuters and more into Cassandra to feed multiple APIs and data scientists.

To see a live demo, check out a quick overview below:

 



Comments

  1. Hi, I have a couple of questions regarding the connectors.

    1. Do we have to create a Connector config for each Cassandra table we want to update? For instance, let’s say I have 1000 tables. Each table is dedicated to a different type of widget. Each widget has similar characteristics, but slightly different data. Do we need to create a connector for each table? If so, how is this managed and how does this scale?

    2. In Cassandra, we often need to model column families based on the business need. We may have 3 tables representing user information. 1 by username, 1 by email and 1 by last name. Would we need 3 connector configs and deploy 3 separate Sink tasks to push data to each table?

    thanks!

    1. I think both questions are similar: can the sink handle multiple topics?

      The sink can handle multiple tables in one sink, so one configuration. This is set in the KCQL statement `connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic;INSERT INTO positions SELECT * FROM positions`, but at present they need to be in the same Cassandra keyspace. This would route events from the orders-topic to a Cassandra table called orders and events from the positions topic to a table called positions. You can also select specific columns and rename them, e.g. `SELECT columnA AS columnB`.

      You may want more than one sink instance for separation of concerns, i.e. isolating the write of a group of topics from other unrelated topics.

      You can scale with the number of tasks the connector is allowed to run, each task starts a Writer for all the target tables.

      We have a support channel of our own for more direct communication. https://datamountaineer.com/contact/

  2. Hi,

    in order to work with the Cassandra sink, do I have to use the Schema Registry and Avro as the message format?
    In my case I have messages that are serialized as protocol buffers; would I still be able to use the kafka-connect-cassandra-sink?

    Thanks,

    1. Hi Igor,
      For the current Cassandra Sink version that is the requirement. However, we are soon going to upgrade it to support:
      – messages with the Schema set to STRING, where the payload is a JSON string
      – messages without a Schema, where the payload is a JSON string (this is to handle systems already in production where the Schema Registry is not used). For this to work you need to make sure your Kafka Connect settings are correct. You need three things set:
      i) key.converter=org.apache.kafka.connect.json.JsonConverter
      ii) value.converter=org.apache.kafka.connect.json.JsonConverter
      iii) schemas.enable=false

      You can make it work with your messages serialized with protobuf if:
      – you write a converter to translate from your protobuf to either a Connect STRUCT or at least a JSON string. You can see AvroConverter.java in https://github.com/confluentinc/schema-registry for reference
      – once you have your class, you set the configuration to use it (see the JsonConverter settings above)

      We have a support channel of our own for more direct communication. https://datamountaineer.com/contact/

  3. Hi,

    I am getting the following error while trying to read from a Kafka topic and write it to Cassandra using the sink connector: ERROR Task cassandra-sink-customers-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
    org.apache.kafka.connect.errors.ConnectException: No tables found in Cassandra for topics CUSTOMERS. The keyspace does have a CUSTOMERS table in it. What could be the issue? Help please.

    Thanks,
    Sujiesh
