I’m a Data Streaming Engineer and a developer advocate, which means I spend a lot of time thinking about the day-to-day experience of building applications with data streaming and stream processing. I muse about a world of data in motion where entire organizations have the governance needed to manage, discover, and understand the complex relationships between data streams.
But we don’t live in an “all streams, all of the time” universe. We encounter data of all stripes—from relational databases, document stores, and object stores to files, APIs, and third-party systems—as both sources and targets of our stream processing ecosystem. For Apache Kafka® users, Kafka Connect provides a runtime with task management as well as a framework for creating connectors to these external systems—many of which have already been implemented by the community or companies like Confluent.
In addition to managed Kafka, Confluent Cloud offers dozens of pre-built, fully managed connectors along with a managed runtime environment. And while the admin console provides a guided experience to create and manage these connectors, many organizations prefer an infrastructure-as-code practice. The Confluent Terraform Provider allows us to create and maintain these assets in source code management (such as git) and execute updates via sound continuous integration and continuous deployment (CI/CD) practices. Seems pretty straightforward, right? Yes … and yet, not so much.
The Confluent Cloud Console can streamline the provisioning of required access control lists (ACLs) and role bindings for the connector-associated Kafka topics as well as any internal topics. Simply provide (or create) a service account, and the rest is taken care of via a single checkbox. With the Terraform Provider there is no such checkbox, which is how, in a recent demo, I spent an undisclosed amount of time researching a number of error messages.
In this post, I’d like to pass along the lessons I learned from this experience. We’ll build the required service accounts, with the permissions they need to access data in Kafka. Then we can use those to configure a PostgreSQL Sink Connector that transforms the source data for storage in the database. Let’s get to it.
Here’s the data pipeline we’re building. Data flows into a source topic from some application. In my use case, this source data is the result of a series of Kafka Streams topologies. Those transformations and joins shape the data into a form optimized for future web services to serve results to callers from a PostgreSQL database.
There are other assets needed to get there, namely the Kafka ACLs that allow the connector to consume from the source topic. Those ACLs are applied to the service account that runs the PostgreSQL Sink Connector.
The source code associated with this article can be found in my GitHub repository. Consult the README for more information if you want to follow along.
There are two service accounts created here. The first is the connector_admin, which has a role binding that allows it to administer Confluent Cloud assets. In our example, we create this service account with CloudClusterAdmin permissions—and we can debate the merits of least required access, as we should. Check the documentation for more about Role-Based Access Control (RBAC) in Confluent Cloud.
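As a sketch, creating this account and its role binding in Terraform might look like the following (the cluster resource name and the display names are placeholders):

```hcl
# Service account used to administer Confluent Cloud assets
resource "confluent_service_account" "connector_admin" {
  display_name = "connector_admin"
  description  = "Administers the cluster, connectors, and related assets"
}

# Grant it CloudClusterAdmin on the Kafka cluster
resource "confluent_role_binding" "connector_admin_cluster_admin" {
  principal   = "User:${confluent_service_account.connector_admin.id}"
  role_name   = "CloudClusterAdmin"
  crn_pattern = confluent_kafka_cluster.standard.rbac_crn
}
```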
The next service account will be used by connectors to access Kafka topics. The connector account does not necessarily need a role binding applied to it. Given that we’re creating a sink connector, this account needs READ and DESCRIBE access to the source Kafka topic. The managed PostgreSQL Sink Connector also requires a dead letter topic in its configuration, and the connector service account needs access to write erroneous events to this topic.
In Terraform, it looks something like this:
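A minimal sketch of those resources, assuming a cluster resource named confluent_kafka_cluster.standard and an admin-owned Kafka API key for the REST calls (the topic name and the DLQ prefix are illustrative):

```hcl
# Service account the connector runs as
resource "confluent_service_account" "connector" {
  display_name = "connector"
  description  = "Service account for the PostgreSQL Sink Connector"
}

# READ and DESCRIBE on the source topic
resource "confluent_kafka_acl" "connector_source_topic" {
  for_each = toset(["READ", "DESCRIBE"])

  kafka_cluster {
    id = confluent_kafka_cluster.standard.id
  }
  resource_type = "TOPIC"
  resource_name = "parking-row-aggregates"
  pattern_type  = "LITERAL"
  principal     = "User:${confluent_service_account.connector.id}"
  host          = "*"
  operation     = each.value
  permission    = "ALLOW"
  rest_endpoint = confluent_kafka_cluster.standard.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin_kafka.id
    secret = confluent_api_key.connector_admin_kafka.secret
  }
}

# CREATE and WRITE on the dead letter topic
# (prefix is an assumption; match it to your DLQ topic name)
resource "confluent_kafka_acl" "connector_dlq_topic" {
  for_each = toset(["CREATE", "WRITE"])

  kafka_cluster {
    id = confluent_kafka_cluster.standard.id
  }
  resource_type = "TOPIC"
  resource_name = "dlq-lcc-"
  pattern_type  = "PREFIXED"
  principal     = "User:${confluent_service_account.connector.id}"
  host          = "*"
  operation     = each.value
  permission    = "ALLOW"
  rest_endpoint = confluent_kafka_cluster.standard.rest_endpoint
  credentials {
    key    = confluent_api_key.connector_admin_kafka.id
    secret = confluent_api_key.connector_admin_kafka.secret
  }
}
```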
With these permissions in place, let’s provision a PostgreSQL Sink Connector to write data from the source topic to a table. Here is the resource definition in the Terraform code; let’s walk through the highlights of this configuration:
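Here’s a minimal sketch of that resource, assuming illustrative environment, cluster, and variable names (consult the repository and the managed connector documentation for the full set of options):

```hcl
resource "confluent_connector" "postgres_sink" {
  environment {
    id = confluent_environment.staging.id
  }
  kafka_cluster {
    id = confluent_kafka_cluster.standard.id
  }

  # PostgreSQL credentials are passed as sensitive configuration
  config_sensitive = {
    "connection.password" = var.postgres_password
  }

  config_nonsensitive = {
    "connector.class"   = "PostgresSink"
    "name"              = "parking-postgres-sink"
    "topics"            = "parking-row-aggregates"
    "input.data.format" = "PROTOBUF"

    # Authenticate to Kafka as the connector service account
    "kafka.auth.mode"          = "SERVICE_ACCOUNT"
    "kafka.service.account.id" = confluent_service_account.connector.id

    # Connection details provided as Terraform variables
    "connection.host" = var.postgres_host
    "connection.port" = var.postgres_port
    "connection.user" = var.postgres_user
    "db.name"         = var.postgres_database

    # Let the connector create and evolve the target table from the topic's schema
    "auto.create" = "true"
    "auto.evolve" = "true"

    "tasks.max" = "1"
  }
}
```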
The connection information for the PostgreSQL instance is provided as Terraform variables. These include the host and port, along with the credentials: the database user, and the password, which is treated as sensitive configuration.
We want the schema of the target table to be managed by the schema of the data in the source Kafka topic. After all, we have data governance in our Confluent Cloud environment, where we manage the evolution of this data structure as a Protobuf schema, along with any data quality rules applied as the data is written to the stream. For more on that, see the many resources we’ve published on the topic of shifting left.
Next we see the kafka.auth.mode value, denoting that this connector uses a service account to authenticate to Apache Kafka. The service account in use is the connector account created in the previous section; we reference it with the kafka.service.account.id configuration parameter, setting it to the ID of that service account.
With the authentication and authorization ironed out, let’s focus on the format of the data and how we want that represented in the target database table. The Data Contract tab of the source topic (parking-row-aggregates) shows us the Protobuf schema:
There are a couple of custom data types referenced here:
And let’s look at those referenced schemas:
But in our database table, we want to “flatten” these complex types and use only the fields we need to service requests. After all, the goal here is to read-optimize this data for the callers of a web service use case. This is the first of a chain of single message transforms (SMTs) that we’ll apply to our source data.
The provided Flatten SMT is applied to the value of each Kafka event to “flatten” (as the name suggests) any nested data structures. We provide a delimiter to separate the name of the top-level field and the name of the field within the structure.
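As a sketch, the Flatten transform might be added to the connector’s config_nonsensitive map like this (the transform alias and the underscore delimiter are choices, not requirements):

```hcl
    # First SMT in the chain; more transforms are appended to this list below
    "transforms"                   = "flatten"
    "transforms.flatten.type"      = "org.apache.kafka.connect.transforms.Flatten$Value"
    "transforms.flatten.delimiter" = "_"
```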
In an example where the source data looks like this…
… the result of the Flatten SMT as configured would look something like this:
That’s a great first step, but we have this mix of camel-case and snake-case names in these new fields. We should settle on a standard, so let’s say snake-case.
We can use the ReplaceField SMT to rename the fields and apply our naming convention:
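A sketch of that configuration, with hypothetical field names standing in for the real ones (the renames value is a comma-separated list of old:new pairs):

```hcl
    # Chain the rename transform after flatten
    "transforms"                = "flatten,rename"
    "transforms.rename.type"    = "org.apache.kafka.connect.transforms.ReplaceField$Value"
    "transforms.rename.renames" = "spotStatus_lastUpdated:spot_status_last_updated,spotStatus_state:spot_status_state"
```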
At first glance, this may seem tedious and error-prone. But applying these SMTs at the sink preserves the structure of the source data. We could have done these transformations upstream, but only if we had access to that codebase, and the stream would then become less of a reusable asset in our organization’s data catalog. Other use cases could leverage that original structure.
You may have noticed that we didn’t rename all of those mixed-case fields. You caught me! Well, not really, because our final SMT is to drop the fields we didn’t rename since we don’t need them in the target database table. The ReplaceField SMT allows us to define a set of fields for exclusion from the events in the resulting stream.
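A sketch of the final link in the chain, again with hypothetical field names:

```hcl
    # Drop the fields we don't need in the target table
    "transforms"                    = "flatten,rename,dropExtras"
    "transforms.dropExtras.type"    = "org.apache.kafka.connect.transforms.ReplaceField$Value"
    "transforms.dropExtras.exclude" = "spotStatus_internalNotes,spotStatus_rawPayload"
```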
With our final data structure in place, we need to derive the values to be used as the primary key of the target table. We use the pk.mode and pk.fields configuration of the PostgreSQL Sink Connector to use the id field of the Kafka event as the primary key.
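Assuming the id field lives in the record value, that configuration might look like this:

```hcl
    # Build the primary key from a field of the record value
    "pk.mode"   = "record_value"
    "pk.fields" = "id"
```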
Given that we’ve configured this connector to create and maintain our table, let’s give the connector a naming convention for the target table.
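A sketch of that naming convention in the connector configuration (note the doubled $$, which keeps Terraform from interpolating the literal ${topic} placeholder the connector expects):

```hcl
    # Write to the parking schema, one table per source topic
    "table.name.format" = "parking.$${topic}"
```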
Using the table.name.format configuration, we’re writing to a schema named parking and using the name of the source topic as the name of the target table. And here is the schema of the table we’ve created from our data stream via this connector:
And when we query this table…
…we see the results:
And there we have it: a data pipeline from a Kafka topic, transforming data with a chain of SMTs and sinking each event to a relational database. We manage the service accounts associated with the connector and the permissions they need on various Confluent Cloud assets. All of this is infrastructure as code using the Confluent Terraform Provider. We can easily maintain the running state of this connector in multiple environments by submitting a pull request, merging to the main branch, and then leveraging CI/CD pipelines to promote and validate changes along the way.
Check out Confluent Developer, your one-stop learning site for all things data streaming with Apache Kafka® and Apache Flink®. There you’ll find free educational content, articles, videos, and tutorials to help you in your data streaming journey.
I’m always interested in connecting with those who take the time to read my stuff. You can find me on LinkedIn, BlueSky, X/Twitter, and Instagram.
Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.