
Testing Kafka Streams – A Deep Dive


Tools for automated testing of Kafka Streams applications have been available to developers ever since the technology’s genesis. Although these tools are very useful in practice, this blog post will address some of the challenges you might encounter as you start using them.

Features of TopologyTestDriver

The main testing tool for Kafka Streams applications is TopologyTestDriver. Since its introduction in version 1.1.0, its API has improved significantly, and versions from 2.4 onward are convenient and easy to use.

A test using TopologyTestDriver looks like a regular unit test for a function:

  • Define the input data (aka test fixture)
  • Send it to the mocks of the input topics (TestInputTopic)
  • Read what was sent by the topology from the mocks of the output topics (TestOutputTopic)
  • Validate the result, comparing it to what was expected

To create a TopologyTestDriver, you need a Topology (this is the actual program we are testing) and a configuration as an instance of the Properties class:

TopologyTestDriver topologyTestDriver = 
  new TopologyTestDriver(topology, config.asProperties());

To create input test topics, you need to provide key and value serializers. For output topics, you need to provide key and value deserializers:

TestInputTopic<String, String> inputTopic =
   topologyTestDriver.createInputTopic(
     	INPUT_TOPIC, new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> outputTopic =
   topologyTestDriver.createOutputTopic(
     	OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());

The TestInputTopic and TestOutputTopic classes contain sets of methods that allow you to conveniently send/read data entries one by one and in batches.

| What is being sent/received | TestInputTopic methods | TestOutputTopic methods |
|---|---|---|
| A single value | pipeInput(V) | V readValue() |
| A key/value pair | pipeInput(K, V) | KeyValue<K, V> readKeyValue() |
| A list of values | pipeValueList(List<V>) | List<V> readValuesToList() |
| A list of key/value pairs | pipeKeyValueList(List<KeyValue<K, V>>) | List<KeyValue<K, V>> readKeyValuesToList(), Map<K, V> readKeyValuesToMap() |
| A list of Records | pipeRecordList(List<? extends TestRecord<K, V>>) | List<TestRecord<K, V>> readRecordsToList() |

You can send or read a key/value pair, or just a value, with or without an explicit timestamp. If the information in the headers is required for the topology being tested, you can work with Records instead.

The ability to conveniently set a timestamp allows you to write fast and deterministic tests for time-windowing logic, without having to wait using Thread.sleep().

| What is being sent | TestInputTopic methods |
|---|---|
| A value or a key/value pair with a given timestamp | pipeInput(V, Instant), pipeInput(K, V, Instant) |
| A value list or a key/value list with a given timestamp for the first record and a time difference between two consecutively generated records | pipeValueList(List<V>, Instant, Duration), pipeKeyValueList(List<KeyValue<K, V>>, Instant, Duration) |
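To make the batch variants more concrete, here is a small plain-Java sketch (it does not call Kafka) of how the timestamps advance: the first record gets the start timestamp, and each following record is shifted by the given difference. The class name, the 20-second step, and the one-minute window are illustrative assumptions, and the window arithmetic assumes tumbling windows aligned to the epoch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class TimestampSketch {
    /** Timestamps of `count` records: start, start + advance, start + 2 * advance, ... */
    static List<Instant> timestamps(Instant start, Duration advance, int count) {
        List<Instant> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(start.plus(advance.multipliedBy(i)));
        }
        return result;
    }

    /** Start (epoch millis) of the epoch-aligned tumbling window that contains ts. */
    static long windowStart(Instant ts, Duration windowSize) {
        long millis = ts.toEpochMilli();
        return millis - Math.floorMod(millis, windowSize.toMillis());
    }

    public static void main(String[] args) {
        // Five records, 20 seconds apart, grouped into one-minute windows
        for (Instant ts : timestamps(Instant.EPOCH, Duration.ofSeconds(20), 5)) {
            System.out.println(ts + " -> window starting at "
                    + windowStart(ts, Duration.ofMinutes(1)) + " ms");
        }
    }
}
```

Because the timestamps are fully determined by the arguments, a test can place records on either side of a window boundary without waiting for wall-clock time to pass.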

Unlike a “real” KafkaStreams instance, TopologyTestDriver processes all inputs synchronously and completely. For example, when you pipe some input, TopologyTestDriver processes it, immediately flushes any caches, writes results to (mocked) repartition topics, and polls those topics, etc., until there is nothing else to process. This enables you to make assertions about the output immediately after writing instead of having to poll until the expected results arrive, as with the full distributed system.

You can read all the intermediate results using methods like readKeyValuesToList() or readRecordsToList(). Or, if you only care about the final outcome (in the case of KTable operations), you can use readKeyValuesToMap(). The result returned by these methods is the complete result of processing the input data.
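The difference between the list view and the map view can be illustrated without Kafka. The sketch below is a plain-Java model, not the TopologyTestDriver implementation: it folds a sequence of key/value updates into the latest value per key, which is the relationship between what readKeyValuesToList() and readKeyValuesToMap() return. The class name and the sample word-count updates are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangelogSketch {
    /** Folds a sequence of key/value updates into the latest value per key. */
    static Map<String, Long> latestPerKey(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : updates) {
            // A later update for the same key overwrites the earlier one
            latest.put(update.getKey(), update.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // Updates a word count might emit for the input "a", "b", "a"
        List<Map.Entry<String, Long>> updates = List.of(
                Map.entry("a", 1L), Map.entry("b", 1L), Map.entry("a", 2L));
        System.out.println("all updates:    " + updates);
        System.out.println("latest per key: " + latestPerKey(updates));
    }
}
```

The list keeps the intermediate count for "a", while the map keeps only its final count.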

Another important feature of TopologyTestDriver is access to key-value stores:

KeyValueStore<String, Long> store =
    topologyTestDriver.getKeyValueStore(STORE_NAME);

This allows you to read the status of local storage and perform both black box and white box topology testing.

ℹ️ Running TopologyTestDriver is generally trouble-free. Keep in mind, however, that it saves its state to disk while it runs. To avoid conflicts with other tests and problems when rerunning tests, always close it by calling .close() in the tearDown method of the test class. An easy (and highly recommended) way to ensure this always happens is to use the try-with-resources pattern, since TopologyTestDriver implements AutoCloseable.

If you run tests under Windows, also be prepared for files sometimes not being erased due to KAFKA-6647, which is fixed in versions 2.5.1 and 2.6.0. On versions without this patch, you often need to clean up the files in the C:\tmp\kafka-streams\ folder on Windows before running the tests.

The TopologyTestDriver-based tests are easy to write and they run really fast. Since they test the system from input message deserialization through output message serialization, problems related to serialization/deserialization are detected in these tests automatically, without any validation of the output data. Ideally, the development of any functionality utilizing the Kafka Streams API should start with the TopologyTestDriver-based test.

However nice the fully synchronous processing model may be, it does mean that TopologyTestDriver won’t be able to expose some bugs specifically related to asynchronous processing in the real production system. In the next section, I’ll share a bug related to cache dynamics that isn’t possible to catch with TopologyTestDriver.

TopologyTestDriver can be blind to certain defects

In practice, there are cases when the topology behavior tested with TopologyTestDriver is different from the behavior in a real cluster.

Here’s an example based on a real incident that occurred in my production environment.

To perform deduplication, i.e., selecting unique records in the stream, you need to implement an analog of the distinct() method of the Java Stream API. This task is quite common, and we plan to propose adding distinct() to the KStream interface. See KAFKA-10369 for more information.

In our project, we wanted to avoid the complexities of using key-value stores and the Processor API by building the desired behavior with the high-level DSL. Here is our attempt:

final String STOP_VALUE = "<STOP>";
KStream<String, String> input =
  streamsBuilder.stream(INPUT_TOPIC_WRONG,
  Consumed.with(Serdes.String(), Serdes.String()));

input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
  .reduce((first, second) -> STOP_VALUE)
  .toStream()
  .filter((key, value) -> !STOP_VALUE.equals(value))
  .to(OUTPUT_TOPIC_WRONG,
      Produced.with(Serdes.String(), Serdes.String()));

ℹ️ IMPORTANT: This code is not correct, so please do not use it in production. The correct solution of the deduplication task utilizing Kafka Streams is somewhat more complicated and is described in this Kafka tutorial at developer.confluent.io.

The reducer passed to reduce() is invoked only from the second occurrence of a key onward, while the first occurrence is passed through. So always returning STOP_VALUE from the reducer and then filtering out all the STOP_VALUEs means that we should only get the first occurrence of each key in the output, right?

Below is a test for this code:

inputTopic.pipeKeyValueList(
    	List.of("A", "B", "B", "A", "C")
             .stream().map(e -> KeyValue.pair(e, e))
             .collect(toList())
);
assertEquals(List.of("A", "B", "C"), outputTopic.readValuesToList());

The test is green! Should it be deployed into production?

Unfortunately, in production, we found that many (if not all) values were missing from the output topic. This is due to store caching (see Kafka documentation on memory management), which the TopologyTestDriver does not simulate.

In a production Kafka Streams context, state stores by default use an in-memory cache to reduce disk and network I/O as well as CPU consumption from downstream processing. In addition to serving cached reads from memory, the cache only saves the latest update to a record. This allows Streams to collapse multiple consecutive updates to the same record into one. This is OK to do because the result of a KTable operation (like the reduce() in our example) is a stream of updates (called a changelog stream). Whether the resulting stream includes every intermediate state or only some of them, the final result is the same.

This was the mistake in our implementation: The desired “final” result of our reduce() is actually the first output, not the last one. Therefore, if we got two updates to the same record in rapid succession, the cache would swallow the first one (which is actually the result we wanted) and keep the second and subsequent ones (which is STOP_VALUE). When filtering out STOP_VALUE later, we would actually see no results at all for most of the records.
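The effect can be reproduced with a plain-Java model of the two behaviors. This is only a sketch under simplifying assumptions, not how Kafka Streams implements its cache: the class and method names are hypothetical, and the cached variant assumes the whole input arrives before a single flush.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CacheEffectSketch {
    static final String STOP_VALUE = "<STOP>";

    /** Models reduce((first, second) -> STOP_VALUE): the first value per key passes through. */
    private static String reduce(Map<String, String> store, String key, String value) {
        String updated = store.containsKey(key) ? STOP_VALUE : value;
        store.put(key, updated);
        return updated;
    }

    /** Every update is forwarded downstream immediately, as in TopologyTestDriver. */
    static List<String> withoutCache(List<String> input) {
        Map<String, String> store = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String record : input) {
            String updated = reduce(store, record, record);
            if (!STOP_VALUE.equals(updated)) output.add(updated);
        }
        return output;
    }

    /** Updates are buffered per key; only the latest one per key is forwarded on flush. */
    static List<String> withCache(List<String> input) {
        Map<String, String> store = new HashMap<>();
        Map<String, String> cache = new LinkedHashMap<>();
        for (String record : input) {
            cache.put(record, reduce(store, record, record)); // overwrites earlier update
        }
        List<String> output = new ArrayList<>();
        for (String flushed : cache.values()) { // single flush at the end (assumption)
            if (!STOP_VALUE.equals(flushed)) output.add(flushed);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("A", "B", "B", "A", "C");
        System.out.println("without cache: " + withoutCache(input));
        System.out.println("with cache:    " + withCache(input));
    }
}
```

In this model, "A" and "B" disappear from the cached output because their first updates are overwritten by STOP_VALUE before anything is forwarded, and only "C", which occurs once, survives the filter.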

Since caching is not emulated by TopologyTestDriver, the test shown above will always run and create a false illusion of correct code, but even a fairly simple integration test with KafkaStreams would have revealed the bug. Therefore, it’s a good idea to write most of your tests using TopologyTestDriver (leveraging the fast, synchronous, deterministic execution for robust and predictable tests), but also to include a few integration tests to be sure your application will work in a production environment.

Integration testing with “real” KafkaStreams: EmbeddedKafka and TestContainers

There are two common options for using “real” Kafka in tests: EmbeddedKafka from the Spring for Apache Kafka® project and the TestContainers library.

EmbeddedKafka, in full accordance with its name, runs the Kafka broker in the same Java process where the tests are run.

TestContainers is a Java library that runs databases, browsers, and anything else inside Docker containers.

Both of these alternatives work on similar principles: You need to create an object associated with the Kafka broker (embedded or containerized), get the connection address from it, and pass the address to the application parameters. In Spring Boot, spring.kafka.bootstrap-servers is the parameter responsible for connecting to Kafka.

Although EmbeddedKafka is convenient, there are some potential drawbacks to consider compared to TestContainers:

  • It pulls in transitive dependencies (including Scala) that may conflict with your project’s own dependencies. With TestContainers, the brokers get their own JVMs with their own classpath.
  • You need to pay attention to the lifecycle of the embedded broker and the directory it uses to store data to avoid conflicts between concurrent tests and dirty state between runs. Using TestContainers, you get better isolation between runs.
  • EmbeddedKafka just allocates extra threads and objects in the same JVM as your tests, so it’s not possible to isolate the broker’s memory or CPU usage from your tests themselves. With TestContainers, you can throttle performance by allocating just a few processors for Docker, and the brokers get their own JVM processes and memory space.

Over the past few years, I’ve had a positive experience using TestContainers not only for testing Kafka but also in other projects with relational and NoSQL databases. This is why I would definitely recommend using TestContainers.

However, the examples below demonstrate both EmbeddedKafka and TestContainers.

Spring allows you to “automagically” connect a Kafka broker to the system under test, especially for EmbeddedKafka:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty =
                          "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestTopologyKafkaEmbedded {
   /* At the moment the test is run, the KafkaStreams object
      will be configured to use the EmbeddedKafka connection */

   // ... test methods ...
}

A bit more annotation magic is needed for TestContainers, but the idea is generally the same: We need to substitute the spring.kafka.bootstrap-servers property during the context initialization.

Then you can use KafkaProducer to populate the input topic in a straightforward way:

List.of("A", "B", "B", "A", "C")
        .stream().map(e->new ProducerRecord<>(inputTopicName, e, e))
        .forEach(producer::send);
producer.flush();

Unfortunately, this is the point where the easy part ends. How do you verify the system’s output?

Of course, you have the ability to receive messages from Kafka. But the system under test now works completely asynchronously with your test. In other words, you can’t distinguish between:

  1. A situation where there are no messages in the topic because the system is running a time-consuming task and hasn’t emitted them yet
  2. A situation where the system already finished processing without emitting any messages

Imagine that you have received a record “A” from the topic, then waited for five seconds in which no additional records appeared. Does this mean that the system finished the processing and no more records will follow? Or should you wait for another five seconds for more results?

Asynchronous testing implies that you need to set timeouts in the testing code, which leads to an increase in the run time of the test. And, of course, any timeout is dependent on the current system performance.

Here is a possible approach to the verification of the deduplication algorithm utilizing a Kafka broker:

List<String> actual = new ArrayList<>();

while (true) {
  ConsumerRecords<String, String> records =
      KafkaTestUtils.getRecords(consumer, 5000 /* timeout in ms */);
  if (records.isEmpty()) break;
  for (ConsumerRecord<String, String> rec : records) {
    actual.add(rec.value());
  }
}

assertEquals(List.of("A", "B", "C"), actual);

As you can see, this test can’t run faster than five seconds. If we have dozens of tests like this one, they will add up to a significant amount of time. On the other hand, we cannot make the timeout too small; otherwise, we risk exiting the loop prematurely.

In most cases, we would like to wait for a certain amount of time until a verified condition is met and then finish the asynchronous test execution immediately in order to save time. This can be difficult to implement, but fortunately, the Awaitility library makes things a bit simpler. This library facilitates asynchronous testing and offers a lightweight DSL that allows you to express expectations of a system under test. For example, here is how you can express the expectation that the result should become equal to {"A", "B", "C"} in at most 10 seconds:

Awaitility.await().atMost(10, SECONDS).until(
         	() -> List.of("A", "B", "C").equals(actual));

Under the hood, Awaitility will periodically invoke the lambda argument and successfully finish the execution as soon as the lambda returns true. If it still returns false after 10 seconds, the test fails. The default polling interval is 100 milliseconds, but of course you may choose your own interval and even set up a Fibonacci or exponential increase of the interval.

If you have multiple asynchronous tests, this approach can really save test running time by getting rid of the unneeded wait.
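The polling behavior described above can be sketched in plain Java. This is a simplified model and not Awaitility's implementation: the class and method names are hypothetical, the interval is fixed, and a timeout is reported by returning false, whereas Awaitility throws a ConditionTimeoutException.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollingSketch {
    /**
     * Re-evaluates the condition every pollInterval until it holds or atMost has elapsed.
     * Returns true as soon as the condition holds, false on timeout or interruption.
     */
    static boolean awaitUntil(Duration atMost, Duration pollInterval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            if (condition.getAsBoolean()) return true;          // finish immediately on success
            if (System.nanoTime() - deadline >= 0) return false; // give up after the timeout
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();              // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true roughly 300 ms after start
        boolean met = awaitUntil(Duration.ofSeconds(10), Duration.ofMillis(100),
                () -> System.currentTimeMillis() - start >= 300);
        System.out.println("condition met: " + met
                + " after ~" + (System.currentTimeMillis() - start) + " ms");
    }
}
```

The time saved comes from the early return: a passing check costs only as long as the system actually needs, and the full timeout is paid only when the check fails.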

Keep in mind:

  • The polling of the output topic should be initiated in a separate thread, with a correct implementation of cooperative thread termination.
  • The monitored value is shared between the consumer and verifier threads and thus should be accumulated in a thread-safe data structure. We are using CopyOnWriteArrayList here because we need to check for the deduplication of records. In most cases, only the latest result for each key must be verified, so ConcurrentHashMap may generally be a better fit.

// this list holds a shared value
List<String> actual = new CopyOnWriteArrayList<>();
ExecutorService service = Executors.newSingleThreadExecutor();
Future<?> consumingTask = service.submit(() -> {
  while (!Thread.currentThread().isInterrupted()) {
	ConsumerRecords<String, String> records =
         KafkaTestUtils.getRecords(consumer, 100);
	for (ConsumerRecord<String, String> rec : records) {
  	actual.add(rec.value());
	}
  }
});

try {
  Awaitility.await().atMost(5, SECONDS)
      .until(() -> List.of("A", "B", "C").equals(actual));
} finally {
  consumingTask.cancel(true);
  //should be enough to finish a while loop iteration
  service.awaitTermination(200, MILLISECONDS);
}

On my machine, passing tests run for about 400 milliseconds and failing ones run for a bit more than five seconds.
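For the more common case mentioned earlier, where only the latest result for each key needs to be verified, the shared structure could be a ConcurrentHashMap instead of a list. The sketch below is a plain-Java illustration with hypothetical names. A simple task stands in for the Kafka consumer loop, and the final check would normally be wrapped in an Awaitility condition.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatestPerKeySketch {
    // Shared between the consuming thread (writer) and the verifying thread (reader)
    private final Map<String, String> latest = new ConcurrentHashMap<>();

    /** Called by the consuming thread for every record it reads. */
    void accept(String key, String value) {
        latest.put(key, value); // a newer value for the same key replaces the older one
    }

    /** Called by the verifying thread; true once the latest values match the expectation. */
    boolean matches(Map<String, String> expected) {
        return expected.equals(latest);
    }

    public static void main(String[] args) throws InterruptedException {
        LatestPerKeySketch collector = new LatestPerKeySketch();
        ExecutorService service = Executors.newSingleThreadExecutor();
        // Stand-in for the consumer loop: three updates, two of them for the same key
        service.submit(() -> {
            collector.accept("a", "1");
            collector.accept("b", "1");
            collector.accept("a", "2");
        });
        service.shutdown();
        service.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(collector.matches(Map.of("a", "2", "b", "1")));
    }
}
```

Unlike the list, the map cannot detect duplicates, which is why the deduplication test above keeps every received value.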

After running all the tests in the project, the final picture looks like this: TopologyTestDriver quickly accepts both correct and incorrect implementations of the deduplication method. EmbeddedKafka and TestContainers run for a longer time, but both can distinguish correct and incorrect implementations.

Correct vs. incorrect implementations

However, one problem still remains. We can’t reliably test for the fact that no extra messages will appear in the output topic. The comprehensive test for the deduplication algorithm should not only check that “ABBAC” input turns to “ABC” at some point, but it should also check that no more output records will be produced. Of course, we can wait for another five seconds and check that nothing is fetched. This will significantly increase the running time of our tests, but it still won’t give us a 100% guarantee.

In certain scenarios, a special “marker record” can be put at the end of the input dataset, and the appearance of this marker at the output of the pipeline might be sufficient evidence that the processing is finished. But this approach is not generalizable. The inability to distinguish between long wait time and finished processing is an insurmountable problem in asynchronous testing.
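For the scenarios where a marker record does work, the reading side might look like the following plain-Java sketch. A BlockingQueue stands in for the output topic, and the class name, method name, and marker value are all hypothetical. The sketch also assumes that the pipeline preserves the order of records relative to the marker, which is exactly the assumption that does not hold in general.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MarkerRecordSketch {
    static final String MARKER = "<END>"; // hypothetical marker value

    /**
     * Collects records until the marker arrives. Returns an empty Optional if the
     * marker has not been seen when the timeout expires.
     */
    static Optional<List<String>> readUntilMarker(BlockingQueue<String> output, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<String> collected = new ArrayList<>();
        try {
            while (true) {
                long remaining = Math.max(deadline - System.nanoTime(), 0);
                String record = output.poll(remaining, TimeUnit.NANOSECONDS);
                if (record == null) return Optional.empty();            // timed out
                if (MARKER.equals(record)) return Optional.of(collected); // processing finished
                collected.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> output =
                new LinkedBlockingQueue<>(List.of("A", "B", "C", MARKER));
        System.out.println(readUntilMarker(output, Duration.ofSeconds(1)));
    }
}
```

Once the marker is seen, the test can assert on the exact contents and stop without an extra waiting period. If the marker never arrives, the timeout still applies.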

Summary

TopologyTestDriver is an essential tool for creating Kafka Streams applications. However, it is important to keep in mind that it doesn’t fully reflect all the aspects of real Kafka cluster behavior. We examined only one example, but in practice, we’ve come across a number of cases where TopologyTestDriver-based tests showed incorrect results and additional TestContainers-based tests were needed.

The problem with EmbeddedKafka or TestContainers tests is that they are asynchronous, and this implies certain difficulties and limitations. To make test code clearer and reduce test running time, you can use the Awaitility library. If you are using Spring, you might also want to take a look at spring-test-kafka, which further simplifies dataset-driven testing of Kafka applications and has Awaitility under the hood. Still, the challenge of distinguishing between cases with an empty result set and a slow result set persists for all asynchronous tests.

The crucial difference between TopologyTestDriver-based and TestContainers-based tests is that the former are unit tests, while the latter are integration tests. Thus, according to the testing pyramid model, the best strategy is to write as many tests as possible utilizing the fast and synchronous TopologyTestDriver. Only when it fails to check something should you resort to asynchronous, slower, and less reliable TestContainers-based tests.

For all the code examples in this post, please visit GitHub.

Meanwhile, learn how to use Apache Kafka the way you want: by writing code. Apply functions to data, aggregate messages, and join streams and tables with Kafka Tutorials, where you’ll find tested, executable examples of practical operations using Kafka, Kafka Streams, and ksqlDB.

  • Ivan Ponomarev is a tech lead at KURS and a tutor at the Moscow Institute of Physics and Technology. He has more than 15 years of experience in IT, enjoys writing code, and manages projects ranging from ERP systems customization to building real-time web-scraping systems. He also speaks at conferences and meetups.

  • John Roesler is a software engineer at Confluent, focusing mainly on Kafka Streams. Prior to that, he helped build and operate the high-volume event streaming platform at Bazaarvoice.
