After a short break, we’re back with Part 2 of this series on Spring Framework, Confluent Cloud, and the Kotlin language. Many organizations that write applications and microservices for the JVM have chosen Spring Framework, leveraging the many libraries available for features such as REST services, persisting data to a variety of datastores, and integration with messaging. These organizations have existing investments in building, testing, deploying, and monitoring applications using Spring.
Spring also facilitates many common design patterns and practices, such as interface-driven design and the Gang of Four patterns like singleton, factory, decorator, and template method. Speaking of the template pattern, that’s where Spring’s integration with Apache Kafka® comes into play with the use of KafkaTemplate, which we covered in the previous entry of this series.
This brings us to Kotlin, which has proven to be a great fit in server-side development on the JVM. Spring Framework added support for Kotlin in the 5.0 release, as Kotlin extensions paved the way for easily building APIs onto existing Spring features. Layering this with other Kotlin features like a more concise syntax, null safety, and Java interoperability, we see why the language has built quite the following. The Kotlin tour is a good resource for explaining these features and more.
In this edition, we’ll have a look at creating Kafka Streams topologies, exploring the dependency injection and design principles of Spring Framework while also highlighting some of Kotlin’s syntactic sugar that (in our humble opinion) makes for more concise and legible topologies. The code examples in this post can be found in our demo-scene GitHub repository. Let’s get into it.
To get started we’ll use the humble, yet mighty, “word count” example. This example defines a topology to tokenize inbound string events and stores the counts of unique words to a state store, while also emitting those counts to a Kafka topic.
This @Component annotated class features a companion object (1) that is useful for defining constants related to this topology. In this case, we define constants for the input and output topics, along with the name of a state store used to materialize our results. The Serdes (2) (3) used here are defined in the class: String and Long are the types used in the inbound and outbound events.
The buildPipeline() function (4) is @Autowired to use the configured instance of the StreamsBuilder in building the topology. The rest is a typical Kafka Streams topology, but let’s have a look at the syntactic differences from its Java counterpart.
The input stream is defined from the input topic wc-input-topic, deserializing the key and value using Serdes.String(). This should look similar to what you would typically see in Java. Here we start to see some differences: Kotlin uses curly braces, as opposed to parentheses, to define these lambda expressions. For insight into what is flowing through the topology, we use the peek function to log the raw event value at DEBUG level.
Because we want a case-insensitive count, we use mapValues() (5) to lowercase the incoming event string. Next, let’s tokenize that input stream by using a regex to split each value into words. Again, only the value matters to us here, thus the use of flatMapValues() (6). The groupBy method uses a provided KeyValueMapper to select a new key and preserve the original values. In this case, we rekey this KStream to a KGroupedStream using the word itself as the new key. The KeyValueMapper is defined here (7) as a lambda expression.
The results of the count() method (8) are then materialized to a state store, using the constant defined in our companion object with the Serde instances for the key and value data types. Later, we’ll use this state store in a REST service. Finally, we stream (9) these count results to the OUTPUT_TOPIC.
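As a rough sketch of what the walkthrough above describes, the processor might look something like the following. The numbered comments map to the callouts in the text. Only wc-input-topic, INPUT_TOPIC, and OUTPUT_TOPIC are named in this post; the output topic name, the COUNTS_STORE constant and its value, the Serde field names, the log message, and the `\W+` regex are assumptions made for illustration, so check the demo-scene repository for the actual code.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.state.KeyValueStore
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

@Component
class WordCountProcessor {

    companion object { // (1) constants related to this topology
        const val INPUT_TOPIC = "wc-input-topic"
        const val OUTPUT_TOPIC = "wc-output-topic" // assumed name
        const val COUNTS_STORE = "wc-counts"       // assumed name
        private val logger = LoggerFactory.getLogger(WordCountProcessor::class.java)
    }

    private val stringSerde = Serdes.String() // (2)
    private val longSerde = Serdes.Long()     // (3)

    @Autowired // (4) Spring passes in the configured StreamsBuilder
    fun buildPipeline(streamsBuilder: StreamsBuilder) {
        streamsBuilder
            .stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde))
            .peek { _, value -> logger.debug("raw event value: {}", value) }
            .mapValues { value -> value.lowercase() }              // (5) case-insensitive
            .flatMapValues { value -> value.split(Regex("\\W+")) } // (6) tokenize the value
            .groupBy({ _, word -> word }, Grouped.with(stringSerde, stringSerde)) // (7) rekey by word
            .count(                                                // (8) materialize to the store
                Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>(COUNTS_STORE)
                    .withKeySerde(stringSerde)
                    .withValueSerde(longSerde)
            )
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(stringSerde, longSerde)) // (9) emit the counts
    }
}
```

Because buildPipeline() receives the StreamsBuilder as a parameter, the same function can also be called with a builder you create yourself, outside of the Spring context.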
So we’re persisting the results of these word counts to a state store. How can we use that? Let’s say we have a use case where we’d like the ability to query the counts for a specific word. Well, Spring Boot provides a simple way to create REST controllers and endpoints. Let’s create a simple example to query this store. This endpoint will accept a word as a path variable and query the store for the count of that value.
For testing purposes, we have a POST endpoint (1), allowing us to send strings or phrases to the INPUT_TOPIC (remember our companion object on the Processor class) via an autowired KafkaTemplate (2).
To query the state store, we use the defined GET endpoint (3) to query the counts of a specific word. Don’t let the question marks cloud things here. This is Kotlin’s way of handling nullability. Let’s dissect this function:
From the autowired StreamsBuilderFactoryBean, which gives the controller class access to and control over the underlying Kafka Streams instance in this application, we make a call to get the kafkaStreams (4). This could be null if we weren’t actually running a streams application, thus the ? to check before continuing.
With this streams instance in scope, we look up the ReadOnlyKeyValueStore by the same name used in our Kafka Streams topology. Again, this could be null, so we use the ? to account for that. (5)
If we are able to get the store, we then search for the word—which is the key of this store. (5)
The result of the lookup is nullable, therefore we either return the value at that key or a 0L to denote this word was not found. (6)
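To see the question marks in isolation, here is a small plain-Kotlin model of that chain. FakeStreams, FakeStore, and countFor are hypothetical stand-ins invented for this sketch (they are not the Kafka Streams or Spring types), so it runs without any Kafka dependency.

```kotlin
// Hypothetical stand-in for a ReadOnlyKeyValueStore: get() returns null for unknown keys.
class FakeStore(private val counts: Map<String, Long>) {
    fun get(key: String): Long? = counts[key]
}

// Hypothetical stand-in for the streams instance: store() returns null for unknown store names.
class FakeStreams(private val stores: Map<String, FakeStore>) {
    fun store(name: String): FakeStore? = stores[name]
}

fun countFor(streams: FakeStreams?, storeName: String, word: String): Long =
    streams                 // may be null when no streams application is running
        ?.store(storeName)  // evaluated only if streams is non-null
        ?.get(word)         // evaluated only if the store was found
        ?: 0L               // any null along the chain falls back to zero

fun main() {
    val streams = FakeStreams(mapOf("counts" to FakeStore(mapOf("kafka" to 3L))))
    println(countFor(streams, "counts", "kafka"))   // 3
    println(countFor(streams, "counts", "missing")) // 0
    println(countFor(null, "counts", "kafka"))      // 0
}
```

The safe-call operator `?.` short-circuits to null as soon as any receiver is null, and the Elvis operator `?:` supplies the default, which is how the endpoint can return 0L without a single explicit null check.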
Given a running topology, let’s write a unit test. Kafka Streams provides utilities that facilitate writing good unit tests.
This test instantiates a StreamsBuilder (1) to use in configuring the topology. We then instantiate the WordCountProcessor and call the buildPipeline() function (2) to wire our topology to the streamsBuilder. Next, calling build() on the streamsBuilder (3) returns the assembled Topology; it does not start anything by itself.
Kafka Streams provides the TopologyTestDriver to verify the behavior of the topology without an actual Kafka broker. The use function (4) here is Kotlin’s counterpart to Java’s “try-with-resources” construct (see the docs). Because TopologyTestDriver implements the Closeable interface, the driver instance is closed and its resources released as soon as the block exits. Using the TopologyTestDriver, we can create input (5) and output (6) topics. These create*Topic methods need the serializer/deserializer implementations for the key and value of the events used.
Now we can send events to the inputTopic (7), making sure we have some repeating words so that we can verify the aggregations. Calling outputTopic.readKeyValuesToList() reads the output events (if any exist) into a List<KeyValue>. Using the containsAll method on assertThat (8), we can verify that the output contains the events we expect from the topology.
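The mechanics of the test driver can be sketched on a deliberately tiny topology. This is not the word count test itself: the uppercase topology, the demo-input and demo-output topic names, and the runUppercaseTopology function are stand-ins chosen to keep the example self-contained.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import java.util.Properties

// Pipes the given values through a stand-in topology and returns what reached the output topic.
fun runUppercaseTopology(inputs: List<String>): List<KeyValue<String, String>> {
    val builder = StreamsBuilder()
    builder.stream("demo-input", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues { value -> value.uppercase() }
        .to("demo-output", Produced.with(Serdes.String(), Serdes.String()))
    val topology = builder.build() // returns the Topology; nothing runs yet

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092") // never contacted by the driver
    }

    // `use` closes the driver when the block exits, even if an exception is thrown inside it.
    return TopologyTestDriver(topology, props).use { driver ->
        val input = driver.createInputTopic("demo-input", StringSerializer(), StringSerializer())
        val output = driver.createOutputTopic("demo-output", StringDeserializer(), StringDeserializer())
        inputs.forEach { value -> input.pipeInput("key", value) }
        output.readKeyValuesToList()
    }
}
```

Note that the input topic takes serializers while the output topic takes deserializers, mirroring what a producer and a consumer would need against a real broker.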
To execute this test from the command line:
OK, we counted words and unit tested it. Let’s look at a more “real-world” example, where we join streams of typed data. In this use case, we have a fictional health club. Over time, members of the club check in for a workout. We’re going to build a Kafka Streams topology to track the check-in events for our highest-level members—the folks who spend the most money on membership.
Here are some data structures, defined in Avro schemas for this use case.
There are various levels of membership, defined in an enumeration. These are the only valid values for the membership levels.
A Member has certain identifying information, like ID, name, and email. We also know the date the member joined the club. By default, the membership level is STANDARD.
Periodically, the Member will Checkin at the club. A Checkin event has a transaction ID, a timestamp, and the member ID:
The end result of a check-in event matching a known member in the highest membership levels is an EnrichedCheckin, which preserves the data about the check-in event and augments it with the membership level.
For starters, this topology has the Serde instances for the input streams and the output stream injected via constructor autowiring (1). (We’ll see the specifics of this in the @Configuration annotated class in a bit.) Again, we use a companion object (2) to provide our constants: the names of the topics used in the topology.
Let’s walk through the buildPipeline() function.
Read the CHECKIN_TOPIC into a KStream using Serdes.String() for the key and the provided checkinSerde implementation for the value (3).
Read the MEMBER_TOPIC into a KTable using Serdes.String() for the key and the provided memberSerde implementation for the value (4). We use the filter method (5) with a lambda expression to only include the members of PLATINUM or GOLD level.
Because each of the input streams is keyed by the member ID, the stream-table join (6) is simple. The ValueJoiner is provided as a lambda expression, mapping the matching checkin and member to a new EnrichedCheckin (7), which is our desired output type.
Finally, we send the EnrichedCheckin to the output topic (8).
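Putting those steps together, a sketch of the processor could look like this. To keep it self-contained, plain Kotlin data classes stand in for the Avro-generated Member, Checkin, and EnrichedCheckin types. Their field names, the topic name values, the ENRICHED_CHECKIN_TOPIC constant, and the isTopTier and enrich helpers are all assumptions made for illustration.

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

// Stand-ins for the Avro-generated classes (field names are assumed).
enum class MembershipLevel { STANDARD, SILVER, GOLD, PLATINUM }
data class Member(val id: String, val membershipLevel: MembershipLevel)
data class Checkin(val txnId: String, val memberId: String, val txnTimestamp: Long)
data class EnrichedCheckin(
    val txnId: String, val memberId: String, val txnTimestamp: Long, val membershipLevel: MembershipLevel
)

// Hypothetical helpers: the filter predicate and the join mapping, pulled out so they are easy to test.
fun isTopTier(member: Member): Boolean =
    member.membershipLevel == MembershipLevel.PLATINUM || member.membershipLevel == MembershipLevel.GOLD

fun enrich(checkin: Checkin, member: Member): EnrichedCheckin =
    EnrichedCheckin(checkin.txnId, checkin.memberId, checkin.txnTimestamp, member.membershipLevel)

@Component
class MemberCheckinProcessor( // Serde instances arrive through the constructor
    private val checkinSerde: Serde<Checkin>,
    private val memberSerde: Serde<Member>,
    private val enrichedCheckinSerde: Serde<EnrichedCheckin>
) {
    companion object { // topic name values are assumed
        const val CHECKIN_TOPIC = "checkin"
        const val MEMBER_TOPIC = "member"
        const val ENRICHED_CHECKIN_TOPIC = "enriched-checkin"
    }

    @Autowired
    fun buildPipeline(builder: StreamsBuilder) {
        // Check-in events as a stream, keyed by member ID.
        val checkins = builder.stream(CHECKIN_TOPIC, Consumed.with(Serdes.String(), checkinSerde))

        // Members as a table, keeping only the PLATINUM and GOLD levels.
        val topMembers = builder
            .table(MEMBER_TOPIC, Consumed.with(Serdes.String(), memberSerde))
            .filter { _, member -> isTopTier(member) }

        // Stream-table join on the shared key; unmatched check-ins produce no output.
        val joiner = ValueJoiner<Checkin, Member, EnrichedCheckin> { checkin, member -> enrich(checkin, member) }
        checkins
            .join(topMembers, joiner, Joined.with(Serdes.String(), checkinSerde, memberSerde))
            .to(ENRICHED_CHECKIN_TOPIC, Produced.with(Serdes.String(), enrichedCheckinSerde))
    }
}
```

Since this is an inner join, a check-in for a member who is unknown or was filtered out of the table simply yields nothing on the output topic.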
There are some bits of this topology that need to be configured. Given we’re using Confluent Cloud at runtime, we need to safely provide and inject our connection information and credentials to the topology. Let’s start with an @Configuration annotated class.
With these elements injected with the @Value annotation, we can create our streams configuration:
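A minimal sketch of such a configuration class follows. The class name, the demo.kafka.* property keys, and the application ID are placeholders invented for this example, and the SASL_SSL with PLAIN settings reflect the usual way of connecting to Confluent Cloud with an API key and secret rather than anything shown in this post.

```kotlin
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.streams.StreamsConfig
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration
import org.springframework.kafka.config.KafkaStreamsConfiguration

// proxyBeanMethods = false avoids the need for an open (non-final) Kotlin class.
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class KafkaStreamsConfig(
    // Placeholder property keys; use whatever your application.yml actually defines.
    @Value("\${demo.kafka.bootstrap-servers}") private val bootstrapServers: String,
    @Value("\${demo.kafka.api-key}") private val apiKey: String,
    @Value("\${demo.kafka.api-secret}") private val apiSecret: String
) {
    // @EnableKafkaStreams looks for a configuration bean with this well-known name.
    @Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
    fun streamsConfig(): KafkaStreamsConfiguration = KafkaStreamsConfiguration(
        mapOf<String, Any>(
            StreamsConfig.APPLICATION_ID_CONFIG to "member-checkin-demo", // placeholder
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"$apiKey\" password=\"$apiSecret\";"
        )
    )
}
```

Keeping the credentials as injected constructor parameters means nothing sensitive is hardcoded in the class; the values come from wherever Spring resolves those properties.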
We can also use those injected constructor parameters to configure the Serde instances needed by our MemberCheckinProcessor class:
The application.yml configuration should be familiar—we use spring.config.import to inject protected information for connecting to Confluent Cloud:
Using the Terraform steps from our previous blog post, create the topics used in this example. You can then execute this topology (either with Gradle or in your IDE).
As we write these unit tests, you should get a better understanding of the decision to inject the Serde instances as parameters to the processor class.
We know what you’re thinking: “Wait, wait, wait… So much for immutability.” Well, for starters this is test code, so taking liberties with the “purist” elements of a language is perfectly fine. Second, Kotlin provides the lateinit keyword to allow for “late binding” of certain values. The var instances you see here are all reinstantiated on each test execution, as they are created from the TopologyTestDriver, which is closed after each test execution. Below is the setup function that runs before each test execution to instantiate these objects:
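The lateinit lifecycle itself can be shown without any test framework. In this plain-Kotlin sketch, FakeDriver and LifecycleDemo are hypothetical stand-ins for the TopologyTestDriver and the test class, and setup() and tearDown() play the roles that JUnit 5 would trigger through @BeforeEach and @AfterEach.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for TopologyTestDriver: a resource that must be closed after each test.
class FakeDriver : Closeable {
    var closed = false
        private set

    override fun close() {
        closed = true
    }
}

class LifecycleDemo {
    // Declared with a non-null type, but assigned in setup() rather than at construction time.
    lateinit var driver: FakeDriver

    fun isReady(): Boolean = this::driver.isInitialized

    fun setup() {      // would carry @BeforeEach in a JUnit 5 test
        driver = FakeDriver()
    }

    fun tearDown() {   // would carry @AfterEach
        driver.close()
    }
}

fun main() {
    val demo = LifecycleDemo()
    println(demo.isReady())        // false: nothing assigned yet
    demo.setup()
    val first = demo.driver
    demo.tearDown()
    demo.setup()                   // a fresh driver for the "next test"
    println(first.closed)          // true
    println(demo.driver === first) // false
}
```

The trade-off is that reading a lateinit property before it has been assigned throws an UninitializedPropertyAccessException at runtime, which is acceptable here because the test framework guarantees that setup runs first.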
Note that we opted to use the SpecificAvroSerde implementation with the mock://schema-registry URL. We could easily use a different Serde implementation here, such as JSON or even a mock with a library like Mockito. (However, we find that heavy usage of mock libraries often points to a code/design “smell.” These libraries have their place, but it’s simple enough in this case to use this implementation.)
Just as in the word count unit test, we instantiate our input and output topics from the TopologyTestDriver.
Now we are ready to execute our test cases. Let’s think about the use cases to test (this isn’t comprehensive):
A known PLATINUM member checks in.
A known GOLD member checks in.
An unknown PLATINUM member checks in.
An unknown GOLD member checks in.
A known STANDARD member checks in.
A known SILVER member checks in.
JUnit 5 provides the @ParameterizedTest annotation, which we can use in these cases to promote code reuse.
For the “known member” cases, we want to create a Member and a Checkin where the member ID values match. We expect our topology to return an EnrichedCheckin in the cases where this Member is of PLATINUM or GOLD level. This test will execute for each of those cases, given the @EnumSource values for those levels. We then assert the output topic has 1 event whose attributes match what we know should be mapped in the topology:
The “unknown member” cases are similar, except we do not want the member ID values to match. In that case, we expect nothing in the output topic.
The STANDARD and SILVER cases should also return no results on the output topic:
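The shape of these parameterized tests can be sketched as follows. To stay self-contained, the example exercises a hypothetical qualifiesForEnrichment function instead of the real topology, and the MembershipLevel enum lists only the four levels mentioned in this post (the actual schema may define more).

```kotlin
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

// Assumed enum values, taken from the levels mentioned in the text.
enum class MembershipLevel { STANDARD, SILVER, GOLD, PLATINUM }

// Hypothetical stand-in for the topology's filter on membership level.
fun qualifiesForEnrichment(level: MembershipLevel): Boolean =
    level == MembershipLevel.PLATINUM || level == MembershipLevel.GOLD

class MembershipLevelTest {

    // Runs once per listed constant: PLATINUM, then GOLD.
    @ParameterizedTest
    @EnumSource(value = MembershipLevel::class, names = ["PLATINUM", "GOLD"])
    fun `top-tier levels are enriched`(level: MembershipLevel) {
        assertTrue(qualifiesForEnrichment(level))
    }

    // Runs once per listed constant: STANDARD, then SILVER.
    @ParameterizedTest
    @EnumSource(value = MembershipLevel::class, names = ["STANDARD", "SILVER"])
    fun `lower levels produce no output`(level: MembershipLevel) {
        assertFalse(qualifiesForEnrichment(level))
    }
}
```

Each enum constant named in @EnumSource becomes its own test invocation, so a failure report points at the exact level that misbehaved.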
The generated HTML test report shows us how these tests were executed. You can see each permutation of the test case with the parameter used in the “Test” column of the report.
We hope you can see the benefits of using Spring with Kafka Streams in these examples. The Kotlin language makes for a more concise and declarative way to reason about these topologies. And we even have unit tests.
We’d like to continue this journey with Spring, Kafka, and Kotlin, so look for more posts soon. Better yet, ping us on our social media channels with your ideas and questions.