Spring Into Confluent Cloud With Kotlin—Part 1: Producers and Consumers

Hey, you! Yeah, you! The puzzled-looking Spring Boot developer, scouring the web for a guide on integrating your microservices with Apache Kafka® on Confluent Cloud with Stream Governance. Admit it, you’ve been Googling nonstop for the past hour and all you’ve found are examples using StringSerializer/StringDeserializer with not even the slightest mention of "schema registry-aware" serialization methods. And I bet the examples you found are implemented in Java. Wouldn’t it be nice to find at least one written in Kotlin (after all, Kotlin is a first-class citizen in the Spring world)? We welcome you. This is a safe space. Let’s get into it.

Spring Boot and Apache Kafka

The Spring Framework and Apache Kafka have been fairly simpatico for a number of years, allowing Spring shops to ease the transition to event-driven microservices. This means your organization’s yearslong investment in building, deploying, testing, and monitoring Spring Boot applications doesn’t have to restart from scratch when moving into the world of data streams.

In this guide we’ll first look at producing events to Apache Kafka topics through the lens of Confluent Cloud, producing events serialized with Apache Avro, and registering schemas with Schema Registry. Then we’ll shift our focus to the consumer side, via the @KafkaListener annotation.

Why Kotlin?

Why not? But seriously, Kotlin is popularly labeled as a JVM language that simplifies development for cross-platform projects, reducing the time spent writing and maintaining code for different platforms while keeping the benefits and flexibility of native programming. Since its inception, Kotlin has gained popularity with developers for its concise syntax, null safety, and interoperability with Java codebases.

Kotlin is a great fit for server-side applications, as evidenced by its support in popular frameworks such as Spring, Ktor, Quarkus, and Micronaut, just to name a few. Per the Kotlin documentation, several factors contribute to this adoption. The ones that really stand out are the expressiveness of the language and coroutines, which let server-side applications take full advantage of modest hardware allocations. For shops using Kafka, Java interoperability allows you to continue using your favorite Java libraries, which may include Spring Framework libraries.

These features—and more—help to flatten the learning curve of Kotlin for Java developers. If sharpening your Kotlin skills sounds interesting, check out Kotlin Koans.

The setup

Clone the GitHub Confluent demo-scene repository. Our examples are in the spring-into-kafka-cc module. 

Confluent Cloud

You need a Confluent Cloud account to get started. Within the account, let’s create an environment that includes an Apache Kafka cluster and Stream Governance to host our Schema Registry. When it comes to provisioning infrastructure, Terraform is our "love language," and the Confluent Terraform Provider allows you to quickly and deterministically create the assets you need to get started.

In the spring-into-kafka-cc directory, change to the terraform directory. Here we’ll find the Confluent Terraform Provider defined in main.tf:

# Configure the Confluent Provider
terraform {
  required_providers {
    confluent = {
      source  = "confluentinc/confluent"
      version = "1.82.0"
    }
  }
}

provider "confluent" {
}

For more on the Confluent Terraform Provider, see the Confluent Cloud docs.

From the terraform directory in your terminal, let’s initialize Terraform—downloading the providers needed in this project:

terraform init

The Confluent Terraform Provider needs to know the Confluent Cloud Organization ID to which it will apply the changes. To get this value and export it to an environment variable which can be used by Terraform, use this command:

export TF_VAR_org_id=$(confluent organization list -o json | jq -r '.[] | select(.is_current) | .id')

The command above uses the Confluent CLI to list the organizations to which we belong in JSON format. Then jq is used to query for the "current" organization, getting the id value from the JSON object. This id value is then exported to TF_VAR_org_id—as Terraform treats any environment variables prefixed with TF_VAR_ as input variables. This will satisfy the required value of the variable org_id specified in variables.tf:

variable "org_id" {
  type = string
}

Now we can create and apply a plan to build our Confluent Cloud infrastructure.

terraform plan -out "tfplan"
terraform apply "tfplan"

Once completed, use the Confluent Cloud Console to verify that the specified resources were created. There are also outputs from terraform apply which we’ll need in upcoming steps. Let’s export those to a properties file, again using a little "command-line-fu":

terraform output -json | jq -r 'to_entries[] | .key + "=" + (.value.value | tostring)' > ~/tools/spring-into-cc.properties

Here we use terraform output formatted as JSON, then parse the document with jq. For each entry, we write a key-value pair to a properties file in the tools subdirectory of the current user’s home directory (the directory must already exist). We’ll use this location later when configuring our Spring Boot application.
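Before wiring that file into Spring, a quick sanity check can save a confusing startup failure. The sketch below is only an illustration: it assumes the Terraform output names match the keys the Spring configuration refers to later in this post, and missingKeys and loadProperties are hypothetical helpers, not part of the demo project.

```kotlin
import java.io.StringReader
import java.util.Properties

// Keys referenced by the Spring configuration later in this post (assumed to be the Terraform output names).
val requiredKeys = listOf(
    "CC_BROKER", "CC_SCHEMA_REGISTRY_URL",
    "KAFKA_KEY_ID", "KAFKA_KEY_SECRET",
    "SCHEMA_REGISTRY_KEY_ID", "SCHEMA_REGISTRY_KEY_SECRET",
)

// Hypothetical helper: parses properties-file text into a Properties object.
fun loadProperties(text: String): Properties =
    Properties().apply { load(StringReader(text)) }

// Hypothetical helper: returns the required keys that are absent or blank, in declaration order.
fun missingKeys(props: Properties, required: List<String> = requiredKeys): List<String> =
    required.filter { props.getProperty(it).isNullOrBlank() }

fun main() {
    // In practice you would read ~/tools/spring-into-cc.properties; inline text keeps the sketch runnable anywhere.
    val sample = """
        CC_BROKER=example.aws.confluent.cloud:9092
        CC_SCHEMA_REGISTRY_URL=https://example.aws.confluent.cloud
        KAFKA_KEY_ID=abc
    """.trimIndent()
    println("Missing keys: ${missingKeys(loadProperties(sample))}")
}
```

An empty list means every placeholder used by the application has a value to resolve to.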

The data model

Our example uses Avro-serialized events, generating Java classes from Avro schema documents via a Gradle plugin. Those classes provide us with type-safety and compile-time checks on the data before attempting to send events to Kafka.

CustomerCommand events consist of a CustomerAction enum type and a Customer entity on which that action is applied. In the common/src/main/avro directory, you’ll find Avro schema files (with the .avsc extension) to define these objects.

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.command",
  "name": "CustomerCommand",
  "fields": [
	{
  	"name": "action",
  	"type": {
    	"type": "enum",
    	"namespace": "io.confluent.devrel.spring.command",
    	"name": "CustomerAction",
    	"symbols": [
      	"ADD", "UPDATE", "DELETE", "UNKNOWN"
    	],
    	"default": "UNKNOWN"
  	},
  	"doc": "What action to apply to the given user."
	},
	{
  	"name": "customer",
  	"type": "io.confluent.devrel.spring.model.Customer",
  	"doc": "Customer to operate on."
	}
  ]
}

The Customer is defined here:

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.model",
  "name": "Customer",
  "fields": [
	{
  	"name": "id",
  	"type": ["null","string"],
  	"default": null,
  	"doc": "identifier for Customer"
	},
	{
  	"name": "email",
  	"type": "string",
  	"doc": "email address"
	},
	{
  	"name": "firstName",
  	"type": "string",
  	"doc": "first name"
	},
	{
  	"name": "lastName",
  	"type": "string",
  	"doc": "last name"
	},
	{
  	"name": "dob",
  	"type": "string",
  	"doc": "date of birth"
	},
	{
  	"name": "mailingAddress",
  	"type": "io.confluent.devrel.spring.model.Address",
  	"doc": "mailing address"
	}
  ]
}

Above, we see a Customer has a mailingAddress of type Address, defined here:

{
  "type": "record",
  "namespace": "io.confluent.devrel.spring.model",
  "name": "Address",
  "fields": [
	{
  	"name": "address1",
  	"type": "string",
  	"doc": "street address line 1"
	},
	{
  	"name": "address2",
  	"type": ["null", "string"],
  	"default": null,
  	"doc": "street address line 2, optional"
	},
	{
  	"name": "city",
  	"type": "string"
	},
	{
  	"name": "state",
  	"type": "string"
	},
	{
  	"name": "postalCode",
  	"type": "string"
	}
  ]
}
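To make the shape of these three schemas concrete, here is a plain-Kotlin mirror of them. These data classes are illustrative stand-ins only (the project itself uses Java classes generated from the .avsc files), and parseAction is a hypothetical helper that mimics how the enum "default" behaves when a reader meets a symbol it does not know.

```kotlin
// Illustrative stand-ins only: the demo uses Avro-generated Java classes, not these.
enum class CustomerAction { ADD, UPDATE, DELETE, UNKNOWN }

data class Address(
    val address1: String,
    val address2: String? = null, // ["null", "string"] with default null becomes a nullable field
    val city: String,
    val state: String,
    val postalCode: String,
)

data class Customer(
    val id: String? = null, // optional identifier, as in the schema
    val email: String,
    val firstName: String,
    val lastName: String,
    val dob: String,
    val mailingAddress: Address,
)

data class CustomerCommand(val action: CustomerAction, val customer: Customer)

// Hypothetical helper mirroring the enum "default": unrecognized symbols resolve to UNKNOWN.
fun parseAction(symbol: String): CustomerAction =
    CustomerAction.values().firstOrNull { it.name == symbol } ?: CustomerAction.UNKNOWN

fun main() {
    val customer = Customer(
        email = "ada@example.com",
        firstName = "Ada",
        lastName = "Lovelace",
        dob = "1815-12-10",
        mailingAddress = Address(address1 = "1 Example St", city = "Raleigh", state = "NC", postalCode = "27601"),
    )
    println(CustomerCommand(parseAction("ADD"), customer))
    println(parseAction("ARCHIVE"))
}
```

The two nullable fields are the only ones a producer may leave out; everything else must be set before a record can be built.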

To generate Java classes from these schemas, we use the com.bakdata.avro Gradle plugin and add the directory that holds the generated sources (in this case build/generated-main-avro-java) to the main source set:

plugins {
    id("org.springframework.boot") version "3.3.1"
    id("io.spring.dependency-management") version "1.1.5"
    kotlin("jvm") version "1.9.24"
    kotlin("plugin.spring") version "1.9.24"
    id("com.bakdata.avro") version "1.0.0"
    kotlin("plugin.serialization") version "2.0.0"
}
...

sourceSets {
    main {
        kotlin.srcDirs("src/main/kotlin", "build/generated-main-avro-java")
    }
}

This plugin gives us the following tasks for source generation. The one we care about is generateAvroJava, which is configured to run whenever we execute the compile task in our Gradle build:

Source Generation tasks
-----------------------
...
generateAvroJava - Generates main Avro Java source files from schema/protocol definition files.
generateAvroProtocol - Generates main Avro protocol definition files from IDL files.
generateTestAvroJava - Generates test Avro Java source files from schema/protocol definition files.
generateTestAvroProtocol - Generates test Avro protocol definition files from IDL files.

Spring configuration

When configuring Spring Boot applications, we tend to default to YAML over properties files. We find the structure of YAML much easier to navigate. But, ultimately, it’s personal preference. Also, IntelliJ IDEA (and other IDEs) provide Spring support—with features like autocomplete—to make editing less prone to human error.

Confluent Cloud requires a number of configuration properties for authentication and authorization to both Kafka and Schema Registry. In this demo, we have externalized those bits to a properties file. It holds the host names of your broker and Schema Registry, along with your credentials for each:

CC_BROKER=******.aws.confluent.cloud:9092
CC_SCHEMA_REGISTRY_URL=https://******.aws.confluent.cloud
KAFKA_KEY_ID=******
KAFKA_KEY_SECRET=******
SCHEMA_REGISTRY_KEY_ID=******
SCHEMA_REGISTRY_KEY_SECRET=******

Spring Boot provides a way to import external configuration through the spring.config.import property, making the imported key-value pairs available as properties that can be referenced in the application.yml (or .properties) file. Remember the file we created using terraform output in a previous section? Now we will use spring.config.import to load the key-value pairs of that file and reference them as placeholders in the application.yml file, as follows:

spring:
  config:
    import: file:${user.home}/tools/spring-into-cc.properties

  kafka:
    bootstrap-servers: ${CC_BROKER}
    properties:
      "[sasl.mechanism]": "PLAIN"
      "[sasl.jaas.config]": org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_KEY_ID}' password='${KAFKA_KEY_SECRET}';
      "[schema.registry.url]": ${CC_SCHEMA_REGISTRY_URL}
      "[basic.auth.credentials.source]": "USER_INFO"
      "[basic.auth.user.info]": "${SCHEMA_REGISTRY_KEY_ID}:${SCHEMA_REGISTRY_KEY_SECRET}"
      "[auto.register.schemas]": true
      "[client.dns.lookup]": "use_all_dns_ips"
      "[security.protocol]": "SASL_SSL"
    producer:
      client-id: "spring-kafka-cc"
      compression-type: "zstd"
      key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
      value-serializer: "io.confluent.kafka.serializers.KafkaAvroSerializer"
      security:
        protocol: "SASL_SSL"
    consumer:
      group-id: "spring-kafka-cc"
      auto-offset-reset: earliest
      key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      security:
        protocol: "SASL_SSL"
      properties:
        "[specific.avro.reader]": true

Note that spring.config.import can be directed anywhere on the addressable file system as well as external sources such as an HTTP endpoint. For more information, check out the Spring Cloud Config Client documentation.

Producing data to Kafka

The end game here is to produce Kafka events with an Avro-serialized value to a topic. With Spring (specifically spring-kafka), we can achieve this with very little of the boilerplate code you may find in other JVM-Kafka integrations.

From the application.yml file in the previous section, you’ll see our key and value serializers for our producer are already configured—as we’ll produce events with a String key and some Avro-serialized value. The data will be compressed using the zstd codec.

A typed KafkaProducer<K, V> is configured and encapsulated within a KafkaTemplate<K, V> instance in the spring-kafka library. KafkaTemplate<K, V> relies on a Spring-managed ProducerFactory<K, V> implementation. In its most simple form, we can use the @Autowired annotation to inject a KafkaTemplate<K, V> into the class tasked with sending events to Kafka.

class CustomerCommandProducer(@Autowired private val kafkaTemplate: KafkaTemplate<String, CustomerCommand>)

Let’s have a quick aside about KafkaTemplate. The template pattern is a Gang of Four design pattern that defines the skeleton of an algorithm in a superclass. The behavior of the specific steps of the algorithm is implemented in a subclass. The template pattern can be found in many places throughout the Spring Framework: JdbcTemplate, JmsTemplate, and AmqpTemplate, to name a few.
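As a minimal sketch of that pattern in plain Kotlin (no Spring involved), the superclass below fixes the order of the steps and the subclass fills in the one step that varies. Operation and UppercaseOperation are hypothetical names chosen for illustration, and the log list exists only to make the step order visible.

```kotlin
// The superclass owns the skeleton of the algorithm.
abstract class Operation {
    // Records the order in which the steps ran, purely for demonstration.
    val log = mutableListOf<String>()

    // The template method. Kotlin members are final by default, so subclasses cannot reorder the steps.
    fun execute(input: String): String {
        log += "acquire"
        try {
            val result = doWork(input)
            log += "work"
            return result
        } finally {
            // Cleanup always runs, whether doWork succeeded or threw.
            log += "release"
        }
    }

    // The single step that subclasses must provide.
    protected abstract fun doWork(input: String): String
}

// A subclass supplies only the varying step.
class UppercaseOperation : Operation() {
    override fun doWork(input: String): String = input.uppercase()
}

fun main() {
    val operation = UppercaseOperation()
    println(operation.execute("kafka"))
    println(operation.log)
}
```

The appeal is the same one the Spring templates offer: resource handling and error paths live in one place, and application code only describes the work itself.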

You may have found examples where ProducerFactory<K, V> and KafkaTemplate<K, V> are defined in a @Configuration annotated class. For the purposes of our demo, the provided DefaultKafkaProducerFactory<K, V> created from our configuration file will suffice.

Now we can create a function to send events to Kafka:

fun sendCustomerCommand(customer: Customer, customerAction: CustomerAction = CustomerAction.ADD) {
    val command = CustomerCommand.newBuilder()
        .setCustomer(customer)
        .setAction(customerAction)
        .build()

    val record = ProducerRecord("customer-commands-avro", customer.getEmail(), command)
    val future: CompletableFuture<SendResult<String, CustomerCommand>> = kafkaTemplate.send(record)

    future.whenComplete { result, exception ->
        if (exception != null) {
            LOG.error("ERROR sending to Kafka", exception)
        } else {
            val metadata = result.recordMetadata
            LOG.info("Message sent to topic ${metadata.topic()}, partition: ${metadata.partition()}, offset: ${metadata.offset()}")
        }
    }
}

This function takes a Customer and a CustomerAction and uses those to create a CustomerCommand. The email attribute of the Customer is used as the key of the ProducerRecord<K, V>. With the ProducerRecord created, use the kafkaTemplate.send() method to produce the record to the specified Kafka topic. The send() method returns a CompletableFuture, and in some cases you may not care about the outcome of send(). For our demo, using the whenComplete function as a callback provides some insight into the result of our call to kafkaTemplate.send().
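If the whenComplete callback is new to you, the following standalone sketch shows its behavior using only java.util.concurrent. SendOutcome is a hypothetical stand-in for the metadata carried by a real send result, and attachLogging collects messages in a list instead of calling a logger so the effect is easy to inspect.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the record metadata of a completed send.
data class SendOutcome(val topic: String, val partition: Int, val offset: Long)

// Registers a callback with the same shape as the one in sendCustomerCommand.
// On completion, either result or exception is populated, never both.
fun attachLogging(
    future: CompletableFuture<SendOutcome>,
    sink: MutableList<String>,
): CompletableFuture<SendOutcome> =
    future.whenComplete { result, exception ->
        if (exception != null) {
            sink.add("ERROR sending to Kafka: ${exception.message}")
        } else {
            sink.add("Message sent to topic ${result.topic}, partition: ${result.partition}, offset: ${result.offset}")
        }
    }

fun main() {
    val messages = mutableListOf<String>()

    // Success path: the callback fires when the future is completed with a value.
    val succeeded = CompletableFuture<SendOutcome>()
    attachLogging(succeeded, messages)
    succeeded.complete(SendOutcome("customer-commands-avro", 0, 42L))

    // Failure path: the callback receives the exception instead of a result.
    val failed = CompletableFuture<SendOutcome>()
    attachLogging(failed, messages)
    failed.completeExceptionally(IllegalStateException("broker unavailable"))

    messages.forEach(::println)
}
```

Because whenComplete does not block, the calling thread carries on immediately; the callback runs later, whenever the future completes.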

It’s really that simple to send events to a Kafka topic from a Spring application. Additional producer configuration options (batch.size, linger.ms, retries, acks) can be added to the application.yml under producer.properties as needed for your use case.

Consuming data streams

If you thought the producer side of Spring Kafka encapsulated a lot of the boilerplate you’re accustomed to, check out the consumer side. The configuration section detailed a consumer configuration where we expect events with a String key and an Avro value. But we want to operate on typed data, so we add the consumer configuration property specific.avro.reader with a value of true such that the underlying ConsumerFactory<K, V> will attempt to deserialize the Avro bytes to the CustomerCommand class. Here we also provide a consumer group-id value—which translates to group.id in the ConsumerConfig properties. Finally, auto-offset-reset is set to earliest, mapping to auto.offset.reset in the Kafka consumer configuration.

consumer:
  group-id: "spring-kafka-cc"
  auto-offset-reset: earliest
  key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
  value-deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  security:
    protocol: "SASL_SSL"
  properties:
    "[specific.avro.reader]": true

Now we just need to annotate some function with a CustomerCommand input parameter to listen to our customer-commands-avro topic:

@KafkaListener(topics = ["customer-commands-avro"])
fun processCustomerCommand(command: CustomerCommand) {
    runBlocking {
        LOG.info("processing CustomerCommand -> $command")
    }
}

This function simply logs the input event, returning a Unit type. This is where we could add additional processing of the inbound event. For instance, we could have autowired a class with functions to handle the various CustomerAction enum values and use pattern matching to call the appropriate method:

when (command.getAction()) {
    CustomerAction.ADD -> handler.addCustomer(command.getCustomer())
    CustomerAction.UPDATE -> handler.updateCustomer(command.getCustomer())
    CustomerAction.DELETE -> handler.deleteCustomer(command.getCustomer())
    CustomerAction.UNKNOWN -> throw IllegalArgumentException("unknown command")
}
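Here is a self-contained sketch of that dispatch, runnable without Kafka or Spring. The enum is redeclared locally, and InMemoryCustomerHandler is a hypothetical handler that keeps customers in a map keyed by email; a real handler would more likely talk to a database or another service.

```kotlin
// Local copy of the enum so the sketch compiles on its own.
enum class CustomerAction { ADD, UPDATE, DELETE, UNKNOWN }

// Hypothetical handler: stores a display name per email address.
class InMemoryCustomerHandler {
    val customers = mutableMapOf<String, String>()

    fun addCustomer(email: String, name: String) { customers[email] = name }
    fun updateCustomer(email: String, name: String) { customers[email] = name }
    fun deleteCustomer(email: String) { customers.remove(email) }
}

// Using when as an expression makes the compiler require a branch for every enum constant,
// so adding a new CustomerAction later produces a compile error here rather than a silent gap.
fun dispatch(handler: InMemoryCustomerHandler, action: CustomerAction, email: String, name: String): Unit =
    when (action) {
        CustomerAction.ADD -> handler.addCustomer(email, name)
        CustomerAction.UPDATE -> handler.updateCustomer(email, name)
        CustomerAction.DELETE -> handler.deleteCustomer(email)
        CustomerAction.UNKNOWN -> throw IllegalArgumentException("unknown command")
    }

fun main() {
    val handler = InMemoryCustomerHandler()
    dispatch(handler, CustomerAction.ADD, "ada@example.com", "Ada")
    dispatch(handler, CustomerAction.UPDATE, "ada@example.com", "Ada Lovelace")
    println(handler.customers)
    dispatch(handler, CustomerAction.DELETE, "ada@example.com", "")
    println(handler.customers)
}
```

Whether throwing on UNKNOWN is the right call depends on your error-handling strategy; in a listener, an uncaught exception hands the record to whatever error handler the container is configured with.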

What’s next

Spring Boot, Apache Kafka, Confluent Cloud, and Kotlin—a beautiful friendship. We hope you found this demo to be a useful starting point with Spring Boot and Apache Kafka. There’s so much more to discuss. In future posts, we’ll look into topics such as testing (unit and integration tests) and transactions. We’ll also have a look at Kafka bindings in Spring Cloud Stream.

Be sure to follow my social channels for updates on this series. And if you think of other Spring-related topics involving Apache Kafka you’d like me to cover, drop me a message or mention.

  • Sandon Jacobs is a Developer Advocate at Confluent, based in Raleigh, NC. Sandon has two decades of experience designing and building applications, primarily with Java and Scala. His data streaming journey began while building data pipelines for real-time bidding on mobile advertising exchanges—and Apache Kafka was the platform to meet that need. Later experiences in television media and the energy sector led his teams to Kafka Streams and Kafka Connect, integrating data from various in-house and vendor sources to build canonical data models.

    Outside of work, Sandon is actively involved in his Indigenous tribal community. He serves on the NC American Indian Heritage Commission, and also as a powwow singer and emcee at many celebrations around North America. Follow Sandon on Twitter @SandonJacobs or Instagram @_sandonjacobs, where he posts about his powwow travels, family, golf, and more.
