
Lessons Learned With Confluent-Managed Connectors and Terraform

Written by Sandon Jacobs

I’m a Data Streaming Engineer and a developer advocate, which means I spend a lot of time thinking about the day-to-day experience of building applications with data streaming and stream processing. I muse about a world of data in motion where entire organizations have the governance needed to manage, discover, and understand the complex relationships between data streams.

But we don’t live in an “all streams, all of the time” universe. We encounter data of all stripes—from relational databases, document stores, and object stores to files, APIs, and third-party systems—as both sources and targets of our stream processing ecosystem. For Apache Kafka® users, Kafka Connect provides a runtime with task management as well as a framework for creating connectors to these external systems—many of which have already been implemented by the community or companies like Confluent.

In addition to managed Kafka, Confluent Cloud offers dozens of pre-built, fully managed connectors along with a managed runtime environment. And while the admin console provides a guided experience to create and manage these connectors, many organizations prefer an infrastructure-as-code practice. The Confluent Terraform Provider allows us to create and maintain these assets in source code management (such as git) and execute updates via sound continuous integration and continuous deployment (CI/CD) practices. Seems pretty straightforward, right? Yes … and yet, not so much.
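
Before any of those assets can be declared, the provider itself needs to be configured. As a point of reference, a minimal sketch might look like the following (the version constraint and the variable names for the Cloud API credentials are illustrative, not taken from the demo repository):

terraform {
  required_providers {
    confluent = {
      source  = "confluentinc/confluent"
      version = "~> 2.0" # pin to whatever version you've tested against
    }
  }
}

# Cloud API credentials used to manage Confluent Cloud resources.
# These can also be supplied via the CONFLUENT_CLOUD_API_KEY and
# CONFLUENT_CLOUD_API_SECRET environment variables.
provider "confluent" {
  cloud_api_key    = var.confluent_cloud_api_key
  cloud_api_secret = var.confluent_cloud_api_secret
}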

The Confluent Cloud Console can streamline the provisioning of the required access control lists (ACLs) and role bindings for the connector-associated Kafka topics as well as any internal topics: simply provide (or create) a service account, and the rest is taken care of via a single checkbox. The Terraform Provider offers no such checkbox, which is how, while building a recent demo, I came to spend an undisclosed amount of time researching a number of error messages.

In this post, I’d like to pass along the lessons I learned from this experience. We’ll build the required service accounts, with the permissions they need to access data in Kafka. Then we can use those to configure a PostgreSQL Sink Connector that transforms the source data for storage in the database. Let’s get to it.

The Data Pipeline We’re Trying to Build With Terraform

Here’s the data pipeline we’re building. Data flows into a source topic from some application. In my use case, this source data is the result of a series of Kafka Streams topologies whose transformations and joins shape the data into a form optimized for future web services to serve results to callers from a PostgreSQL database.

This data pipeline moves data from a Kafka Streams application to a Kafka topic, from which it’s streamed to PostgreSQL via a sink connector.

There are other assets needed to get there, namely the Kafka ACLs that allow the connector to consume from the source topic. Those ACLs are applied to the service account that runs the PostgreSQL Sink Connector.

The source code associated with this article can be found in my GitHub repository. Consult the README for more information if you want to follow along.

Service Accounts, Access Control Lists, and Role Bindings

There are two service accounts created here. The first is connector_admin, which has a role binding that allows it to administer Confluent Cloud assets. In our example, we create this service account with the CloudClusterAdmin role; we can (and should) debate whether that honors the principle of least privilege. Check the documentation for more about Role-Based Access Control (RBAC) in Confluent Cloud.

resource "confluent_service_account" "connector_admin" {
  display_name = "connector-admin-sa"
  description  = "Service Account for Kafka Connectors"
}

resource "confluent_role_binding" "connector_admin" {
  principal   = "User:${confluent_service_account.connector_admin.id}"
  role_name   = "CloudClusterAdmin"
  crn_pattern = data.confluent_kafka_cluster.kafka_cluster.rbac_crn
}

# Kafka API Key for connector_admin to manage ACLs and topics
resource "confluent_api_key" "connector_admin" {
  display_name = "connector-admin-kafka-api-key"
  description  = "Kafka API Key owned by connector_admin SA"
  owner {
    id          = confluent_service_account.connector_admin.id
    api_version = confluent_service_account.connector_admin.api_version
    kind        = confluent_service_account.connector_admin.kind
  }

  managed_resource {
    id          = data.confluent_kafka_cluster.kafka_cluster.id
    api_version = data.confluent_kafka_cluster.kafka_cluster.api_version
    kind        = data.confluent_kafka_cluster.kafka_cluster.kind

    environment { id = data.confluent_environment.cc_env.id }
  }

  depends_on = [confluent_role_binding.connector_admin]
}

The next service account will be used by connectors to access Kafka topics. The connector account does not necessarily need a role binding applied to it. Given that we’re creating a sink connector, this account needs READ and DESCRIBE access to the source Kafka topic. The managed PostgreSQL Sink Connector also requires a dead letter topic in its configuration, and the connector service account needs access to write erroneous events to this topic. (The connector also reads from Kafka with a consumer group, so the repository defines an additional ACL for that; you’ll see it referenced in the connector’s depends_on list later.)

In Terraform, it looks something like this:

resource "confluent_service_account" "connector" {
  display_name = "connectors-sa"
  description  = "Service Account for Kafka Connectors"
}

resource "confluent_kafka_acl" "source_topic_row_aggregates_read" {
  kafka_cluster { id = data.confluent_kafka_cluster.kafka_cluster.id }
  resource_type = "TOPIC"
  resource_name = var.SOURCE_TOPIC
  pattern_type  = "LITERAL"
  principal     = "User:${confluent_service_account.connector.id}"
  operation     = "READ"
  permission    = "ALLOW"
  host          = "*"
  rest_endpoint = data.confluent_kafka_cluster.kafka_cluster.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin.id
    secret = confluent_api_key.connector_admin.secret
  }

  depends_on = [
    confluent_api_key.connector_admin
  ]
}

resource "confluent_kafka_acl" "source_topic_row_aggregates_describe" {
  kafka_cluster { id = data.confluent_kafka_cluster.kafka_cluster.id }
  resource_type = "TOPIC"
  resource_name = var.SOURCE_TOPIC
  pattern_type  = "LITERAL"
  principal     = "User:${confluent_service_account.connector.id}"
  operation     = "DESCRIBE"
  permission    = "ALLOW"
  host          = "*"
  rest_endpoint = data.confluent_kafka_cluster.kafka_cluster.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin.id
    secret = confluent_api_key.connector_admin.secret
  }

  depends_on = [
    confluent_api_key.connector_admin
  ]
}

# DLQ topic ACLs (WRITE/DESCRIBE)
resource "confluent_kafka_acl" "dlq_write" {
  kafka_cluster { id = data.confluent_kafka_cluster.kafka_cluster.id }
  resource_type = "TOPIC"
  resource_name = confluent_kafka_topic.postgres_sink_aggregates_dlq.topic_name
  pattern_type  = "LITERAL"
  principal     = "User:${confluent_service_account.connector.id}"
  operation     = "WRITE"
  permission    = "ALLOW"
  host          = "*"
  rest_endpoint = data.confluent_kafka_cluster.kafka_cluster.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin.id
    secret = confluent_api_key.connector_admin.secret
  }

  depends_on = [
    confluent_api_key.connector_admin,
    confluent_kafka_topic.postgres_sink_aggregates_dlq
  ]
}

resource "confluent_kafka_acl" "dlq_describe" {
  kafka_cluster { id = data.confluent_kafka_cluster.kafka_cluster.id }
  resource_type = "TOPIC"
  resource_name = confluent_kafka_topic.postgres_sink_aggregates_dlq.topic_name
  pattern_type  = "LITERAL"
  principal     = "User:${confluent_service_account.connector.id}"
  operation     = "DESCRIBE"
  permission    = "ALLOW"
  host          = "*"
  rest_endpoint = data.confluent_kafka_cluster.kafka_cluster.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin.id
    secret = confluent_api_key.connector_admin.secret
  }

  depends_on = [
    confluent_api_key.connector_admin,
    confluent_kafka_topic.postgres_sink_aggregates_dlq
  ]
}
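
The ACL resources above reference a dead letter queue topic, confluent_kafka_topic.postgres_sink_aggregates_dlq, whose definition isn’t shown here. A minimal sketch of that topic resource, under the same naming assumptions (the topic name and partition count are illustrative, so check the repository for the real values), might look like this:

resource "confluent_kafka_topic" "postgres_sink_aggregates_dlq" {
  kafka_cluster { id = data.confluent_kafka_cluster.kafka_cluster.id }
  topic_name       = "postgres-sink-aggregates-dlq"
  partitions_count = 1
  rest_endpoint    = data.confluent_kafka_cluster.kafka_cluster.rest_endpoint

  # The topic is created with the connector_admin API key, just like the ACLs.
  credentials {
    key    = confluent_api_key.connector_admin.id
    secret = confluent_api_key.connector_admin.secret
  }

  depends_on = [confluent_api_key.connector_admin]
}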

Provisioning a PostgreSQL Sink Connector

With these permissions in place, let’s provision a PostgreSQL Sink Connector to write data from the source topic to a table. Here is the resource definition in the Terraform code; the lettered comments, (A) through (H), mark the highlights I’ll walk through below:

resource "confluent_connector" "postgres_sink_aggregates" {
  environment {
    id = data.confluent_environment.cc_env.id
  }

  kafka_cluster {
    id = data.confluent_kafka_cluster.kafka_cluster.id
  }

  config_sensitive = {
    "connection.password"             = var.database_password # (A)
  }

  config_nonsensitive = {
    "name"                            = "KS-AggregationsSinkToPostgres"
    "connector.class"                 = "PostgresSink"
    "connection.host"                 = var.postgres_cluster_endpoint # (B)
    "connection.port"                 = var.postgres_cluster_port # (C)
    "connection.user"                 = var.database_user # (D)
    "auto.create"                     = "true"  # (E)
    "auto.evolve"                     = "true"  # (F)
    "cloud.environment"               = "prod"
    "cloud.provider"                  = data.confluent_kafka_cluster.kafka_cluster.cloud
    "db.name"                         = var.postgres_database_name
    "input.data.format"               = "PROTOBUF"
    "input.key.format"                = "STRING"
    "insert.mode"                     = "UPSERT"
    "kafka.auth.mode"                 = "SERVICE_ACCOUNT" # (G) 
    "kafka.service.account.id"        = confluent_service_account.connector.id  # (H)
    "kafka.endpoint"                  = data.confluent_kafka_cluster.kafka_cluster.bootstrap_endpoint
    "kafka.region"                    = data.confluent_kafka_cluster.kafka_cluster.region
    "pk.mode"                         = "record_value"
    "pk.fields"                       = "id"
    "table.name.format"               = "parking.$${topic}"
    "tasks.max"                       = tostring(var.sink_connector_tasks_max)
    "topics"                          = "${var.SOURCE_TOPIC}"
    "transforms"                      = "Flattener,RenameFields,DropUnusedFlattenedFields"
    "transforms.Flattener.type"       = "org.apache.kafka.connect.transforms.Flatten$Value"
    "transforms.Flattener.delimiter"  = "_"
    "transforms.RenameFields.renames" = "garageId:garage_id,zoneId:zone_id,rowId:row_id,carStatus_capacity:car_capacity,carStatus_occupied:car_occupied,handicapStatus_capacity:handicap_capacity,handicapStatus_occupied:handicap_occupied,motorcycleStatus_capacity:motorcycle_capacity,motorcycleStatus_occupied:motorcycle_occupied"
    "transforms.RenameFields.type"    = "io.confluent.connect.transforms.ReplaceField$Value"
    "transforms.DropUnusedFlattenedFields.type"      = "org.apache.kafka.connect.transforms.ReplaceField$Value"
    "transforms.DropUnusedFlattenedFields.exclude"   = "carStatus_vehicleType,handicapStatus_vehicleType,motorcycleStatus_vehicleType"
    # DLQ configuration
    "errors.tolerance"                         = "all"
    "errors.deadletterqueue.topic.name"        = confluent_kafka_topic.postgres_sink_aggregates_dlq.topic_name
    "errors.deadletterqueue.context.headers.enable" = "true"
  }

  depends_on = [
    confluent_service_account.connector,
    confluent_service_account.connector_admin,
    confluent_kafka_acl.connector_consumer_group_read,
    confluent_kafka_acl.source_topic_row_aggregates_describe,
    confluent_kafka_acl.source_topic_row_aggregates_read,
    confluent_kafka_acl.source_topic_zone_aggregates_describe,
    confluent_kafka_acl.source_topic_zone_aggregates_read,
    confluent_kafka_acl.dlq_describe, confluent_kafka_acl.dlq_write,
    confluent_kafka_topic.postgres_sink_aggregates_dlq
  ]
}

From Kafka to the Database

The connection information for the PostgreSQL instance is provided as Terraform variables: the host (B) and port (C), along with the credentials. The password (A) is considered sensitive, so it lives in config_sensitive, while the database user (D) is provided in the non-sensitive configuration.
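
For reference, a minimal sketch of those variable declarations might look like this (the names mirror the connector configuration above, but the actual definitions, types, and defaults in the repository may differ):

variable "postgres_cluster_endpoint" {
  description = "Hostname of the PostgreSQL instance"
  type        = string
}

variable "postgres_cluster_port" {
  description = "Port of the PostgreSQL instance"
  type        = string
}

variable "database_user" {
  description = "Database user for the sink connector"
  type        = string
}

variable "database_password" {
  description = "Database password for the sink connector"
  type        = string
  sensitive   = true # keeps the value out of plan and apply output
}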

We want the target table to be created (E) from the schema of the data in the source Kafka topic and to evolve (F) along with it. After all, we have data governance in our Confluent Cloud environment, where we manage the evolution of this data structure as a Protobuf schema, along with any data quality rules applied as the data is written to the stream. For more on that, see the many resources we’ve published on the topic of shifting left.

Next we see the kafka.auth.mode (G) value, denoting that this connector will use a service account to authenticate to Apache Kafka. The service account in use is the connector account created in the previous section; we configure that via the kafka.service.account.id (H) parameter, setting it to the ID of that service account.
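
For comparison, a managed connector can instead authenticate with a Kafka API key by setting kafka.auth.mode to KAFKA_API_KEY. A hedged sketch of that alternative follows; the confluent_api_key.connector_kafka resource referenced here is hypothetical and not part of the demo:

resource "confluent_connector" "example_with_api_key" {
  # ... environment and kafka_cluster blocks as in the resource above ...

  config_sensitive = {
    # Kafka API key and secret, e.g., from a confluent_api_key resource
    "kafka.api.key"    = confluent_api_key.connector_kafka.id
    "kafka.api.secret" = confluent_api_key.connector_kafka.secret
  }

  config_nonsensitive = {
    "kafka.auth.mode" = "KAFKA_API_KEY"
    # ... the rest of the connector configuration is unchanged ...
  }
}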

Data Transformations

With the authentication and authorization ironed out, let’s focus on the format of the data and how we want it represented in the target database table. The Data Contract tab of the source topic (parking-row-aggregates) in the Confluent Cloud Console shows us the Protobuf schema, which references a couple of custom message types for the car, handicap, and motorcycle status fields. You can see that nested structure in the example event a little further down.

But in our database table, we want to “flatten” these complex types and keep only the fields we need to service requests. After all, the goal here is to read-optimize this data for the web service callers in our use case. Flattening is the first in a chain of single message transforms (SMTs) that we’ll apply to our source data:

    "transforms" = "Flattener,RenameFields,DropUnusedFlattenedFields"

The provided Flatten SMT is applied to the value of each Kafka event to “flatten” (as the name suggests) any nested data structures. We provide a delimiter to separate the name of the top-level field and the name of the field within the structure.

    "transforms.Flattener.type"       = "org.apache.kafka.connect.transforms.Flatten$Value"
    "transforms.Flattener.delimiter"  = "_"

In an example where the source data looks like this…

{
  "id": "raleigh-accessible-accessible-zone-standard-zone",
  "garageId": "raleigh-accessible",
  "zoneId": "accessible-zone",
  "rowId": "standard-zone",
  "carStatus": {
    "vehicleType": 0,
    "capacity": 86,
    "occupied": 40
  },
  "handicapStatus": {
    "vehicleType": 1,
    "capacity": 10,
    "occupied": 3
  },
  "motorcycleStatus": {
    "vehicleType": 2,
    "capacity": 4,
    "occupied": 0
  }
}

… the result of the Flatten SMT as configured would look something like this:

{
  "id": "raleigh-accessible-accessible-zone-standard-zone",
  "garageId": "raleigh-accessible",
  "zoneId": "accessible-zone",
  "rowId": "standard-zone",
  "carStatus_vehicleType": 0,
  "carStatus_capacity": 86,
  "carStatus_occupied": 40,
  "handicapStatus_vehicleType": 1,
  "handicapStatus_capacity": 10,
  "handicapStatus_occupied": 3,
  "motorcycleStatus_vehicleType": 2,
  "motorcycleStatus_capacity": 4,
  "motorcycleStatus_occupied": 0
}

That’s a great first step, but we have this mix of camel-case and snake-case names in these new fields. We should settle on a standard, so let’s say snake-case.

"transforms.RenameFields.renames" = "garageId:garage_id,zoneId:zone_id,rowId:row_id,carStatus_capacity:car_capacity,carStatus_occupied:car_occupied,handicapStatus_capacity:handicap_capacity,handicapStatus_occupied:handicap_occupied,motorcycleStatus_capacity:motorcycle_capacity,motorcycleStatus_occupied:motorcycle_occupied"

"transforms.RenameFields.type"    = "io.confluent.connect.transforms.ReplaceField$Value"

We can use the ReplaceField SMT to rename the fields and apply our naming convention:

{
  "id": "raleigh-accessible-accessible-zone-standard-zone",
  "garage_id": "raleigh-accessible",
  "zone_id": "accessible-zone",
  "row_id": "standard-zone",
  "carStatus_vehicleType": 0,
  "car_capacity": 86,
  "car_occupied": 40,
  "handicapStatus_vehicleType": 1,
  "handicap_capacity": 10,
  "handicap_occupied": 3,
  "motorcycleStatus_vehicleType": 2,
  "motorcycle_capacity": 4,
  "motorcycle_occupied": 0
}

At first glance, this may seem tedious and error-prone. But using these SMTs preserves the structure of the source data. We could have done these transformations upstream, but only if we had access to that codebase, and then the stream would become less of a reusable asset in our organization’s data catalog; other use cases could leverage that original structure.

You may have noticed that we didn’t rename all of those mixed-case fields. You caught me! Well, not really, because our final SMT drops the fields we didn’t rename, since we don’t need them in the target database table. The ReplaceField SMT allows us to define a set of fields to exclude from the events in the resulting stream:

"transforms.DropUnusedFlattenedFields.type" = "org.apache.kafka.connect.transforms.ReplaceField$Value"

"transforms.DropUnusedFlattenedFields.exclude" =      "carStatus_vehicleType,handicapStatus_vehicleType,motorcycleStatus_vehicleType"
{
  "id": "raleigh-accessible-accessible-zone-standard-zone",
  "garage_id": "raleigh-accessible",
  "zone_id": "accessible-zone",
  "row_id": "standard-zone",
  "car_capacity": 86,
  "car_occupied": 40,
  "handicap_capacity": 10,
  "handicap_occupied": 3,
  "motorcycle_capacity": 4,
  "motorcycle_occupied": 0
}

Define a Primary Key and Table Name

With our final data structure in place, we need to derive the values to be used as the primary key of the target table. The pk.mode and pk.fields configuration of the PostgreSQL Sink Connector lets us use the id field of the Kafka event value as the primary key.

"pk.mode"   = "record_value"
"pk.fields" = "id"

Given that we’ve configured this connector to create and maintain our table, let’s give the connector a naming convention for the target table.

"table.name.format" = "parking.$${topic}"

Using the table.name.format configuration, we’re writing to a schema named parking and using the name of the source topic as the name of the target table. (The doubled dollar sign in $${topic} is Terraform’s escape syntax; it ensures the connector receives the literal ${topic} placeholder rather than a Terraform interpolation.) And here is the schema of the table we’ve created from our data stream via this connector:

create table parking."parking-row-aggregates"
(
    id                  text not null primary key,
    garage_id           text,
    zone_id             text,
    row_id              text,
    car_capacity        integer,
    car_occupied        integer,
    handicap_capacity   integer,
    handicap_occupied   integer,
    motorcycle_capacity integer,
    motorcycle_occupied integer
);

And when we query this table…

select * from parking."parking-row-aggregates" where id = 'raleigh-accessible-accessible-zone-standard-zone';

…we see the expected result: a single row containing the flattened, renamed values from our example event.

Wrapping Up This Intro to Connectors as Code

And there we have it: a data pipeline from a Kafka topic, transforming data with a chain of SMTs and sinking each event to a relational database. We manage the service accounts associated with the connector, along with the permissions they need on various Confluent Cloud assets. All of it is infrastructure as code using the Confluent Terraform Provider. We can easily maintain the running state of this connector in multiple environments by submitting a pull request, merging to the main branch, and then leveraging CI/CD pipelines to promote and validate changes along the way.

Check out Confluent Developer, your one-stop learning site for all things data streaming with Apache Kafka and Apache Flink. There you’ll find free educational content, articles, videos, and tutorials to help you in your data streaming journey.

I’m always interested in connecting with those who take the time to read my stuff. You can find me on LinkedIn, BlueSky, X/Twitter, and Instagram.


Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.

  • Sandon Jacobs is a Developer Advocate at Confluent, based in Raleigh, NC. Sandon has two decades of experience designing and building applications, primarily with Java and Scala. His data streaming journey began while building data pipelines for real-time bidding on mobile advertising exchanges—and Apache Kafka was the platform to meet that need. Later experiences in television media and the energy sector led his teams to Kafka Streams and Kafka Connect, integrating data from various in-house and vendor sources to build canonical data models.

    Outside of work, Sandon is actively involved in his Indigenous tribal community. He serves on the NC American Indian Heritage Commission, and also as a powwow singer and emcee at many celebrations around North America. Follow Sandon on Twitter @SandonJacobs or Instagram @_sandonjacobs, where he posts about his powwow travels, family, golf, and more.
