TopologyTestDriver is a test class for Kafka Streams logic. It is a lot faster than using EmbeddedSingleNodeKafkaCluster and makes it possible to simulate different timing scenarios. Kafka Streams application code is typically very compact, so the test code can easily grow into a much bigger code base than the implementation itself. That is why the test code should be easy to read and understand, which also makes it more maintainable in the long term.
When using TopologyTestDriver prior to Apache Kafka® version 2.4.0, the code needed to call ConsumerRecordFactory to produce the ConsumerRecord passed into the pipeInput() method in order to write to the input topics. When calling readOutput() to consume from the output topics, the code needed to provide correct deserializers each time. ProducerRecords returned by readOutput() contained a lot of extra fields set by Kafka internally, which made validating the records more complicated. Kafka version 2.4.0 introduces TestInputTopic and TestOutputTopic classes (via KIP-470) to simplify the usage of the test interface.
Although a separate Fluent Kafka Streams Tests wrapper exists around TopologyTestDriver, I prefer using the same assertion framework as in all my other tests. Kafka Streams itself uses Hamcrest, but I like AssertJ, which is easy to use and offers auto-completion in IDEs like IntelliJ IDEA and Eclipse. This post provides examples of how to write Kafka Streams tests with AssertJ using the new TestInputTopic and TestOutputTopic classes.
General information about testing can be found in the Kafka Streams Developer Guide.
The first step in the test class is to create a TopologyTestDriver and related TestInputTopics and TestOutputTopics, which are new since Kafka version 2.4.0. The method createInputTopic() has parameters to define the first record timestamp and the time increment of each record. If the test does not need to validate the record time, you can create TestInputTopics without those parameters.
    private TopologyTestDriver testDriver;
    private TestInputTopic<Long, String> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;

    private final Instant recordBaseTime = Instant.parse("2019-06-01T10:00:00Z");
    private final Duration advance1Min = Duration.ofMinutes(1);

    @Before
    public void setup() {
        final StreamsBuilder builder = new StreamsBuilder();
        //Create Actual Stream Processing pipeline
        MappingStreamApp.createStream(builder);
        testDriver = new TopologyTestDriver(builder.build(), MappingStreamApp.getStreamsConfig());
        inputTopic = testDriver.createInputTopic(MappingStreamApp.INPUT_TOPIC,
                new LongSerializer(), new StringSerializer(), recordBaseTime, advance1Min);
        outputTopic = testDriver.createOutputTopic(MappingStreamApp.OUTPUT_TOPIC,
                new StringDeserializer(), new LongDeserializer());
    }
In this example, the stream under test swaps the key and the value of each record. The test therefore pipes in both a key and a value, though it is also possible to write only the value if there is no key. Here, the test asserts only the value of the output record, so it uses the readValue() method.
    @Test
    public void testOnlyValue() {
        //Feed 9 as key and word "Hello" as value to inputTopic
        inputTopic.pipeInput(9L, "Hello");
        //Read value and validate it, ignore validation of kafka key, timestamp is irrelevant in this case
        assertThat(outputTopic.readValue()).isEqualTo(9L);
        //No more output in topic
        assertThat(outputTopic.isEmpty()).isTrue();
    }
At the end of the test, isEmpty() verifies that there are no more messages in the output topic. Before 2.4.0, the test code would have read records until a null record was returned. Now, reading from an empty topic throws an exception, which simplifies testing significantly. For example, a test that expects a record with a null value can validate it with readValue(), because null no longer doubles as the signal for an empty topic. Additionally, reading from a non-existent topic now throws an exception instead of returning null as before.
    @Test
    public void testReadFromEmptyTopic() {
        inputTopic.pipeInput(9L, "Hello");
        assertThat(outputTopic.readValue()).isEqualTo(9L);
        //Reading from an empty topic throws an exception
        assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> {
            outputTopic.readValue();
        }).withMessage("Empty topic: %s", MappingStreamApp.OUTPUT_TOPIC);
    }
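Why an exception is easier to test against than a null return can be seen in a plain-Java analogy. The sketch below does not use Kafka at all: it assumes a `LinkedList` as a stand-in for an output topic, with `poll()` playing the role of the old null-returning read and `remove()` the role of the new throwing read. The class and method names are made up for illustration.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for an output topic; not part of Kafka.
final class EmptyReadSketch {

    // Old style: null means "nothing left", so a record with a null value is ambiguous.
    static Long readOrNull(final Queue<Long> topic) {
        return topic.poll();
    }

    // New style: an empty queue throws, so a returned null can only be a real value.
    static Long readOrThrow(final Queue<Long> topic) {
        return topic.remove();
    }

    public static void main(final String[] args) {
        final Queue<Long> topic = new LinkedList<>();
        topic.add(null); // one record whose value is null

        // Returns the null value itself, without any ambiguity.
        System.out.println(readOrThrow(topic));
        try {
            readOrThrow(topic); // nothing left to read
        } catch (final NoSuchElementException e) {
            System.out.println("empty");
        }
    }
}
```

With the throwing variant, a test can assert on a null value and on emptiness as two separate facts.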
To validate both the key and the value, the test can use readKeyValue() and assert it against a KeyValue object.
    @Test
    public void testKeyValue() {
        //Feed 9 as key and word "Hello" as value to inputTopic
        inputTopic.pipeInput(9L, "Hello");
        //Read KeyValue and validate it, timestamp is irrelevant in this case
        assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("Hello", 9L));
        //No more output in topic
        assertThat(outputTopic.isEmpty()).isTrue();
    }
If the test also needs to validate the record timestamp, it can use readRecord() and assert it against a TestRecord. The TestRecord constructors support both Instant timestamps and plain long timestamps.
    @Test
    public void testKeyValueTimestamp() {
        final Instant recordTime = Instant.parse("2019-06-01T10:00:00Z");
        //Feed 9 as key and word "Hello" as value to inputTopic with record timestamp
        inputTopic.pipeInput(9L, "Hello", recordTime);
        //Read TestRecord and validate it
        assertThat(outputTopic.readRecord()).isEqualTo(new TestRecord<>("Hello", 9L, recordTime));
        //No more output in topic
        assertThat(outputTopic.isEmpty()).isTrue();
    }
If the test needs to validate the record header but does not care about timestamps, isEqualToIgnoringNullFields() from AssertJ is useful. This way, the actual record timestamp can be ignored. You can also implement a partial test with Hamcrest using allOf() and hasProperty() matchers.
    @Test
    public void testHeadersIgnoringTimestamp() {
        final Headers headers = new RecordHeaders(
                new Header[]{
                        new RecordHeader("foo", "value".getBytes())
                });
        //Feed 9 as key, word "Hello" as value, and header to inputTopic with record timestamp filled by processing
        inputTopic.pipeInput(new TestRecord<>(9L, "Hello", headers));
        //Using isEqualToIgnoringNullFields to ignore validating record time
        assertThat(outputTopic.readRecord()).isEqualToIgnoringNullFields(new TestRecord<>("Hello", 9L, headers));
        assertThat(outputTopic.isEmpty()).isTrue();
    }
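The idea behind ignoring null fields can be sketched in plain Java: fields that are null in the expected object are skipped, and all other fields must match. The following is a simplified illustration of that rule only, not AssertJ's implementation. The `Rec` class, its three fields, and `matchesIgnoringNulls` are hypothetical names introduced for this sketch.

```java
// Simplified illustration of "equal, ignoring null fields of the expected object".
final class IgnoreNullFieldsSketch {

    // Hypothetical record type with an optional timestamp.
    static final class Rec {
        final String key;
        final Long value;
        final Long timestamp;

        Rec(final String key, final Long value, final Long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // A null expected field is treated as "don't care".
    private static boolean nullOrEqual(final Object expected, final Object actual) {
        return expected == null || expected.equals(actual);
    }

    static boolean matchesIgnoringNulls(final Rec actual, final Rec expected) {
        return nullOrEqual(expected.key, actual.key)
                && nullOrEqual(expected.value, actual.value)
                && nullOrEqual(expected.timestamp, actual.timestamp);
    }

    public static void main(final String[] args) {
        final Rec actual = new Rec("Hello", 9L, 1000L);
        // The expected record leaves the timestamp unset, so it is not compared.
        System.out.println(matchesIgnoringNulls(actual, new Rec("Hello", 9L, null)));
        // A non-null field that differs still fails the comparison.
        System.out.println(matchesIgnoringNulls(actual, new Rec("Bye", 9L, null)));
    }
}
```

This is why the expected TestRecord in the test above is built without a timestamp: the unset field is simply left out of the comparison.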
Similarly to testing a single record, it is possible to pipe lists of values into a TestInputTopic. Validating the output can be done record by record like before, or by using the readValuesToList() method to see the big picture when validating the whole collection at the same time. In our example topology, values that are piped in come out as keys, so the test needs to validate the keys and uses the readKeyValuesToList() method.
    @Test
    public void testKeyValueList() {
        final List<String> inputList = Arrays.asList("This", "is", "an", "example");
        final List<KeyValue<String, Long>> expected = new LinkedList<>();
        for (final String s : inputList) {
            //Expected list contains original values as keys
            expected.add(new KeyValue<>(s, null));
        }
        //Pipe in value list
        inputTopic.pipeValueList(inputList);
        assertThat(outputTopic.readKeyValuesToList()).hasSameElementsAs(expected);
    }
The next test pipes in a KeyValue list. Each resulting record gets a new timestamp based on the start time and the auto-advance duration that were set when the TestInputTopic was created.
    @Test
    public void testRecordList() {
        final List<String> inputList = Arrays.asList("This", "is", "an", "example");
        final List<KeyValue<Long, String>> input = new LinkedList<>();
        final List<TestRecord<String, Long>> expected = new LinkedList<>();
        long i = 1;
        Instant recordTime = recordBaseTime;
        for (final String s : inputList) {
            input.add(new KeyValue<>(i, s));
            //Expected entries have key and value swapped and recordTime advancing 1 minute in each
            expected.add(new TestRecord<>(s, i, recordTime));
            recordTime = recordTime.plus(advance1Min);
            i++;
        }
        //Pipe in KeyValue list
        inputTopic.pipeKeyValueList(input);
        assertThat(outputTopic.readRecordsToList()).hasSameElementsAs(expected);
    }
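The timestamp arithmetic that this test relies on can be sketched with plain `java.time`, assuming each piped record receives the current time and the time then moves forward by the configured increment. The helper below only mirrors that arithmetic and is not Kafka code. `AutoAdvanceSketch` and `timestamps` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; mirrors the auto-advance arithmetic only.
final class AutoAdvanceSketch {

    // Record n (0-based) gets the timestamp start + n * advance.
    static List<Instant> timestamps(final Instant start, final Duration advance, final int count) {
        final List<Instant> result = new ArrayList<>();
        Instant current = start;
        for (int n = 0; n < count; n++) {
            result.add(current);
            current = current.plus(advance);
        }
        return result;
    }

    public static void main(final String[] args) {
        final Instant base = Instant.parse("2019-06-01T10:00:00Z");
        // Four records, one minute apart, starting at the base time.
        for (final Instant t : timestamps(base, Duration.ofMinutes(1), 4)) {
            System.out.println(t);
        }
    }
}
```

For the four input records in the test, this gives timestamps at 10:00, 10:01, 10:02, and 10:03 UTC, which is what the expected TestRecord list encodes.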
If the test validates a stream where only the latest record for each key matters, such as an aggregating stream, it can use readKeyValuesToMap(). The returned map differs from a KTable in one respect: after a tombstone (a record with a null value) is sent, the map keeps the key with a null value instead of removing the key. This is by design, so that the information that a key existed and was later deleted is not lost.
    @Test
    public void testValueMap() {
        final List<String> inputList = Arrays.asList("a", "b", "c", "a", "b");
        final List<KeyValue<Long, String>> input = new LinkedList<>();
        long i = 1;
        for (final String s : inputList) {
            input.add(new KeyValue<>(i++, s));
        }
        //Pipe in KeyValue list
        inputTopic.pipeKeyValueList(input);
        //The map contains the last index of each entry
        assertThat(outputTopic.readKeyValuesToMap()).hasSize(3)
                .containsEntry("a", 4L)
                .containsEntry("b", 5L)
                .containsEntry("c", 3L);
    }
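The tombstone behavior described above can be illustrated without Kafka, assuming the map is built by putting each record's key and value into a `HashMap` in arrival order. The sketch below is a stand-in for that behavior, not the library's code. `LatestValueMapSketch` and `toMap` are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for collapsing a record stream into a latest-value map.
final class LatestValueMapSketch {

    // Later records overwrite earlier ones; a null value is stored, not removed.
    static Map<String, Long> toMap(final List<Map.Entry<String, Long>> records) {
        final Map<String, Long> latest = new HashMap<>();
        for (final Map.Entry<String, Long> record : records) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<String, Long>("a", 1L));
        records.add(new SimpleEntry<String, Long>("b", 2L));
        records.add(new SimpleEntry<String, Long>("a", null)); // tombstone for "a"

        final Map<String, Long> latest = toMap(records);
        // The key survives the tombstone, but its value is now null.
        System.out.println(latest.containsKey("a"));
        System.out.println(latest.get("a"));
    }
}
```

A test that needs KTable-like semantics can therefore filter out the entries with null values after reading the map.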
The full example code for this Kafka Streams test tutorial can be found on GitHub.
You can migrate an existing TopologyTestDriver test with a simple find-and-replace approach that covers most of the changes. Time handling now uses the Instant and Duration classes, but you can also use a long timestamp with TestRecord. Many examples of migrated tests are available in the commit for KIP-470.
To use the new test input and output topic classes with older Kafka versions, there is a separate package that can be used by modifying only the package import. The package kafka-streams-test-topics can be found on GitHub and Apache Maven repositories.
Other testing examples can be found here:
With the ability to test Kafka Streams logic using the TopologyTestDriver and without utilizing actual producers and consumers, testing becomes much quicker, and it becomes possible to simulate different timing scenarios. TestInputTopic and TestOutputTopic classes simplify testing and enable you to use the assertion framework of your choice. Happy testing.
If you’d like to know more, you can read Getting Your Feet Wet with Stream Processing – Part 2: Testing Your Streaming Application and download the Confluent Platform to get started with a complete event streaming platform, built by the original creators of Apache Kafka.