Kafka Streams is an abstraction over Apache Kafka® producers and consumers that lets you forget about low-level details and focus on processing your Kafka data. You could of course write your own code to process your data using the vanilla Kafka clients, but the Kafka Streams equivalent will have far fewer lines, because it's declarative rather than imperative. As a library, Kafka Streams lets you create a standalone application that can be run anywhere that can connect to a Kafka broker, whether that's a laptop or a hefty cloud server. You just need to provide it with the host and port of a broker. Combining Kafka Streams with Confluent Cloud grants you even more processing power with very little code investment.
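To make this concrete, here is a minimal sketch of a standalone Kafka Streams application. The application ID, broker address, and topic names are placeholders for whatever your environment uses.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class MinimalStreamsApp {
    public static void main(String[] args) {
        // Basic configuration: an application ID and the host:port of a broker
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "minimal-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Declare the processing: read from one topic and write to another
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.to("output-topic");

        // Build the topology and run it
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```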
For an entire course on the basics of Kafka Streams, make sure to visit Confluent Developer.
When you define a Kafka Streams operation on an event stream or streams, what you're really defining is a processor topology, a directed acyclic graph (DAG) with processing nodes and edges that represent the flow of the stream. Your event stream data comes in from Kafka through the source nodes at the top of the topology, flows through the user processor nodes where custom-logic operations are performed, and exits through the sink nodes to a new Kafka topic. In the processing portion of the topology, you can transform your data with various stateless operations, such as mapping and filtering: Mapping takes an input record and lets you produce a new key or value, or even a new type entirely, by applying a function to each record; filtering lets you create a new stream that includes only the records that meet some criteria.
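As a sketch, continuing with the StreamsBuilder from the example above (the topic names and the filtering criterion are illustrative):

```java
// A hypothetical stream of purchase descriptions keyed by customer ID
KStream<String, String> purchases = builder.stream("purchases");

// Filtering: keep only the records that meet some criteria
KStream<String, String> electronics = purchases.filter((key, value) -> value.contains("electronics"));

// mapValues: produce a new value, even of a new type, without touching the key
KStream<String, Integer> descriptionLengths = electronics.mapValues(value -> value.length());

// map: produce a new key and value (here, simply swapping them)
KStream<String, String> swapped = purchases.map((key, value) -> KeyValue.pair(value, key));

// Results exit the topology through a sink node to a new topic
descriptionLengths.to("electronics-lengths", Produced.with(Serdes.String(), Serdes.Integer()));
```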
Unlike an event stream (a KStream in Kafka Streams), a table (KTable) only subscribes to a single topic, updating events by key as they arrive. KTable objects are backed by state stores, which enable you to look up and track these latest values by key. Updates are buffered in a cache, which by default is flushed every 30 seconds. You can use many of the same operators on a KTable as you can on a KStream, including mapping and filtering.
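A sketch of defining a KTable, again with illustrative topic, store, and type names:

```java
// The latest address per customer ID, materialized into a named state store
KTable<String, String> addresses = builder.table(
    "customer-addresses",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("address-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));

// The familiar stateless operators work on tables too
KTable<String, String> usAddresses = addresses.filter((customerId, address) -> address.endsWith("US"));
```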
To understand GlobalKTable, another available table type, you'll need a basic understanding of the architecture of Kafka Streams, specifically partitioning: A KTable is sharded and only deals with one partition of a topic at a time, so it only sees a subset of your total data. A GlobalKTable, in contrast, holds all of the records across all partitions of a topic. It is therefore usually used for data that is smaller in size, static, and not updated frequently, e.g., zip code tables.
Serialization converts higher-level objects into the ones and zeros that can be sent across the network to brokers or stored in the state stores that Kafka Streams uses; the term also encompasses deserialization, the opposite process. SerDes are convenience wrappers around serializers and deserializers, not only saving you from speaking some extra syllables but also keeping you from having to specify both serializers and deserializers for objects that are the same type (you also don't have to worry about when to use which one). As you may have seen in examples that feature a Consumed or Produced object, SerDes can be built in, e.g., Serdes.String(), or custom. Creating a custom SerDes is just a matter of creating the serializer and deserializer that it wraps, and you can accomplish this by implementing the Serializer and Deserializer interfaces from the org.apache.kafka.common.serialization package.
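For instance, here is a sketch of a custom SerDes for a hypothetical Order class, using Jackson to serialize it as JSON (the format is an assumption; any byte representation works):

```java
import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

class OrderSerializer implements Serializer<Order> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Order order) {
        try {
            return order == null ? null : mapper.writeValueAsBytes(order);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing Order", e);
        }
    }
}

class OrderDeserializer implements Deserializer<Order> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Order deserialize(String topic, byte[] bytes) {
        try {
            return bytes == null ? null : mapper.readValue(bytes, Order.class);
        } catch (IOException e) {
            throw new SerializationException("Error deserializing Order", e);
        }
    }
}

final class OrderSerde {
    // Wrap the pair into a single Serde for use with Consumed, Produced, or Materialized
    static Serde<Order> serde() {
        return Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
    }
}
```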
In Kafka Streams, records being joined must have the same key: Two events whose keys don't match are assumed to be unrelated and will not be joined. A common scenario is using a join to provide a full information set about a customer, for example, adding customer purchases to customer addresses. You join streams together into a third stream using inner, outer, and left-outer joins; the joins are windowed, they use read-only keys, and data is buffered in a local state store.
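A sketch of a windowed stream-stream join with illustrative topics and a five-minute window; JoinWindows.ofTimeDifferenceWithNoGrace assumes a recent Kafka Streams release (older versions use JoinWindows.of):

```java
KStream<String, String> purchases = builder.stream("purchases");
KStream<String, String> shippingAddresses = builder.stream("shipping-addresses");

// Inner join: records keyed by the same customer ID and within five minutes of each other
KStream<String, String> enriched = purchases.join(
    shippingAddresses,
    (purchase, address) -> purchase + " ships to " + address,       // how to combine the two values
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

enriched.to("enriched-purchases");
```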
On the other hand, stream-table joins are not windowed. Here, the stream and table are paired using inner and left-outer joins, whereby records arriving from the stream are matched to the table (you can also join a stream to a GlobalKTable, which provides a mechanism for joining where the keys don’t have to match ahead of time and is good for enriching stream data). Finally, tables can be joined with other tables in a non-windowed fashion, producing a table, but keep in mind that GlobalKTable objects can only be joined with streams.
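And a sketch of a stream-GlobalKTable join, where a KeyValueMapper extracts the table's key from the stream record, so the keys don't have to match ahead of time. The zip-codes topic, the Order type, its getZipCode() accessor, and the OrderSerde helper from the SerDes sketch above are all hypothetical:

```java
// Small, relatively static reference data
GlobalKTable<String, String> zipCodeInfo = builder.globalTable("zip-codes");

KStream<String, Order> orders = builder.stream("orders", Consumed.with(Serdes.String(), OrderSerde.serde()));

KStream<String, String> ordersWithRegion = orders.join(
    zipCodeInfo,
    (orderId, order) -> order.getZipCode(),                      // map the stream record to the table's key
    (order, regionInfo) -> order + " in region " + regionInfo);  // combine the matched values

ordersWithRegion.to("orders-with-region");
```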
If you need to use Kafka Streams to count the number of times that a customer has logged into a browser session, or to log the total number of items that you have sold in a given timeframe, you'll need to use state. Stateful operations, which include count, reduce, and aggregate (a superset of reduce), are backed by state stores and require that you first group by key. If the records don't have a key (or you need to change it), you can derive a new key, but this causes Kafka Streams to repartition the data so that all events with the same key end up on the same partition. You'll also need a Materialized object, which tells Kafka Streams how to store your data in the state store. Stateful operations return KTable objects and don't emit results immediately by default; rather, internal caching buffers results. Records are emitted when the configurable cache is full or after a 30-second default commit interval. To see all updates, or to debug, you can set the cache size to zero.
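A counting sketch using groupByKey and a Materialized store; the topic and store names are illustrative, and extractRegion is a hypothetical helper that derives a new key:

```java
KStream<String, String> logins = builder.stream("logins");

// Count logins per customer; the Materialized object names the state store and supplies serdes
KTable<String, Long> loginCounts = logins
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("login-counts")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));

// Grouping by a new key causes a repartition so that equal keys land on the same partition
KTable<String, Long> loginsByRegion = logins
    .groupBy((customerId, login) -> extractRegion(login))
    .count();

loginCounts.toStream().to("login-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
```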
Windowing in Kafka Streams gives you a snapshot of an aggregation over a given time frame, as opposed to its whole history—allowing you to do time-based analytics. You have several choices of windows.
Hopping windows are bound by time and have fixed endpoints and fixed sizes. A hopping window advances by an amount of time less than or equal to the size of the window, so the windows may overlap. If you have a five-minute hopping window that advances by one minute, you'll have one window that contains the data from 0–5 minutes, a second that contains 1–6 minutes, and so on. Because of this overlap, hopping windows can have duplicate data. Tumbling windows are actually a type of hopping window, but the advance size is exactly equal to the window size. This means that if you have a window with a size of five that goes from 0–5 minutes, the next window would advance by five and would cover 5–10 minutes. For this reason, tumbling windows do not contain any duplicate records.
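In code, the only difference between the two is the advance; TimeWindows.ofSizeWithNoGrace assumes a recent Kafka Streams release:

```java
// Hopping: five-minute windows that advance by one minute, so they overlap
TimeWindows hopping = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
    .advanceBy(Duration.ofMinutes(1));

// Tumbling: the advance equals the size, so the windows never overlap
TimeWindows tumbling = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> viewsPerWindow = builder.<String, String>stream("page-views")
    .groupByKey()
    .windowedBy(tumbling)
    .count();
```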
Session windows are not time-driven like the previous two types. They have a window start and a window end, but these are not fixed. Instead, they are defined by the events themselves in relation to an inactivity gap. If there is no activity within the defined gap, the window will close, but if there is activity, it will be merged with the previous events. For this reason, session windows can continue to grow infinitely. Finally, sliding windows have a fixed time size like hopping and tumbling windows but are driven by events, instead of by time, similar to session windows: A new window is created each time a record enters the sliding window or a record drops out of the sliding window. Sliding windows have a timeDifference that states the maximum amount of time two events can be separated for them to be considered within the same window.
Additionally, Kafka Streams lets you define a grace period on windowed aggregations, so records that fall outside a given window but arrive within the defined grace period are still included in the aggregation.
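A sketch of the event-driven window types, with illustrative durations and assuming a recent Kafka Streams release:

```java
// Session windows: defined by a five-minute inactivity gap rather than fixed boundaries
SessionWindows sessions = SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5));

// Sliding windows: two events at most thirty seconds apart share a window,
// with a thirty-second grace period for out-of-order records
SlidingWindows sliding = SlidingWindows.ofTimeDifferenceAndGrace(
    Duration.ofSeconds(30), Duration.ofSeconds(30));

KTable<Windowed<String>, Long> clicksPerSession = builder.<String, String>stream("clicks")
    .groupByKey()
    .windowedBy(sessions)
    .count();
```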
The passage of time as experienced by Kafka Streams is determined by event timestamps, not by an external clock or system time. Thus, timestamps drive the behavior of Kafka Streams, including the windowing discussed above. Kafka Streams chooses the next event to process based on its timestamp, always picking the event with the smallest, i.e., earliest, timestamp. As far as how these timestamps are initially set, events have a dedicated timestamp field that can be set by the producer according to the time in its environment when it sends the record (event-time processing) or by the brokers when they receive the record (log-append time, which Kafka Streams refers to as ingestion-time processing). Typically, the producer sets the timestamp. Finally, timestamps are read from records using the TimestampExtractor interface.
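As a sketch, a custom extractor that reads the event time out of a hypothetical Order payload might look like this; you would plug it in via the default.timestamp.extractor configuration:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Order) {
            // getOrderTimestamp() is a hypothetical accessor on the Order class
            return ((Order) value).getOrderTimestamp();
        }
        // Fall back to the timestamp the producer or broker stamped on the record
        return record.timestamp();
    }
}
```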
The largest timestamp at any given time in Kafka Streams is known as stream time, and it always goes forward. If an event arrives with a smaller timestamp than the current stream time, it is out of order. But if it is still within the current window’s time plus the grace period, it is not late, and is thus accepted and processed.
The Processor API in Kafka Streams gives you much more flexibility than the standard DSL, but being a lower-level API, it holds you responsible for more details. It requires you, for example, to specify your actual topology node by node. The Processor API also gives you direct access to state stores, unlike the aggregate operations that have been covered up until this point, and it lets you call commit directly. One of the most powerful features of the Processor API, which can't be done using the DSL, is that it lets you schedule arbitrary operations using a punctuator, a function you define that gets run at some regular interval. Punctuations can be triggered via stream time, which, as mentioned above, is driven by the timestamps of the events themselves, or via wall clock time, which depends on the system time of the Kafka Streams application (though wall clock time can be more approximate than it appears, since a batch of records often needs to finish processing before the wall clock time is checked). Note that it is also possible to mix features of the DSL and the Processor API.
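For instance, here is a sketch of a processor that buffers counts in a state store and uses a wall-clock punctuator to forward and commit them every thirty seconds. The store name is illustrative, and the store itself would have to be created and connected to this processor when you build the topology:

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountingProcessor implements Processor<String, String, String, Long> {
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        this.store = context.getStateStore("counts-store");
        // A punctuator scheduled on wall clock time; STREAM_TIME is the other option
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> iter = store.all()) {
                while (iter.hasNext()) {
                    KeyValue<String, Long> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value, timestamp));
                }
            }
            context.commit();  // direct access to commit, which the DSL does not expose
        });
    }

    @Override
    public void process(Record<String, String> record) {
        Long count = store.get(record.key());
        store.put(record.key(), count == null ? 1L : count + 1);
    }
}
```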
Chances are you’ll want to unit-test and/or integration-test your Kafka Streams applications. Because Kafka Streams connects to brokers and you generally don’t want to have to set up a whole cluster just for unit testing, you can use the TopologyTestDriver class as a mock for your unit tests, which will let you determine whether the custom logic in your topology is producing the correct results. However, because TopologyTestDriver focuses on one element at a time, doesn’t adequately test the full caching behavior of stateful operations, and doesn’t write to real topics, your best choice in some cases is to use integration tests against actual brokers with the TestContainers library. TestContainers makes it possible to share a container across multiple test classes, which is important because it saves you from having to set up and tear down brokers for each integration test.
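A minimal unit test with TopologyTestDriver might look like the following sketch, assuming JUnit 5, a hypothetical buildTopology() method, and a topology that uppercases values:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class TopologyTest {
    @Test
    public void shouldUppercaseValues() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("key", "hello");

            assertEquals("HELLO", output.readValue());
        }
    }
}
```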
You want to shut down Kafka Streams only if an error is truly unrecoverable, which will happen, but certainly not with every error. Many errors can simply be logged and recovered from. Essentially, you want to deal appropriately with each error type and situation. For example, you typically don't want to stop the world for a change in partition ownership.
Fortunately, Kafka Streams provides mechanisms that equip you to deal with the three broad categories of errors you will encounter: entry (consumer) errors, such as records that fail to deserialize; errors in processing your own logic; and exit (producer) errors when writing results back to Kafka.
Each category has its own special type of handler. A final consideration related to error handling is the allowances you'll need to make for the behaviors of the embedded producer and consumer instances in Kafka Streams.
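Wiring those handlers up is mostly configuration. Here is a sketch, continuing with the props and builder from the first example; the deserialization and production handler classes shown are the ones that ship with Kafka Streams, and you can substitute your own implementations:

```java
// Entry (consumer) errors: skip records that fail to deserialize instead of shutting down
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);

// Exit (producer) errors: decide what happens when a write back to Kafka fails
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);

// Processing errors: replace the failed thread, or shut down the client or the whole application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.setUncaughtExceptionHandler(
    exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
```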
Kafka Streams achieves parallelism by distributing tasks, its fundamental unit of work, across instances of the application, as well as across threads within an instance. Thus, it's a good idea to get a sense of how many tasks there are going to be when designing your Kafka Streams application. The number of tasks is determined by the source topic with the highest number of partitions. You can have as many threads as there are tasks, although the default is one thread per application instance. A single thread processes its tasks one after another. So if you have eight tasks, for example, you could have one application instance with eight threads or, alternatively, eight instances with one thread each. Note that adding extra application instances can be useful even if they are going to be idle, because they can be used for failover. Tasks are distributed across instances regardless of whether those instances run on the same machine or on different ones.
Finally, since Kafka Streams uses a Kafka consumer under the hood, it inherits the consumer group protocol, whereby a consumer group is simply a group of consumers that share a group ID. A consumer group is useful because it lets a broker dynamically redistribute the workload over only the current and active members. Thus, if a Streams instance goes down, the remaining active ones pick up the task assignment(s) of the downed instance. This is really where Kafka Streams gets its scalability: if you need more processing power, you can just spin up additional Kafka Streams instances, and the workload distribution is handled dynamically.
In Kafka Streams, state stores can either be persistent, using RocksDB, or in memory. Because it is easy to lose a disk or power, neither type is fault tolerant on its own. For this reason, Kafka Streams implements a changelog topic in Kafka, which receives all of the events that are sent to the store. Changelogs are compacted topics, meaning that only the latest change per key is retained over time. If you lose a machine, Kafka Streams replays the changelog on startup to reconstitute the state store for the newly assigned task(s). This replaying takes time, however, so your other option is to use standby replicas, which allow for fast failovers. With standby replicas, another instance of your Kafka Streams application is designated as a standby for a specific task. The Streams instance with the active task executes your processor topology, while the task on the standby Streams instance reads from the changelog topic into its local state store without doing any of the processing itself. If the active node goes down, the standby can take over immediately. Enable standbys by setting num.standby.replicas to a value greater than its default of zero.
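Standby replicas, like the thread count discussed earlier, are plain configuration; for example, continuing with the properties from the first sketch:

```java
// One standby replica per task (the default is zero)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

// More threads per instance also adds parallelism, up to the total number of tasks
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
```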
A common event streaming pattern is to write out events to a database, which is then separately queried for analytics by the UI layer of a reporting dashboard. Instead of sending your data out, however, you can directly query the stateful operations in your Kafka Streams application using interactive queries. To use these read-only queries, you need to name the state store that you'd like to query, using a Materialized object or the Stores factory if you are in the Processor API layer. Each of your Streams instances contains the metadata for all of the other Streams instances with the same application ID, so if the instance you query doesn't physically hold the data, it can tell you which instance does, and your application can forward the query there. For more information on interactive queries, consult the Kafka Streams developer guide.
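A query sketch against the login-counts store named in the counting example above, using the running KafkaStreams instance:

```java
// Fetch a read-only view of the local "login-counts" state store
ReadOnlyKeyValueStore<String, Long> counts = streams.store(
    StoreQueryParameters.fromNameAndType("login-counts", QueryableStoreTypes.keyValueStore()));

Long customerLogins = counts.get("customer-42");

// To find which instance hosts a particular key, consult the metadata
// (KafkaStreams#queryMetadataForKey) and forward the request to that instance yourself
```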
As mentioned earlier, Kafka Streams is declarative, which means that you tell it what you would like done rather than how to do it. Its nuances can take time to learn, but this post has introduced its most significant features. To learn about its elements in much greater detail, be sure to check out the Kafka Streams 101 course on Confluent Developer and use the promo code STREAMS101 for $101 of free usage in Confluent Cloud.