Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Testing Kafka Streams Using TestInputTopic and TestOutputTopic

Written By

As a test class that allows you to test Kafka Streams logic, TopologyTestDriver is a lot faster than utilizing EmbeddedSingleNodeKafkaCluster and makes it possible to simulate different timing scenarios. Not only is the code of the Kafka Streams application very compact but the test code is also easily a much bigger code base than the actual implementation of the application. That’s why it’s important for the test code to be easily readable and understandable. It’s more maintainable in the long term as well.

When using TopologyTestDriver prior to Apache Kafka® version 2.4.0, the code needed to call ConsumerRecordFactory to produce the ConsumerRecord passed into the pipeInput() method in order to write to the input topics. When calling readOutput() to consume from the output topics, the code needed to provide correct deserializers each time. ProducerRecords returned by readOutput() contained a lot of extra fields set by Kafka internally, which made validating the records more complicated. Kafka version 2.4.0 introduces TestInputTopic and TestOutputTopic classes (via KIP-470) to simplify the usage of the test interface.

KIP-470: TestInputTopic | TestOutputTopic

Even if a separate Fluent Kafka Streams Tests wrapper exists around TopologyTestDriver, I prefer using the same assertion framework as with all my other tests. Kafka Streams itself uses Hamcrest, but I like AssertJ which is easy to use and offers auto-completion in IDEs like IntelliJ IDEA and Eclipse. This post provides examples on how to write Kafka Streams tests with AssertJ using the new TestInputTopic and TestOutputTopic classes.

Setting up the test fixture

General information about testing can be found in the Kafka Streams Developer Guide.

The first step in the test class is to create a TopologyTestDriver and related TestInputTopics and TestOutputTopics, which are new since Kafka version 2.4.0. The method createInputTopic() has parameters to define the first record timestamp and the time increment of each record. If the test does not need to validate the record time, you can create TestInputTopics without those parameters.

private TopologyTestDriver testDriver;
private TestInputTopic<Long, String> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
private final Instant recordBaseTime = Instant.parse("2019-06-01T10:00:00Z"); private final Duration advance1Min = Duration.ofMinutes(1);
@Before public void setup() { final StreamsBuilder builder = new StreamsBuilder(); //Create Actual Stream Processing pipeline MappingStreamApp.createStream(builder); testDriver = new TopologyTestDriver(builder.build(), MappingStreamApp.getStreamsConfig()); inputTopic = testDriver.createInputTopic(MappingStreamApp.INPUT_TOPIC, new LongSerializer(), new StringSerializer(), recordBaseTime, advance1Min); outputTopic = testDriver.createOutputTopic(MappingStreamApp.OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer()); }

Testing one record at a time

Testing record value

In this example, the stream that the code is testing swaps the key as a value and the value as a key. Thus, the code pipes the key and the value, though it is also possible to write only the value if there is no key. Here, the test is asserting only the value of the output records, so it uses the readValue() method.

@Test
public void testOnlyValue() {
    //Feed 9 as key and word "Hello" as value to inputTopic
    inputTopic.pipeInput(9L, "Hello");
    //Read value and validate it, ignore validation of kafka key, timestamp is irrelevant in this case
    assertThat(outputTopic.readValue()).isEqualTo(9L);
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}

At the end of the test, isEmpty() assures that there are no more messages in the output topic. Earlier, before 2.4.0, the test code would have read records until a record was null. Now, if the test reads from an empty topic, an exception is thrown which simplifies testing significantly. For example, the test can validate with readValue() if it is expecting to get a null value. Additionally, now the reading from a non-existing topic causes an exception instead of returning the null value like before.

@Test
public void testReadFromEmptyTopic() {
    inputTopic.pipeInput(9L, "Hello");
    assertThat(outputTopic.readValue()).isEqualTo(9L);    
    //Reading from empty topic generate Exception
    assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> {
        outputTopic.readValue();
    }).withMessage("Empty topic: %s", MappingStreamApp.OUTPUT_TOPIC);
}

Testing KeyValue output

To validate both the key and the value, the test can use readKeyValue() and assert it against a KeyValue object.

@Test
public void testKeyValue() {
    //Feed 9 as key and word "Hello" as value to inputTopic
    inputTopic.pipeInput(9L, "Hello");
    //Read KeyValue and validate it, timestamp is irrelevant in this case
    assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("Hello", 9L));
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}

Testing with TestRecord

If the test also needs to validate the record timestamp, it can use readRecord() and assert it against a TestRecord. The TestRecord constructor support both Instant timestamps as well as plain long timestamps.

@Test
public void testKeyValueTimestamp() {
    final Instant recordTime = Instant.parse("2019-06-01T10:00:00Z");
    //Feed 9 as key and word "Hello" as value to inputTopic with record timestamp
    inputTopic.pipeInput(9L, "Hello", recordTime);
    //Read TestRecord and validate it
    assertThat(outputTopic.readRecord()).isEqualTo(new TestRecord<>("Hello", 9L, recordTime));
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
}

Testing with TestRecord and ignoring the timestamp

