A team from Thoughtworks was engaged to replicate selected data from a 25-year-old IBM mainframe system into scalable microservices that can support big surges in demand without placing any load on the mainframe. The client selected a commercial CDC (change data capture) product that can copy IBM Db2 database changes to Apache Kafka®.
We designed an event-driven pipeline where the first stage is a Kafka Streams application that combines individual database change messages into transaction event messages. It consumes CDC messages from 25 database tables in a single topic and produces event messages into two domain topics, keyed by a domain ID. All messages are serialized using Avro.
The Kafka Streams application has two responsibilities:
Combine messages belonging to the same database transaction. Any service that consumes the messages can process them as atomic changes.
Separate changes by business domain and produce events to domain-specific topics. A customer service, for example, does not need to consume messages from the licensing topic.
This article describes the Kafka Streams approach we used, illustrated using a small example with four database tables in one domain.
The CDC product produces one message per database row change. Each message value includes a header section with transactional information, including these fields:
The values of transactionEventCounter range from 1 to the number of change messages in that transaction. The message with the highest value of transactionEventCounter has transactionLastEvent value true.
An example truncated message, rendered as JSON, is:
This CDC product provides little control over message order because it has limited capability for defining message keys. We configured it to produce all messages to a single topic, partitioned randomly.
A simplified data model with four tables is used in this article. This simplified diagram shows the tables and their primary key columns:
The tables are divided into groups, where those in a group have a common column:
Customers CustomerNames CustomerAddresses
Why did we do this? We need to be able to process database transactions with any combination of database change messages.
Each table group corresponds to an aggregate root in the domain-driven design sense and the common column contains the keys used when producing messages for those groups. The CustomerAddresses table is part of the Customer group because it also contains information about the type of customer address and is more naturally part of the customer aggregate. Each table belongs to only one table group.
We designed a set of domain event messages for each table group:
The source system does not delete customer and address records.
Our Kafka Streams topology has three main stages, explained in detail below:
Group the consumed messages by transaction ID.
Aggregate the messages into objects with all the messages for each transaction.
When all messages belonging to a transaction have been received, transform the transaction objects into well-contextualized event messages of the types described above, and produce them to sink topics.
Here is a simplified view of the three stages, followed by enhancements to cater for complexity in actual use.
All code examples here are in Kotlin. The messages all have string keys and Avro-serialized values.
The first stage groups the incoming messages by transaction ID. This is necessary because the CDC product is not able to generate Kafka keys using transaction ID.
In this code:
Consumed messages are deserialized using
transactionIdFrom extracts the transaction ID from the transactional information included in each message.
Grouping messages here forces Kafka Streams to repartition messages but that is unavoidable, given the limited options in the CDC product for creating message keys.
This is the core of the topology, where the grouped messages are added to
Transaction objects, keyed by transaction ID. The
Transaction class is a container with zero or more records from any of the four tables we are processing, and fields that are used to determine when all messages from a transaction have been aggregated. See below for how we constrain the size of the state store.
In this code:
TransactionBuilder.newTransaction() constructs a new
TransactionBuilder.addEvent() method adds an event message to a transaction and returns a new, immutable
The transaction objects are materialized to a key-value state store called
Transaction class is serialized using
We do not use windowed aggregation because it is not a good fit for aggregating by transaction ID:
We have no expectations about how long it will take for all the messages in a transaction to be received.
There is no mechanism for calling back when a window closes so we still need to detect transactions that never complete (see below).
KTables continuously emit messages as they aggregate incoming messages but we only want to continue processing transactions that are complete. We use a filter that only allows complete transactions to pass.
In this code:
TransactionBuilder.iscomplete() function uses transactional information in the messages to determine if all messages for that transaction have been aggregated (see above). If all the messages up to the final one (with last-message flag set true) have been aggregated, in any order, then it is complete.
KTable is converted back to a
KStream for further processing.
In this code:
transformTransaction() function returns one or more
KeyValue objects with an event message derived from the source transaction data.
Message keys are customer ID for CustomerCreatedEvent and CustomerModifiedEvent messages, and address ID for AddressCreatedEvent and AddressModifiedEvent messages.
The resulting stream is written to the sink topic with messages serialized using
flatMap because the contract for produced events is one event per address ID or customer ID and the application needs to process transactions that span multiple database rows.
For example, a single transaction may insert a row into Addresses and CustomerAddresses, so we need to produce both AddressCreatedEvent and CustomerModifiedEvent messages.
Similarly, we need to deal with batch insert or update transactions. For example, if a batch process inserts multiple address rows, we need to produce that number of AddressCreatedEvent messages.
The topology we use has more complexity to handle situations encountered in our real-world experience processing CDC transactions.
One of our concerns was to know if transactions never complete: in other words, if not all the CDC messages for a transaction are consumed by our service, for whatever reason.
Transactions that never complete will remain in the state store undetected. Our solution is to create a
Processor implementation that schedules a
Punctuator to periodically check for incomplete transactions.
A side effect of not using windowed aggregation is that the local key-value state store used for
Transaction objects has no built-in mechanism to delete objects after they have been processed. We use the punctuator function for that purpose as well.
init() method includes:
The cleanup function:
In this code:
The punctuator checks every minute.
cleanUpOldTransactions iterates through the transactions in the state store.
TransactionBuilder.isOld() checks whether the transaction started more than an hour ago.
When an old, incomplete transaction is found,
signalExpiredTransaction() generates an alert that prompts investigation and action.
The old transaction is deleted from the state store.
To trace messages through processing by Kafka Streams and other services, we created a correlation ID that is not part of the messages. It is passed in Kafka headers and included in log events.
The Kafka Streams DSL does not provide access to message headers but has access to the Processor API, which does. Our solution is to call
KStream.transformValues() immediately after consuming messages. An implementation of
ValueTransformer has access to the
ProcessorContext, where we add a correlation ID to Kafka message headers. The processor’s
transform() method returns the message unchanged.
The topology preserves headers through repartitioning and forwards them through aggregation. This means the complete
Transaction object is in a message with headers from the last CDC message that was part of it.
Our correlation ID is made by using a truncated hash of the source transaction ID. This design enables us to correlate all Kafka messages originating in one database transaction.
For initial load, the CDC product produces “refresh” messages representing a point-in-time view of the database tables. These messages do not contain any transactional information.
Our application has a second topology for processing refresh messages (not shown here) that uses the same steps but groups and aggregates messages by the ID values common to table groups (i.e.
address_id). It produces messages for each table group: AddressSnapshotEvent and CustomerSnapshotEvent.
We occasionally encountered duplicate CDC messages. The effect of these is to cause the topology to produce duplicate domain event messages (assuming the duplicate CDC message arrives before the
Transaction is removed from the state store).
The contract with downstream services is for at-least-once delivery and for those services to treat domain events as idempotent.
We created a Kafka Streams application as the first stage in a pipeline that uses CDC to copy data from a Db2 database into highly available domain microservices. The application is responsible for combining messages from database transactions into discrete events, and for producing those events to domain-specific topics.
The Kafka Streams application uses a simple topology to aggregate database-row messages into transactional events for downstream processing. The topology includes a check that all messages for a transaction have been processed, generating an alert if messages are not consumed within a reasonable time period.
The application constructs a correlation ID from the transaction ID and adds it to the Kafka headers, to enable tracing of messages that originate from a single database transaction. It can also produce snapshot messages from an initial load of the data.