As a test class for Kafka Streams logic, TopologyTestDriver is much faster than EmbeddedSingleNodeKafkaCluster and makes it possible to simulate different timing scenarios. The code of a Kafka Streams application is often very compact, so the test code can easily become a much bigger code base than the application itself. That's why it's important for the test code to be readable and understandable, which also keeps it maintainable in the long term.
When using TopologyTestDriver prior to Apache Kafka® version 2.4.0, the code needed to call ConsumerRecordFactory to produce the ConsumerRecord passed into the pipeInput() method in order to write to the input topics. When calling readOutput() to consume from the output topics, the code needed to provide correct deserializers each time. ProducerRecords returned by readOutput() contained a lot of extra fields set by Kafka internally, which made validating the records more complicated. Kafka version 2.4.0 introduces TestInputTopic and TestOutputTopic classes (via KIP-470) to simplify the usage of the test interface.
Although a separate Fluent Kafka Streams Tests wrapper exists around TopologyTestDriver, I prefer using the same assertion framework as in all my other tests. Kafka Streams itself uses Hamcrest, but I like AssertJ, which is easy to use and offers auto-completion in IDEs like IntelliJ IDEA and Eclipse. This post provides examples of how to write Kafka Streams tests with AssertJ using the new TestInputTopic and TestOutputTopic classes.
General information about testing can be found in the Kafka Streams Developer Guide.
The first step in the test class is to create a TopologyTestDriver and the related TestInputTopic and TestOutputTopic objects, which are new since Kafka version 2.4.0. The createInputTopic() method accepts parameters that define the timestamp of the first record and the time increment applied for each subsequent record. If the test does not need to validate the record time, you can create the TestInputTopic without those parameters.
private TopologyTestDriver testDriver;
private TestInputTopic<Long, String> inputTopic;
private TestOutputTopic<String, Long> outputTopic;

private final Instant recordBaseTime = Instant.parse("2019-06-01T10:00:00Z");
private final Duration advance1Min = Duration.ofMinutes(1);

@Before
public void setup() {
    final StreamsBuilder builder = new StreamsBuilder();
    //Create Actual Stream Processing pipeline
    MappingStreamApp.createStream(builder);
    testDriver = new TopologyTestDriver(builder.build(), MappingStreamApp.getStreamsConfig());
    inputTopic = testDriver.createInputTopic(MappingStreamApp.INPUT_TOPIC,
            new LongSerializer(), new StringSerializer(), recordBaseTime, advance1Min);
    outputTopic = testDriver.createOutputTopic(MappingStreamApp.OUTPUT_TOPIC,
            new StringDeserializer(), new LongDeserializer());
}
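To make the effect of the two time parameters concrete, here is a minimal sketch that computes the timestamps a test would expect for a series of piped records. It assumes that each record receives the current time and that the time then advances by the configured increment, which is the behavior the record-list test later in this post relies on. ExpectedTimestamps is a hypothetical helper built only on java.time; it is not part of the Kafka API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka class): lists the timestamps a test would
// expect when the input topic auto-advances the record time.
final class ExpectedTimestamps {

    private ExpectedTimestamps() { }

    // Returns base, base + advance, base + 2 * advance, ... for count records.
    static List<Instant> of(Instant base, Duration advance, int count) {
        List<Instant> result = new ArrayList<>(count);
        Instant current = base;
        for (int i = 0; i < count; i++) {
            result.add(current);
            // Assumption: time advances once after every piped record.
            current = current.plus(advance);
        }
        return result;
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2019-06-01T10:00:00Z");
        // Same base time and increment as in the setup code above.
        for (Instant timestamp : of(base, Duration.ofMinutes(1), 4)) {
            System.out.println(timestamp);
        }
    }
}
```

A helper like this can keep the expected timestamps in one place instead of repeating the arithmetic in every test.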
In this example, the stream under test swaps the key and the value of each record. Thus, the test pipes in both a key and a value, though it is also possible to write only the value if there is no key. Here, the test asserts only the value of the output records, so it uses the readValue() method.
@Test
public void testOnlyValue() {
    //Feed 9 as key and word "Hello" as value to inputTopic
    inputTopic.pipeInput(9L, "Hello");
    //Read value and validate it, ignore validation of kafka key, timestamp is irrelevant in this case
    assertThat(outputTopic.readValue()).isEqualTo(9L);
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}
At the end of the test, isEmpty() ensures that there are no more messages in the output topic. Before 2.4.0, the test code would have read records until a record was null. Now, reading from an empty topic throws an exception, which simplifies testing significantly. For example, the test can use readValue() to validate that it gets a null value when it expects one, because null no longer doubles as the end-of-topic signal. Additionally, reading from a non-existing topic now causes an exception instead of returning null as before.
@Test
public void testReadFromEmptyTopic() {
    inputTopic.pipeInput(9L, "Hello");
    assertThat(outputTopic.readValue()).isEqualTo(9L);
    //Reading from an empty topic throws an exception
    assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> {
        outputTopic.readValue();
    }).withMessage("Empty topic: %s", MappingStreamApp.OUTPUT_TOPIC);
}
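The difference between the two reading styles can be illustrated with a plain java.util queue. This is only an analogy, not the Kafka API: poll() returns null when the queue is empty, much like the old read-until-null pattern, while remove() throws NoSuchElementException, much like reading from an empty TestOutputTopic. EmptyReadDemo and its method names are hypothetical. The sketch shows why a null value in the middle of the data is ambiguous in the first style and unambiguous in the second.

```java
import java.util.LinkedList;
import java.util.Queue;

// Analogy only: an in-memory queue stands in for an output topic.
final class EmptyReadDemo {

    private EmptyReadDemo() { }

    // Three records, the second of which has a null value.
    static Queue<String> sample() {
        Queue<String> topic = new LinkedList<>();
        topic.add("a");
        topic.add(null);
        topic.add("b");
        return topic;
    }

    // Old style: stop at the first null. A null value is mistaken for the end.
    static int countUntilNull(Queue<String> topic) {
        int count = 0;
        while (topic.poll() != null) {
            count++;
        }
        return count;
    }

    // New style: check emptiness explicitly. remove() would throw
    // NoSuchElementException on an empty queue, so null is just a value.
    static int countUntilEmpty(Queue<String> topic) {
        int count = 0;
        while (!topic.isEmpty()) {
            topic.remove();
            count++;
        }
        return count;
    }
}
```

With the sample data, the first method stops early at the null value, while the second one reads every record.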
KeyValue output
To validate both the key and the value, the test can use readKeyValue() and assert it against a KeyValue object.
@Test
public void testKeyValue() {
    //Feed 9 as key and word "Hello" as value to inputTopic
    inputTopic.pipeInput(9L, "Hello");
    //Read KeyValue and validate it, timestamp is irrelevant in this case
    assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("Hello", 9L));
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}
TestRecord
If the test also needs to validate the record timestamp, it can use readRecord() and assert it against a TestRecord. The TestRecord constructors support both Instant timestamps and plain long timestamps.
@Test
public void testKeyValueTimestamp() {
    final Instant recordTime = Instant.parse("2019-06-01T10:00:00Z");
    //Feed 9 as key and word "Hello" as value to inputTopic with record timestamp
    inputTopic.pipeInput(9L, "Hello", recordTime);
    //Read TestRecord and validate it
    assertThat(outputTopic.readRecord()).isEqualTo(new TestRecord<>("Hello", 9L, recordTime));
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}
TestRecord and ignoring the timestamp
If the test needs to validate the record headers but does not care about timestamps, isEqualToIgnoringNullFields() from AssertJ is useful. Because the expected TestRecord is created without a timestamp, its timestamp field is null and is skipped in the comparison, so the actual record timestamp is ignored. You can also implement a partial test with Hamcrest using the allOf() and hasProperty() matchers.
@Test
public void testHeadersIgnoringTimestamp() {
    final Headers headers = new RecordHeaders(
            new Header[]{
                    new RecordHeader("foo", "value".getBytes())
            });
    //Feed 9 as key, word "Hello" as value, and header to inputTopic with record timestamp filled by processing
    inputTopic.pipeInput(new TestRecord<>(9L, "Hello", headers));
    //Using isEqualToIgnoringNullFields to ignore validating record time
    assertThat(outputTopic.readRecord()).isEqualToIgnoringNullFields(new TestRecord<>("Hello", 9L, headers));
    assertThat(outputTopic.isEmpty()).isTrue();
}
Value and KeyValue lists
As with a single record, it is possible to pipe a list of values into a TestInputTopic. The output can be validated record by record as before, or read with the readValuesToList() method to validate the whole collection at once. In our example topology, the piped-in values become the output keys, so the test needs to validate the keys and therefore uses the readKeyValuesToList() method.
@Test
public void testKeyValueList() {
    final List<String> inputList = Arrays.asList("This", "is", "an", "example");
    final List<KeyValue<String, Long>> expected = new LinkedList<>();
    for (final String s : inputList) {
        //Expected list contains original values as keys; the input had no keys, so the output values are null
        expected.add(new KeyValue<>(s, null));
    }
    //Pipe in value list
    inputTopic.pipeValueList(inputList);
    assertThat(outputTopic.readKeyValuesToList()).hasSameElementsAs(expected);
}
Value lists with auto-advanced record time
The next test pipes in a KeyValue list. Each resulting record gets a timestamp based on the initial time and the auto-advance parameters set when the TestInputTopic was created, and the output is validated as a list of TestRecord objects.
@Test
public void testRecordList() {
    final List<String> inputList = Arrays.asList("This", "is", "an", "example");
    final List<KeyValue<Long, String>> input = new LinkedList<>();
    final List<TestRecord<String, Long>> expected = new LinkedList<>();
    long i = 1;
    Instant recordTime = recordBaseTime;
    for (final String s : inputList) {
        input.add(new KeyValue<>(i, s));
        //Expected entries have key and value swapped and recordTime advancing 1 minute in each
        expected.add(new TestRecord<>(s, i, recordTime));
        recordTime = recordTime.plus(advance1Min);
        //Increment the key once per record
        i++;
    }
    //Pipe in KeyValue list
    inputTopic.pipeKeyValueList(input);
    assertThat(outputTopic.readRecordsToList()).hasSameElementsAs(expected);
}
KeyValues to Map
If the test is validating a stream where only the latest record for each key is valid, like an aggregating stream, it can use readKeyValuesToMap() and validate the result. Unlike a KTable, the returned map does not remove a key when a tombstone (a record with a null value) arrives; the key stays in the map with a null value. This is by design, so as not to lose the information that a key existed and was deleted later.
@Test
public void testValueMap() {
    final List<String> inputList = Arrays.asList("a", "b", "c", "a", "b");
    final List<KeyValue<Long, String>> input = new LinkedList<>();
    long i = 1;
    for (final String s : inputList) {
        input.add(new KeyValue<>(i++, s));
    }
    //Pipe in KeyValue list
    inputTopic.pipeKeyValueList(input);
    //Map contains the last index of each entry
    assertThat(outputTopic.readKeyValuesToMap()).hasSize(3)
            .containsEntry("a", 4L)
            .containsEntry("b", 5L)
            .containsEntry("c", 3L);
}
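The tombstone behavior described above can be sketched with plain java.util collections. This is an illustration of the two semantics only, not the Kafka implementation; TombstoneMapDemo and its method names are hypothetical. The first method keeps a key with a null value, as the text describes for readKeyValuesToMap(), while the second removes the key, as a table-like view would.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain collections stand in for output records.
final class TombstoneMapDemo {

    private TombstoneMapDemo() { }

    static Map.Entry<String, Long> record(String key, Long value) {
        // SimpleEntry permits null values, which models a tombstone.
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Key "a" is written and later deleted with a tombstone.
    static List<Map.Entry<String, Long>> sample() {
        return Arrays.asList(record("a", 1L), record("b", 2L), record("a", null));
    }

    // Latest value per key; a tombstone leaves the key mapped to null.
    static Map<String, Long> keepTombstones(List<Map.Entry<String, Long>> records) {
        Map<String, Long> map = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            map.put(r.getKey(), r.getValue());
        }
        return map;
    }

    // Table-like view; a tombstone removes the key entirely.
    static Map<String, Long> dropTombstones(List<Map.Entry<String, Long>> records) {
        Map<String, Long> map = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            if (r.getValue() == null) {
                map.remove(r.getKey());
            } else {
                map.put(r.getKey(), r.getValue());
            }
        }
        return map;
    }
}
```

With the first semantics, a test can still assert that a key was seen and then deleted; with the second, that information is gone.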
The full example code for this Kafka Streams test tutorial can be found on GitHub.
You can migrate existing TopologyTestDriver tests with a simple find-and-replace approach that covers most of the changes. Time handling now uses the Instant and Duration classes, but you can also use a long timestamp with TestRecord. Many examples of migrated tests are available in the commit for KIP-470.
To use the new test input and output topic classes with older Kafka versions, there is a separate package that can be used by modifying only the package import. The package kafka-streams-test-topics can be found on GitHub and Apache Maven repositories.
Other testing examples can be found here:
With the ability to test Kafka Streams logic using the TopologyTestDriver and without utilizing actual producers and consumers, testing becomes much quicker, and it becomes possible to simulate different timing scenarios. The TestInputTopic and TestOutputTopic classes simplify testing and enable you to use the assertion framework of your choice. Happy testing.
If you’d like to know more, you can read Getting Your Feet Wet with Stream Processing – Part 2: Testing Your Streaming Application and download the Confluent Platform to get started with a complete event streaming platform, built by the original creators of Apache Kafka.