Designing microservices using an event-driven approach has several benefits, including improved scalability, easier maintenance, clear separation of concerns, system resilience, and cost savings. With Apache Kafka® as an event plane, services now have a durable, scalable, and reliable source of event data. From Kafka topics, a microservice can easily rebuild and restore the state of the data used to serve end users.
Microservice architects searching for a JVM framework for developers may want to explore Micronaut. This framework embraces event-driven architecture. This article briefly introduces you to Micronaut and its benefits. Then we’ll dive into the details of integrating your microservices with Apache Kafka on Confluent Cloud. Let’s get into it.
Micronaut is an open source JVM-based framework for building lightweight microservices. At its core, it is designed to avoid reflection to improve application startup times, resolving injected dependencies at compile time rather than at runtime. Per the documentation, Micronaut supports best-practice patterns for building JVM applications, such as dependency injection and inversion of control, aspect-oriented programming, sensible defaults, and auto-configuration.
We'll cover two use cases to illustrate Micronaut’s integration with Kafka. Both apply a “listen to yourself” pattern: a REST controller sends data-altering commands to Kafka as events, a listener processes those events and updates the underlying data model, and query requests call a data source directly via JPA.
There are two examples because we want to highlight data serialization in Micronaut. We’ll begin by letting Micronaut use its sensible defaults to infer how the key and value should be serialized. Then, we’ll pivot to using the Stream Governance capabilities of Confluent Cloud to manage Apache Avro™ schemas for our structured data.
With the Micronaut framework, we can also build message and event-driven applications. And, yes, that includes Apache Kafka—as well as RabbitMQ®, JMS, and MQTT. But we’re here for Kafka, so let’s get into that. For starters, add the micronaut-kafka dependency to the application build. Here it is in Gradle:
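The snippet below is a minimal sketch of that dependency declaration, assuming the Gradle Kotlin DSL and that the Micronaut BOM supplies the version:

```kotlin
// build.gradle.kts: pull in the Micronaut Kafka integration
dependencies {
    implementation("io.micronaut.kafka:micronaut-kafka")
}
```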
Next, let’s explore how to configure a Micronaut application to use Apache Kafka. We prefer using YAML, TOML, or HOCON over a properties file for the sake of legibility. To use YAML, you need to add snakeyaml as a runtime dependency to your build:
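A sketch of that runtime dependency, again assuming the Kotlin DSL:

```kotlin
// build.gradle.kts: snakeyaml is only needed at runtime to parse application.yml
dependencies {
    runtimeOnly("org.yaml:snakeyaml")
}
```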
Now that we have that in place, we can start to configure our connection to Apache Kafka—in our case on Confluent Cloud. One of the key features about Micronaut is the use of sensible defaults in application configuration. The examples will inject those values from environment variables to protect our Confluent Cloud connection parameters and credentials. Here’s the basis of our Confluent Cloud connection:
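Here is a minimal sketch of what that application.yml block might look like; the environment variable names (KAFKA_BOOTSTRAP_SERVERS, KAFKA_API_KEY, KAFKA_API_SECRET) are assumptions for illustration:

```yaml
kafka:
  bootstrap:
    servers: ${KAFKA_BOOTSTRAP_SERVERS}
  security:
    protocol: SASL_SSL
  sasl:
    mechanism: PLAIN
    jaas:
      config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_API_KEY}" password="${KAFKA_API_SECRET}";
```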
Our first use case will adhere to the Micronaut defaults as closely as possible. In this scenario, a REST controller sends events about product price changes to a Kafka topic. A listener to that topic updates the underlying data model, and subsequent queries use the controller's GET endpoints to retrieve the Product entities from the data store.
A primary concern in event streaming is data serialization. This is vital to any distributed data contract—event data needs a known structure, full stop. And there are multiple ways to achieve this. Given the nature of the listen-to-yourself pattern, the events published are intended to be consumed by our application. This won’t always be the case; we’ll elaborate on this later. For now, let’s use this pattern to highlight the default serialization methodology used by Micronaut.
When serializing data to Kafka, Micronaut takes an “educated attempt” at inferring the Serializer implementation to use given the data type of the key and value. Our initial pass at producing data for the product-price-changes topic serializes the key as a String and the value as a ProductPriceChangedEvent record. Looking at the ProductPriceChangedEvent record, we see the @Serdeable annotation:
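A minimal sketch of that record; the field names are assumptions:

```java
import io.micronaut.serde.annotation.Serdeable;

import java.math.BigDecimal;

// Marked @Serdeable so Micronaut Serialization can serialize and deserialize it as JSON
@Serdeable
public record ProductPriceChangedEvent(String productCode, BigDecimal newPrice) {
}
```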
Applying the @Serdeable annotation indicates that the ProductPriceChangedEvent class is to be serialized and deserialized as JSON. The DefaultSerdeRegistry class makes these selections at compile time via introspection, in direct contrast with frameworks like Spring, which make these decisions at runtime, a much less efficient process.
Sending events to Kafka starts by decorating an interface with the @KafkaClient annotation.
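A sketch of what that interface might look like (topic, method, and parameter names are assumptions); the numbered comments correspond to the callouts below:

```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient // (1) backed by a KafkaProducer built from the default producer configuration
public interface ProductPriceChangeClient {

    @Topic("product-price-changes") // (2) the destination topic
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event); // (3) key and value
}
```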
The ProductPriceChangeClient interface is annotated as a @KafkaClient (1). This annotation provides AOP advice on how to configure a KafkaProducer instance. This example does not provide a value property and will use the default producer configuration. More on that in a bit.
To send events to a Kafka topic, we annotate a method of the interface with @Topic (2), providing the name of the topic to which events are sent. The parameters of this send() method (3) are the elements of the event to send to Kafka. If this method had only one parameter, it would be implied as the value of the resulting ProducerRecord. In this case, we have two parameters, one of them annotated with @KafkaKey (you guessed it, this will be the key of the ProducerRecord with the second parameter as the value). Additional parameters annotated with @MessageHeader would add headers to the resulting ProducerRecord.
To consume these ProductPriceChangedEvent records from Kafka, let’s decorate a class (not an interface) as a @KafkaListener.
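A sketch of such a listener, assuming a ProductRepository with an updatePrice() method; the numbered comments match the callouts below:

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(groupId = "product-price-changes", offsetReset = OffsetReset.EARLIEST) // (1)
public class ProductPriceChangeListener {

    private final ProductRepository productRepository;

    public ProductPriceChangeListener(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @Topic("product-price-changes") // (2)
    public void handle(ProductPriceChangedEvent event) { // (3) single parameter = the record value
        productRepository.updatePrice(event.productCode(), event.newPrice());
    }
}
```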
The @KafkaListener (1) encapsulates the configuration and creation of a KafkaConsumer. Like @KafkaClient, the absence of a value parameter means we’ll use the default consumer configuration here. However, we can include other Kafka ConsumerConfig values here, such as the auto.offset.reset and group.id values exposed through the annotation.
The handle() method is annotated with the @Topic annotation (2), specifying the Kafka topic from which it consumes events. Here, the handle() method has one parameter (3), which is implied to be the value of the underlying ConsumerRecord. This simple example calls a method on the ProductRepository to update the price of a Product.
Let’s revisit the configuration terms we breezed through in the previous section—specifically the concept of default configuration for producers and consumers.
Starting with the producer configuration, we can provide a default configuration for any @KafkaClient-annotated interfaces to use when those classes do not specify a producer configuration. This is useful when all producers in your application use the same serializers (both key and value). But it will also be the fallback for any other producer configurations for which you may specify values, overriding the defaults. The example below configures the default producer to use StringSerializer for the key and ByteArraySerializer for the value of produced events.
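A sketch of that default producer block in application.yml:

```yaml
kafka:
  producers:
    default:
      key:
        serializer: org.apache.kafka.common.serialization.StringSerializer
      value:
        serializer: org.apache.kafka.common.serialization.ByteArraySerializer
```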
The same premise applies to classes annotated with @KafkaListener, whose defaults come from the default Kafka consumer configuration. You could define an application-wide default for group.id and other KafkaConsumer configuration values here, as in the sketch below.
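This is a minimal sketch of an application-wide consumer default; the group.id and deserializer choices are assumptions for illustration:

```yaml
kafka:
  consumers:
    default:
      group:
        id: micronaut-demo
      key:
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value:
        deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
```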
In the listen-to-yourself pattern for microservices, the serialization strategy detailed in our first example may suffice. Perhaps the product-price-changes topic is “self-contained,” meaning there are access controls that restrict write and read access to this data in our microservice. Perhaps this isn’t part of our canonical data model.
When we work with data from the canonical model—data that has meaning across the organization—we need to ensure data quality and integrity. There is no better place to do this than straight from the source, as far “left” as possible in the lifecycle of our streams. This is where stream governance comes into play, specifically building, maintaining, and evolving the schema of the data in our canonical model. We would like the entire organization to sing from the same sheet of music.
Confluent Cloud provides stream governance centered around the Confluent Schema Registry, which we can use to manage and evolve data contracts. Confluent also provides implementations of Kafka’s Serializer and Deserializer interfaces for Kafka clients that are “Schema Registry-aware.” These cover industry-standard serialization formats: Avro, Protobuf, and JSON Schema.
Let’s create Avro schemas for the concept of an order, which will be updated via the controller class, sending an order change event to a Kafka topic. When processed, this event updates the items in the order. Here are the schema definitions:
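A sketch of what the order change event schema might look like; the namespace, field names, and enum symbols are assumptions:

```json
{
  "type": "record",
  "name": "OrderChangeEvent",
  "namespace": "io.confluent.demo.orders",
  "fields": [
    { "name": "orderId", "type": "long" },
    {
      "name": "eventType",
      "type": {
        "type": "enum",
        "name": "OrderChangeEventType",
        "symbols": [ "ITEM_ADDED", "ITEM_UPDATED", "ITEM_REMOVED" ]
      }
    },
    {
      "name": "items",
      "type": { "type": "array", "items": "io.confluent.demo.orders.OrderItem" }
    }
  ]
}
```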
Notice there are two additional named types in this schema: an enum called OrderChangeEventType (defined inline) and a record called OrderItem, referenced by name. Let’s define the OrderItem schema:
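Again, a sketch with assumed field names:

```json
{
  "type": "record",
  "name": "OrderItem",
  "namespace": "io.confluent.demo.orders",
  "fields": [
    { "name": "productCode", "type": "string" },
    { "name": "quantity", "type": "int" },
    { "name": "price", "type": "double" }
  ]
}
```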
A common practice in the Java community with Avro is to generate Java bean classes, providing compile-time safety and more readable code. Since we’re using Gradle, we include the com.bakdata.avro Gradle plugin in our build:
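A sketch of that plugins block, assuming the Kotlin DSL; the version shown is illustrative:

```kotlin
// build.gradle.kts: the bakdata plugin wires up Avro code generation for Gradle
plugins {
    id("com.bakdata.avro") version "1.0.0" // illustrative; use the latest release
}
```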
This gives us Gradle tasks to generate the Java bean classes by running this command:
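Assuming the plugin exposes the standard generateAvroJava task (task names can vary by plugin version), the invocation looks like this:

```bash
# generates Java classes from the .avsc files under src/main/avro
./gradlew generateAvroJava
```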
Now we have Java beans that extend the SpecificRecordBase class of the Apache Avro Java SDK.
Micronaut allows multiple producer definitions in the configuration file. Given that these order changes are serialized differently than in the previous use case, the producer must be configured with the appropriate serializers. Here is the additional producer configuration:
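A sketch of that producer block; the Schema Registry environment variable names are assumptions:

```yaml
kafka:
  producers:
    order-changes:
      key:
        serializer: org.apache.kafka.common.serialization.LongSerializer
      value:
        serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      schema:
        registry:
          url: ${SCHEMA_REGISTRY_URL}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_API_SECRET}
```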
Here the order-changes producer overrides the default value of the key and value serializers—LongSerializer and KafkaAvroSerializer, respectively. Since KafkaAvroSerializer is Schema Registry-aware, we provide the configuration needed to connect to Confluent Schema Registry.
We apply this configuration to the interface annotated as the @KafkaClient for our order changes as follows:
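A sketch of that client interface, where the @KafkaClient value selects the order-changes producer configuration above (topic and method names are assumptions):

```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient("order-changes") // uses the kafka.producers.order-changes configuration
public interface OrderChangeClient {

    @Topic("order-changes")
    void send(@KafkaKey Long orderId, OrderChangeEvent event);
}
```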
Similarly, we define the consumer configuration to correspond to the producer configuration for order changes:
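A sketch of the matching consumer block, assuming per-consumer-group configuration keys and the same Schema Registry environment variables:

```yaml
kafka:
  consumers:
    order-changes:
      key:
        deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value:
        deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific:
        avro:
          reader: true
      schema:
        registry:
          url: ${SCHEMA_REGISTRY_URL}
      basic:
        auth:
          credentials:
            source: USER_INFO
          user:
            info: ${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_API_SECRET}
```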
With this configuration in place, let’s update the @KafkaListener-annotated classes to use them:
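A sketch of the updated listener, assuming an OrderRepository that applies the change and a group id that matches the order-changes consumer configuration:

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaListener(groupId = "order-changes", offsetReset = OffsetReset.EARLIEST)
public class OrderChangeListener {

    private final OrderRepository orderRepository;

    public OrderChangeListener(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @Topic("order-changes")
    public void handle(OrderChangeEvent event) {
        // update the order's items based on the change event
        orderRepository.applyChange(event);
    }
}
```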
As you can see, the annotations provide easy overrides of the configuration. For instance, if we didn’t specify a group.id in the @KafkaListener annotation, it falls back to the value in the application.yml file.
Theoretically, this is all great. But let’s see it in action, with events and a data store. Here are some prerequisites to running the application:
Java 21 (we use SDKMAN! to manage multiple JDK installations)
Editor/IDE of choice
A Confluent Cloud account
Terraform and the Confluent CLI (to provision the Confluent Cloud resources)
Docker (to run a local MySQL instance)
jq (to parse CLI output)
You can start by cloning the Confluent demo-scene repository from GitHub. The examples in this article are in the micronaut-cc directory.
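Assuming the standard GitHub URL for that repository, cloning looks like this:

```bash
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/micronaut-cc
```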
As helpful as the Confluent Cloud console may be, Terraform is used here to provision Confluent Cloud environments, Kafka clusters, Stream Governance, and access controls.
With the repository cloned, open a terminal and navigate to the micronaut-cc directory, where you’ll find a terraform subdirectory. (If this is your first foray into the Confluent CLI, pay particular attention to the environment variable steps in the README file.) Let’s go there and execute the following commands:
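A sketch of those steps; the exact jq filter for extracting the current organization id is an assumption:

```bash
confluent login                                            # (1) authenticate to Confluent Cloud

# expose the organization id to Terraform as TF_VAR_org_id
export TF_VAR_org_id=$(confluent organization list --output json \
  | jq -r '.[] | select(.is_current == true) | .id')

cd terraform
terraform init                                             # (2) pull in the required providers
terraform plan -out=tfplan                                 # (3) compute the changes to make
terraform apply tfplan                                     # (4) apply the plan
```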
The first command is vital because it authenticates us to Confluent Cloud via the CLI (1). From there, we need to preserve our Confluent Cloud organization id for later Terraform steps to use. Looking at variables.tf, there is an org_id variable that needs to be defined; Terraform allows these values to be supplied as environment variables prefixed with TF_VAR_. The confluent organization list command returns the organization(s) of which the authenticated user is a member as a JSON document. That output is piped to a series of jq commands to find the id of the current organization. (Note: If your user is a member of MULTIPLE organizations, this might not be deterministic. But for our demo purposes, this works.)
As the name implies, terraform init (2) initializes the Terraform environment by pulling in the needed providers used by our Terraform code. Next, we create a “plan” for Terraform to execute (3), comparing our Terraform code to the known state of the environment to determine the changes to be made. Finally, the “plan” is applied (4) to the environment.
After the Confluent Cloud environment changes are applied, we can use the terraform output command to extract the environment and credential information to a properties file. This protects us from accidentally committing these sensitive values to our Git repository.
We can then inject these key-value pairs as environment variables used by our application at runtime.
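A hedged sketch of both steps; the output names, target file name, and jq transformation are assumptions:

```bash
# 1) flatten the Terraform outputs into KEY=value pairs
terraform output -json \
  | jq -r 'to_entries[] | "\(.key | ascii_upcase)=\(.value.value)"' > ../confluent.properties

# 2) export those pairs into the current shell before starting the application
cd ..
set -a
source confluent.properties
set +a
```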
This microservice uses MySQL as a data store—just to illustrate the processing of our events by consumer classes. With that in mind, you need to start a MySQL instance using Docker—here’s a command to get you there:
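Something along these lines; the container name and root password are placeholders:

```bash
docker run -d --name micronaut-mysql \
  -e MYSQL_ROOT_PASSWORD=changeme \
  -p 3306:3306 \
  mysql:8.0
```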
Once the database is started, create a schema named micronaut for use in our microservice. This can be done via the command line by opening a bash shell to the running container:
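For example, reusing the container name from the previous step:

```bash
docker exec -it micronaut-mysql bash
# inside the container:
mysql -u root -p -e "CREATE DATABASE micronaut;"
```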
Going back to the application.yml file, we configure Micronaut to use this database:
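A minimal sketch of that datasource block; the credentials and environment variable are assumptions matching the Docker command above:

```yaml
datasources:
  default:
    url: jdbc:mysql://localhost:3306/micronaut
    driverClassName: com.mysql.cj.jdbc.Driver
    username: root
    password: ${MYSQL_ROOT_PASSWORD}
```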
Also, we want to leverage Micronaut's Flyway integration to create the appropriate database schema for our JPA entity classes.
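A minimal sketch of enabling Flyway for the default datasource (migration scripts would live under src/main/resources/db/migration by Flyway convention):

```yaml
flyway:
  datasources:
    default:
      enabled: true
```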