Twitter, one of the most popular social media platforms today, is well known for its ever-changing environment—user behaviors evolve quickly; trends are dynamic and versatile; and special and emergent events occur unexpectedly at any time. To keep pace, Twitter heavily invests in recommendation systems to power its products, including its home timeline ranking and ad prediction systems, just to name a few. Unfortunately, a stale model that is trained from old data can quickly become irrelevant. An up-to-date model is vital to a responsive, adaptive, and successful prediction system that leads to user satisfaction.
Twitter recently built a streaming data logging pipeline for its home timeline prediction system using Apache Kafka® and Kafka Streams to replace the existing offline batch pipeline at a massive scale—that’s billions of Tweets on a daily basis with thousands of features per Tweet. The new streaming pipeline significantly reduces the entire pipeline latency from seven days to about one day, improves the model quality, and saves engineering time. For an overview of the pipeline, please refer to the Streaming logging pipeline of home timeline prediction system blog post. The remainder of this blog post details the customized Kafka Streams join functionality that was custom-built to support the pipeline.
A machine learning (ML) logging pipeline is just one type of data pipeline that continually generates and prepares data for model training. In a nutshell, an ML logging pipeline mainly does one thing: Join. Feature-label joins are the most prevalent type and are typically left joins because real-world ML systems recommend many more items than those actually engaged by users.
There are other types of joins too. For example, features could be generated from different components or systems that need to be joined together to become the final feature set—such joins are typically inner joins. In a machine learning logging pipeline, you need roughly the same amount of “negative” training examples, which can’t be obtained by inner joins. This blog post focuses on the customized left join DSL that was built as an alternative to the Kafka Streams native join DSL.
Kafka and its ecosystem are not new to Twitter—this blog post discusses the move toward Apache Kafka from its in-house EventBus (which builds on top of Apache DistributedLog). Twitter also integrates Kafka Streams into Finatra—its core web server stack. Thus, it is very convenient for Twitter to build a ML logging pipeline using Kafka Streams as all the infrastructure provided by Finatra are immediately available. Moreover, Kafka’s excellent documentation and well-organized code make it fairly easy to be customized.
The Kafka Streams library comes with a default join functionality that has the following attributes:
ML use cases, however, may have some special requirements:
|[(A, b), (B, b)]
|[(A, a), (A, b)]
|[(A, [a, b]), (B, [a, b]), (A, [a, b])]
Although Kafka Streams’ native join DSL doesn’t provide everything that is needed, thankfully it exposes the Processor API, which allows developers to build tailored stream processing components. Having a non-fault-tolerant state store can be achieved by defining a customized state store with the changelog topic backup disabled (please note this is not advised for an ML logging pipeline).
Having a customized LeftJoin requires building a key-value store to temporarily keep both joined and unjoined records sorted by the served order. To achieve this goal, two new state stores were introduced to the LeftJoin, in addition to two WindowStores on each side: a KeyValue state store (kvStore) on the left side and a JoinedIndicatorStore that is shared by both sides. The following describes the data models of the state stores:
The following details the procedures of the left join:
The normal processor. When a left or right record comes in, it is always put into both the KeySortedStore and TimeSortedStore. The relationship of the fields can be seen from the right-side stores in Figure 2. Specifically, the payload key of the KeySortedStore corresponds to the payload key of the TimeSortedStore; the timestamp field of the KeySortedStore corresponds to the payload key of the TimeSortedStore; and the offset, which is the value of the KeySortedStore, corresponds to the offset of the TimeSortedStore as part of the key.
At the same time, the procedure looks up the other side of the KeySortedStore to find whether there is a match. If there is a match, then put the payload key, left-side timestamp, and right-side payload value into the JoinedIndicatorStore. The following describes how the fields mapping between KeySortedStore and TimeSortedStore helps: If the join occurs on the right side, then you have the payload value already and can directly insert it to the JoinedIndicatorStore. But if the join occurs on the left side, then you need to first fetch the fields from the right-side KeySortedStore as that’s how records are matched. Then, fetch the corresponding payload value from the TimeSortedStore by the fields in the KeySortedStore and finally insert the payload value to the JoinedIndicatorStore.
The records in the JoinedIndicatorStore won’t be emitted immediately—instead, they will be checked and potentially emitted in the left-side punctuator (see details below). The timestamp of the JoinedIndicatorStore is always the left-side timestamp—this is because you will need to use the left-side record’s timestamp to fetch the JoinedIndicatorStore in the punctuator (details below). Also, the value of the JoinedIndicatorStore is always the right-side payload value. Again, this is because there will be a punctuator that periodically runs on the left side. There, you can fetch the left-side payload value by querying the left-side kvStore.
The punctuator. There is a periodic procedure called the punctuator that runs in the left-side procedure. The left-side punctuator basically waits until the window closes for a left-side record and emits the corresponding matched records in the JoinedIndicatorStore. The above process is fulfilled by range-scanning the left-side TimeSortedStore and then point querying the JoinedIndicatorStore. Field mapping helps here: The payload key of TimeSortedStore corresponds to the joined key in the JoinedIndicatorStore while the timestamp corresponds to the timestamp. Thus, you can efficiently fetch records from both stores. Finally, in the punctuator, you can also clean up the expired records in the left-side TimeSortedStore. Cleanup of the KeySortedStore and JoinedIndicatorStore is taken care of by Kafka Streams’ WindowStore.
Performance-wise, since the WindowStore contains only minimal data, the point matching query is super fast. Because the timestamp in the kvStore is sorted, the range scan in the periodic procedure is also efficient. An efficient implementation is critical since the join library needs to handle up to millions of events per second for Twitter’s traffic level.
This section discusses how consumer lag and potentially the loss of data could impact the data quality of the logged data, which could in turn impact the success of the business. For example, Twitter has more than sufficient events and because the ML training data is i.i.d., a tiny portion of data loss is tolerable. However, this may not be true for other use cases or other organizations that have a different requirement.
The focus of this discussion is on the customized left join as it has more implications than the inner join. The discussion is split into two possible scenarios:
Here, the left join uses the event timestamp, not the consumption timestamp, for matching. That means, as long as the left-side event is consumed within its joining window, it will be joined with all right-side matching events. Also, matching right-side events that arrive later than the consumption timestamp will also be joined (this is the normal case). The diagrams below illustrate the process. The conclusion: Left-side consumer lag during service runtime is okay as long as the event is consumed within its join window, which can almost be guaranteed.
This blog post discussed the customized Kafka Streams join DSL that supports the ML-specific logging pipeline at Twitter. The details of the join DSL are presented and the impact of consumer lag is also examined. Overall, the customized join DSL is a major win to Twitter’s ML logging pipeline:
In the future, Twitter would like to try the cooperative rebalancing that was introduced in Apache Kafka 2.5, hoping to reinforce the entire pipeline even more.
Learn how to use Apache Kafka the way you want: by writing code. Apply functions to data, aggregate messages, and join streams and tables with Kafka Tutorials, where you’ll find tested, executable examples of practical operations using Kafka, Kafka Streams, and ksqlDB.
Twitter’s home timeline streaming logging pipeline was developed by Peilin Yang, Ikuhiro Ihara, Prasang Upadhyaya, Yan Xia, and Siyang Dai. We would like to thank Arvind Thiagarajan, Xiao-Hu Yan, Wenzhe Shi, and the rest of Timelines Quality team for their contributions to the project and this blog post. We would also like to give a special shoutout to leadership for supporting this project: Xun Tang, Shuang Song, Xiaobing Xue, and Sandeep Pandey. During development, Twitter’s Messaging team has provided tremendous help with Kafka, and we would like to thank Ming Liu, Yiming Zang, Jordan Bull, Kai Huang, and Julio Ng for their dedicated support.