
Kafka Connect Fundamentals: What is Kafka Connect?

Written By Evan Bates

Apache Kafka® is an enormously successful piece of data infrastructure, functioning as the ubiquitous distributed log underlying the modern enterprise. It is scalable, available as a managed service, and has simple APIs available in pretty much any language you want. But as much as Kafka does a good job as the central nervous system of your company’s data, there are so many systems that are not Kafka that you still have to talk to. Writing bespoke data integration code for each one of those systems would have you writing the same boilerplate and unextracted framework code over and over again. Which is another way of saying that if the Kafka ecosystem didn’t already have Kafka Connect, we would have to invent it.

Kafka Connect is the pluggable, declarative data integration framework for Kafka. It connects data sinks and sources to Kafka, letting the rest of the ecosystem do what it does so well with topics full of events. As is the case with any piece of infrastructure, there are a few essentials you’ll want to know before you sit down to use it, namely setup and configuration, deployment, error handling, troubleshooting, its API, and monitoring. Confluent Developer recently released a Kafka Connect course covering these topics, and in each of the sections below, I’d like to share something about the content of each lesson in the course.

Introduction to Kafka Connect

Kafka Connect makes it easy to stream data from numerous sources into Kafka and from Kafka out to numerous targets, with hundreds of available connectors. Perhaps you are thinking about writing your own code for the functionality that Kafka Connect provides: keep in mind that, for starters, you'd need to handle failures and restarts, logging, running across multiple nodes, and serialization. Kafka Connect provides all of these out of the box, and in addition to preparing data for your various systems, it can perform light transformations on your data.

A common Kafka Connect use case is orchestrating real-time streams of events from a data source to a target for analytics. By having Kafka sit between the systems, the total system becomes loosely coupled, meaning that you can easily switch out the source or target, or stream to multiple targets, for example. And if the system gets overwhelmed, Kafka can act as a buffer, absorbing the backpressure. Another use case is using change data capture (CDC) to allow your relational technologies to send data through Kafka to technologies such as NoSQL stores, other event-driven platforms, or microservices—letting you unlock static data. In these circumstances, Kafka can serve as a message broker as well as an independent system of record.

Getting Started with Kafka Connect

Gain some initial experience with Kafka Connect by wiring up a data generator to your Kafka cluster in Confluent Cloud. You'll begin by establishing a topic using the Confluent Cloud UI, then connect the Datagen mock source connector to your cluster so that you can send messages to your topic. Finally, you'll view your message stream in the Confluent Cloud UI.
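If you'd like a sense of what the UI is doing on your behalf, the connector it creates boils down to a small JSON configuration. Below is a minimal sketch for the fully managed Datagen source connector; the connector name, topic, API key placeholders, and the "ORDERS" quickstart are illustrative assumptions, and exact property names can vary by connector version:

```json
{
  "name": "DatagenSourceConnector_0",
  "config": {
    "connector.class": "DatagenSource",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<KAFKA_API_KEY>",
    "kafka.api.secret": "<KAFKA_API_SECRET>",
    "kafka.topic": "orders",
    "output.data.format": "JSON",
    "quickstart": "ORDERS",
    "tasks.max": "1"
  }
}
```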

Running Kafka Connect

Instances of connector plugins translate between external systems and the Kafka Connect framework. They define how source connectors should collect data from a source system and how sink connectors should prepare Kafka data so that it is recognized by target systems. Essentially, they save you from writing a lot of utility code. There are hundreds of connector plugins, dozens of which are fully managed connectors that run for you on Confluent Cloud (you can find self-managed connectors on Confluent Hub and elsewhere).

Fully managed connectors are easy to configure via the UI, CLI, or API; they scale elastically with no infrastructure to manage; and they come with monitoring and troubleshooting features. That being said, the connector you need may not be available as a fully managed option yet (or may not be offered on your particular cloud), or a feature or transformation you need may only come in the self-managed version.

You can run self-managed Kafka Connect in clusters, whether in containers or on bare metal, with each cluster made up of one or more Connect workers (JVM processes). Of course, self-managed Connect does generally burden you with more responsibilities: potentially altering default settings, manually scaling, and providing your own monitoring.

Connectors, configuration, converters, and transforms

A Kafka Connect process is made up of a series of components: a connector instance, which defines the interaction between Kafka Connect and the external technology in question; a converter, which handles the serialization and deserialization of data, and plays a crucial role in the persistence of schemas; and the optional transformation functionality, which sits between connectors and converters, and can perform one or more transformations on data passing through.

You can also use ksqlDB to add connector instances, or you can add them from the Confluent Cloud console. Note that a connector instance doesn't read from or write to Kafka itself; rather, other under-the-hood components of Kafka Connect take care of writing data received from source connectors to Kafka topics, as well as reading data from Kafka topics and passing it to sink connectors.

For their part, converters serialize or deserialize data in Avro, Protobuf, String, JSON, JSON Schema, or ByteArray formats, and as mentioned above, also maintain schemas. Finally, the optional transformations can alter your data as it flows through Kafka Connect, performing actions like dropping fields, changing field types, or adding metadata.
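To make those three components concrete, here is a hedged sketch of a self-managed sink connector configuration that names all of them: the connector class, the key and value converters, and a single message transform. The Elasticsearch connector, converter classes, and InsertField transform are real, but the connector name, topic, URLs, and the injected field name are illustrative assumptions:

```json
{
  "name": "orders-to-elasticsearch",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "orders",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "addIngestTime",
    "transforms.addIngestTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addIngestTime.timestamp.field": "ingest_ts"
  }
}
```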

Use SMTs with a managed connector

In this hands-on exercise, learn how to add single message transforms in the Confluent Cloud UI for your Datagen mock data managed connector. org.apache.kafka.connect.transforms.Cast$Key will let you cast a few of your data fields to a new type before they are written into Kafka, and org.apache.kafka.connect.transforms.TimestampConverter will let you convert message timestamps. Identify your new settings in the JSON configuration for your connector, then launch the connector to view the actual transformed data.
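For reference, the transform settings you end up with look roughly like the fragment below, added to the Datagen connector's JSON configuration. The property names (spec, field, target.type, format) are the standard SMT ones; the transform aliases, the field names (taken from the Datagen orders quickstart), and the chosen target types are assumptions:

```json
"transforms": "castKey,convertTs",
"transforms.castKey.type": "org.apache.kafka.connect.transforms.Cast$Key",
"transforms.castKey.spec": "orderid:string",
"transforms.convertTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTs.field": "ordertime",
"transforms.convertTs.target.type": "string",
"transforms.convertTs.format": "yyyy-MM-dd HH:mm:ss"
```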

Using the Confluent Cloud managed connector API

Gain familiarity with the Confluent Cloud Connect API, along with the Org API and the Cluster API, by executing various REST calls related to Confluent managed connectors running in Confluent Cloud. You'll begin by encoding two sets of credentials, then you'll call the Org API to ultimately find your cluster ID. Create a Kafka topic to be the target for a Datagen source connector, then check your available plugins, noting that Datagen is present, and establish an instance of it. Next, set up a downstream MySQL sink connector, which will receive the data produced by your Datagen connector. Once you've finished, learn how to inspect the config for a connector, how to pause a connector (verifying that both the connector and its task are paused by running a status command), and then how to resume the connector and its task.
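The calls in that exercise follow a pattern like the hedged sketch below. The Org API, Cluster Management API, and managed Connect API endpoints shown are real Confluent Cloud APIs, but the environment ID, cluster ID, connector name, and credentials are placeholders, so check the current API reference for exact paths and payloads:

```bash
# Base64-encode a Cloud API key and secret for HTTP Basic auth (placeholder credentials)
CLOUD_AUTH=$(echo -n "<CLOUD_API_KEY>:<CLOUD_API_SECRET>" | base64)

# Org API: list environments to find your environment ID
curl -s -H "Authorization: Basic $CLOUD_AUTH" https://api.confluent.cloud/org/v2/environments | jq

# Cluster API: list the Kafka clusters in that environment to find your cluster ID
curl -s -H "Authorization: Basic $CLOUD_AUTH" \
  "https://api.confluent.cloud/cmk/v2/clusters?environment=env-123456" | jq

# Connect API: list available plugins, then check a connector's status, pause it, and resume it
BASE="https://api.confluent.cloud/connect/v1/environments/env-123456/clusters/lkc-123456"
curl -s -H "Authorization: Basic $CLOUD_AUTH" "$BASE/connector-plugins" | jq
curl -s -H "Authorization: Basic $CLOUD_AUTH" "$BASE/connectors/DatagenSourceConnector_0/status" | jq
curl -s -X PUT -H "Authorization: Basic $CLOUD_AUTH" "$BASE/connectors/DatagenSourceConnector_0/pause"
curl -s -X PUT -H "Authorization: Basic $CLOUD_AUTH" "$BASE/connectors/DatagenSourceConnector_0/resume"
```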

Using the Confluent Cloud managed connector CLI

In this hands-on exercise, learn how to use the Confluent CLI in the context of Kafka Connect managed connectors by becoming familiar with the CLI commands that let you create, configure, and monitor them. Begin by setting some default parameters for your CLI, then create a Kafka target topic for your Datagen source connector. Verify your topic, then list the fully managed plugins that are available for streaming in your Confluent Cloud environment. Create your Datagen connector and verify that it is producing into your topic. That's the beginning of your data pipeline; next, create the downstream end by connecting a sink connector to your existing MySQL database table (provided for you in a Docker container). Learn about ssl.mode, then verify that the data from your source has been written into your sink. Finally, learn how to pause and resume a connector and its task.
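A condensed sketch of that CLI flow is below, assuming a recent version of the Confluent CLI. The subcommand names have changed across CLI versions and the IDs are placeholders, so treat this as approximate and check `confluent connect --help` for your version:

```bash
# Point the CLI at your environment and cluster (IDs are placeholders)
confluent environment use env-123456
confluent kafka cluster use lkc-123456

# Create the target topic and confirm it exists
confluent kafka topic create orders
confluent kafka topic list

# See which fully managed plugins are available, then create the Datagen connector
confluent connect plugin list
confluent connect cluster create --config-file datagen-source.json

# List connectors, then pause and resume one (using the connector ID from the list output)
confluent connect cluster list
confluent connect cluster pause lcc-123456
confluent connect cluster resume lcc-123456
```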

Deployment

The requirement to deploy Kafka Connect only applies to self-managed Connect, not Confluent Cloud-based Connect. Adding a connector instance means specifying its logical configuration; the work itself is physically executed by one or more threads known as tasks. Thus, if a connector supports parallelization, its data ingress or egress throughput can be increased by adding more tasks. Tasks run inside a JVM process known as a worker, and each worker can run tasks from multiple connector instances.

Workers can run in standalone or distributed mode. In distributed mode, Kafka topics are used to store state related to configuration, connector status, and more, and connector instances are managed using the REST API that Kafka Connect offers. It's easy to add additional workers to rebalance a workload, since they can read that metadata from Kafka (the recommended minimum is two workers, for fault tolerance). The other option is standalone mode, in which the Kafka Connect worker uses local files to store state. You can't scale for throughput in standalone mode or get fault-tolerant behavior, so distributed mode is the recommended one unless you have a connector that needs to execute with server locality (note that you can also satisfy this requirement with a single worker in distributed mode).
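As a concrete reference, a distributed-mode worker needs little more than a bootstrap server, a shared group.id, default converters, and the names of the internal topics it uses for state. The property names below are the standard worker settings; the addresses, topic names, and paths are illustrative values:

```bash
cat > connect-distributed.properties <<'EOF'
# Connection to the Kafka cluster (illustrative address)
bootstrap.servers=broker1:9092

# Workers that share a group.id form a single Connect cluster
group.id=connect-cluster-a

# Default converters (individual connectors can override these)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal topics where the cluster stores connector config, offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Where connector plugin JARs live on this worker
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
EOF

# Start a worker; run the same thing on at least one more machine for fault tolerance
bin/connect-distributed.sh connect-distributed.properties
```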

Running Kafka Connect in Docker

Instead of running a Kafka Connect worker as a bare JVM process, you may prefer to run it with Docker, which can get you up and running more quickly. The Confluent-maintained image cp-kafka-connect provides a basic worker to which you add the connector JARs you need, either at runtime or by baking them into the image itself. The former option increases start-up time and requires a network connection, so the latter is usually used for production. A bonus optimization, particularly handy for demos, is to create your connector instances from a startup script rather than adding them after the worker is already running. Watch how to do this at the end of the video.
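For example, baking a connector into the image can be as small as a two-line Dockerfile that runs the confluent-hub client bundled with the base image. The image tag and the Elasticsearch connector below are only illustrative choices:

```bash
# Bake the connector plugin into a custom image (the approach typically used for production)
cat > Dockerfile <<'EOF'
FROM confluentinc/cp-kafka-connect:7.6.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
EOF
docker build -t my-connect-worker:1.0 .

# Alternative: install the plugin at container start-up instead (slower start, needs a network)
# by overriding the container command with something like:
#   bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest \
#            && /etc/confluent/docker/run"
```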

Running a self-managed connector with Confluent Cloud

Although the collection of fully managed connectors on Confluent Cloud is growing steadily, you may find that a managed version of the connector you need doesn't exist yet, in which case you'll need to run it self-managed. To do this in conjunction with a cluster on Confluent Cloud, you have to run your own Connect worker, which you'll do in this hands-on exercise using local Docker containers. You'll learn to set up multiple data pipelines: a local MySQL database using Debezium to stream to a Kafka topic on Confluent Cloud; a Kafka topic on Confluent Cloud streaming to a local Elasticsearch instance; and, finally, a Kafka topic on Confluent Cloud streaming to a local instance of Neo4j. To begin, you'll inspect some Docker and other configs, then start your Connect cluster and the other Docker images, verifying that your workers are connecting to Confluent Cloud. Finally, you'll test your pipelines, sending data from MySQL to Confluent Cloud and then streaming that same data back out to both Elasticsearch and Neo4j.
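The part that makes this work is the worker configuration pointing at Confluent Cloud rather than a local broker. A hedged fragment of that worker config is sketched below; the bootstrap endpoint and API key placeholders are assumptions, and the same credentials are repeated for the worker's embedded producer and consumer:

```properties
# Fragment of a self-managed worker config pointing at a Confluent Cloud cluster
bootstrap.servers=pkc-xxxxx.us-west-2.aws.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";

# The embedded producer (used by source connectors) and consumer (used by sink connectors)
# authenticate with the same settings
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
```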

Using Kafka Connect’s REST API

In this video, you can learn the main features of Kafka Connect’s REST API, the primary interface to a cluster in distributed mode, by executing easy command-line examples. Begin by learning how to fetch basic cluster info as well as a nice jq-formatted list of plugins installed on a worker. Next, learn to create a connector both normally and idempotently, then list all existing connectors, as well as inspect a given connector’s config—or review its status. After that, learn how to delete a connector using the tool peco as well as how to review connector failure in a task’s stack trace. Then learn to restart a connector and its tasks, to pause and resume a connector, and to display all of a connector’s tasks. Finally, get a list of topics used by a connector.
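A few representative calls are sketched below, assuming a worker listening on the default port 8083 and a connector named my-sink (both assumptions); the endpoints themselves are part of the standard Kafka Connect REST API:

```bash
# Basic cluster info and the plugins installed on this worker
curl -s localhost:8083/ | jq
curl -s localhost:8083/connector-plugins | jq '.[].class'

# Create or update a connector idempotently by PUTting its config
curl -s -X PUT -H "Content-Type: application/json" localhost:8083/connectors/my-sink/config \
  -d '{
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "topics": "orders",
        "file": "/tmp/orders.txt",
        "tasks.max": "1"
      }' | jq

# Inspect, restart, pause/resume, and eventually delete the connector
curl -s localhost:8083/connectors | jq
curl -s localhost:8083/connectors/my-sink/status | jq
curl -s -X POST localhost:8083/connectors/my-sink/restart
curl -s -X PUT  localhost:8083/connectors/my-sink/pause
curl -s -X PUT  localhost:8083/connectors/my-sink/resume
curl -s localhost:8083/connectors/my-sink/topics | jq
curl -s -X DELETE localhost:8083/connectors/my-sink
```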

Error handling and dead letter queues

Errors in Kafka Connect can be handled in several ways: with the "fail fast" or "silently ignore" approaches, or by using a dead letter queue. Typical errors in Kafka Connect have to do with serialization: for example, you are attempting to deserialize data that was serialized in another format ("Unknown magic byte!"), or data is arriving in several different formats. In both of these cases, you can configure your connector to "fail fast," which is safe behavior: the messages in question won't be processed; instead, the task will stop.

The other option is to set up a dead letter queue, which is a Kafka topic where the erroring messages can be sent so that you can inspect them or process them in some other fashion. The dead letter queue isn’t enabled by default because not every connector needs one, and before you even set one up, you should have a plan for handling the erroring messages, as it isn’t useful to just send messages to the dead letter queue for the sake of storage.
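In a sink connector's configuration, routing bad records to a dead letter queue instead of failing the task comes down to a handful of error-handling properties. The property names below are the standard Kafka Connect ones (errors.tolerance defaults to "none", i.e., fail fast); the DLQ topic name is illustrative:

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_orders",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
```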

Conclusion

Now that you have been introduced to Kafka Connect’s internals and features, and a few strategies that you can use with it, the next step is to experiment with establishing and running an actual Kafka Connect deployment. Check out the free Kafka Connect 101 course on Confluent Developer for code-along tutorials addressing each of the topics above, along with in-depth textual guides and links to external resources.

You can also listen to the podcast Intro to Kafka Connect: Core Components and Architecture, or watch the online talk From Zero to Hero with Kafka Connect to learn more about key design concepts of Kafka Connect. During the talk, you'll see a live demo of building pipelines with Kafka Connect for streaming data in from databases and out to targets including Elasticsearch.


Evan Bates is a technical writer for Confluent, primarily working on content for Confluent Developer as well as white papers. In the past, he worked in a related capacity for an in-memory database company and as a web developer. When not experimenting in the Apache Kafka ecosystem, he enjoys building and maintaining various iOS side projects.