If the test needs to validate the record header but does not care about timestamps, isEqualToIgnoringNullFields() from AssertJ is useful. This way, the actual record timestamp can be ignored. You can also implement a partial test with Hamcrest using allOf() and hasProperty() matchers.

@Test
public void testHeadersIgnoringTimestamp() {
    final Headers headers = new RecordHeaders(
            new Header[]{
                    new RecordHeader("foo", "value".getBytes())
            });
    //Feed 9 as key, word "Hello" as value, and header to inputTopic with record timestamp filled by processing
    inputTopic.pipeInput(new TestRecord<>(9L, "Hello", headers));
    //Using isEqualToIgnoringNullFields to ignore validating record time
    assertThat(outputTopic.readRecord()).isEqualToIgnoringNullFields(new TestRecord<>("Hello", 9L, headers));
    assertThat(outputTopic.isEmpty()).isTrue();
}

Testing collections of records

Testing with Value and KeyValue lists

Similarly to testing a single record, it is possible to pipe Value lists into a TestInputTopic. Validating the output can be done record by record like before, or by using the readValueToList() method to see the big picture when validating the whole collection at the same time. For our example topology, when the test pipes in the values, it needs to validate the keys and use the readKeyValueToList() method.

@Test
public void testKeyValueList() {
    final List inputList = Arrays.asList("This", "is", "an", "example");
    final List<KeyValue<String, Long>> expected = new LinkedList<>();
    for (final String s : inputList) {
        //Expected list contains original values as keys
        expected.add(new KeyValue<>(s, null));
    }
    //Pipe in value list
    inputTopic.pipeValueList(inputList);
    assertThat(outputTopic.readKeyValuesToList()).hasSameElementsAs(expected);
}

Testing with Value lists and with auto-advanced record time

This next test is writing a KeyValue list in which each TestRecord has a new time value based on the initial time and the auto-advance parameters set when the TestInputTopic was created.

@Test
public void testRecordList() {
    final List inputList = Arrays.asList("This", "is", "an", "example");
    final List<KeyValue<Long, String>> input = new LinkedList<>();
    final List<TestRecord<String, Long>> expected = new LinkedList<>();
    long i = 1;
    Instant recordTime = recordBaseTime;
    for (final String s : inputList) {
        input.add(new KeyValue<>(i, s));
        //Expected entries have key and value swapped and recordTime advancing 1 minute in each
        expected.add(new TestRecord<>(s, i++, recordTime));
        recordTime = recordTime.plus(advance1Min);
        i++;
    }
    //Pipe in KeyValue list
    inputTopic.pipeKeyValueList(input);
    assertThat(outputTopic.readRecordsToList()).hasSameElementsAs(expected);
}

Testing the reading of KeyValues to Map

If the test is validating a stream where only the latest record of the same key is valid, like an aggregating stream, it can use readKeyValuesToMap() and validate it. The difference with the returned map to a KTable is that after tombstones are sent (records with a null value), the map still contains those keys with a null value, but it does not remove the whole key. This is by design, so as not to lose the information that a key existed and was deleted later.

@Test
public void testValueMap() {
    final List inputList = Arrays.asList("a", "b", "c", "a", "b");
    final List<KeyValue<Long, String>> input = new LinkedList<>();
    long i = 1;
    for (final String s : inputList) {
        input.add(new KeyValue<>(i++, s));
    }
    //Pipe in KeyValue list
    inputTopic.pipeKeyValueList(input);
    //map contain the last index of each entry
    assertThat(outputTopic.readKeyValuesToMap()).hasSize(3)
            .containsEntry("a", 4L)
            .containsEntry("b", 5L)
            .containsEntry("c", 3L);
}

The full example code for this Kafka Streams test tutorial can be found on GitHub.

Migrating to a new interface

You can migrate the existing TopologyTestDriver test with a simple find-and-replace approach that allows you to modify most items. The time handling is modified to use the Instant and Duration classes, but you can also use a long timestamp with TestRecord. Many examples of modified tests are available in commit in KIP-470.

To use the new test input and output topic classes with older Kafka versions, there is a separate package that can be used by modifying only the package import. The package kafka-streams-test-topics can be found on GitHub and Apache Maven repositories.

Other testing examples can be found here:

Summary

With the ability to test Kafka Streams logic using the TopologyTestDriver and without utilizing actual producers and consumers, testing becomes much quicker, and it becomes possible to simulate different timing scenarios. TestInputTopic and TestOutputTopic classes simplify testing and enable you to use the assertion framework of your choice. Happy testing.

Interested in learning more?

If you’d like to know more, you can read Getting Your Feet Wet with Stream Processing – Part 2: Testing Your Streaming Application and download the Confluent Platform to get started with a complete event streaming platform, built by the original creators of Apache Kafka.

  • Jukka Karvanen developed this feature after realizing that he was always bringing his own helper classes to make testing with TopologyTestDriver easier. Jukka is currently working as DataOps architect at Finnair. He has 25 years of experience working with high-volume databases and real-time event streaming platforms. In his free time, he enjoys kayaking, tour skating, and playing badminton.

Did you like this blog post? Share it now