Have you ever wondered how to track events in a large codebase? I gave it a shot using Apache Kafka®! Read on to learn how to use GitHub data as a source, process it using a Kafka Streams topology, and send it to a Kafka topic.
GitHub’s data sources (REST + GraphQL APIs) are not only developer-friendly but also a goldmine of interesting statistics on the health of developer communities. Companies like OpenSauced, LinearB, and Tidelift can measure the impact of developers and their projects using statistics gleaned from GitHub’s APIs. The results of GitHub analysis can change both day-to-day and over time.
Apache Kafka is a large and active open source project with nearly a million lines of code. It also happens to be an event streaming platform. So why not use Apache Kafka to, well, monitor itself? And learn a bit about Kafka Streams along the way?
How would we get started? Confluent has a GitHub source connector which approximates real-time event streaming by making up to 5,000 requests to the API per hour. We can use that to get GitHub events (such as pull requests) into a Kafka topic. But then what? That’s when the power of Kafka Streams and its Processor API comes in! Kafka Streams is the native stream processing library that runs as a client, separate from the Kafka broker. (To process live data, we could also use Apache Flink®—more on using Flink in a future demo!) We’ll use it to analyze the data and obtain an up-to-date ratio of open pull requests to closed pull requests. This could tell us a couple of things about the Apache Kafka project: how many PRs are being opened by contributors, and maintainers’ ability to close them.
The structure of the project will look like this:
1. Implementing the GitHub source connector,
2. Obtaining the pull request events from that connector, and
3. Processing those events and putting them into a new topic, “state”.
Data pipelines have both sources (the GitHub connector in the diagram above) and sinks, like the state topic. A source is like a kitchen faucet: it’s where you get the water, or data, from. A sink, on the other hand, is like, well, a kitchen sink! It holds the data flowing from the source. If you want to learn more about data pipelines, sources, and sinks, you can view this video by Wade Waldron. It’s a module from the Building Flink Apps in Java course, but much of what is said applies to Kafka and data pipelines in general.
Our source is the Confluent GitHub source connector. If you’d like to set one up for yourself, follow the instructions in this demo. At a high level, note that we will be pulling from the github-pull_requests resource in the GitHub source connector. This resource grants us a lot of information about GitHub pull requests, including their status: open or closed.
Now, we can connect Kafka Streams to the events flowing in from our source connector and manipulate these events using streams to get the information we want. The final form of the information will determine the shape of our topology.
Confluent’s Kafka Streams docs define a stream processing topology:
“A processor topology or simply topology defines the computational logic of the data processing that needs to be performed by a stream processing application.”
Basically, a topology is a graph of what happens to your data. Here’s what our nodes look like (the arrows represent streams of data). There’s a GitHub source processor, a stream processor performing the analysis, and a sink processor sending the analyzed data to a new topic.
This topology is defined on line 68 of GitHubPrRatio.java. The code on line 84 holds the pertinent methods stringing the topology together:
The .stream method here takes in the input topic from the GitHub source connector. The .peek method isn’t something we’d leave in for a production-level app, but it allows us to see the incoming keys and values when we’re running the demo. Next, the values are mapped to a JSON object with .mapValues. Then, they’re processed and we use another .peek method to see the outgoing keys and values, and then a .to method sends them to our output topic.
In defining this topology, we’ll have a MyProcessorSupplier class providing the Processor instance in the .process method above. It will pick out the open and closed pull requests and create a count of each, to finally output a ratio of open/closed pull requests.
On line 102 in GitHubPrRatio.java, this MyProcessorSupplier class provides a processor to the main class, which ingests the github-pull_requests stream in order to pull out the open and closed states.
On line 130 inside the MyProcessorSupplier class, we've got a state store established, which will hold the current state of the open/closed pull request ratio. The init method on line 112 schedules a punctuation to fire every second, printing the ratio in the store. The process method on line 123 will take in the events, mark them as open or closed and increment the count, and stash them in the state store.
There are two APIs for interacting with Kafka Streams: the more abstracted one is the Streams DSL (Domain Specific Language), which is built on top of the lower-level Processor API. In this case, we want to implement a state store, because calculating a final count of pull requests will mean that we need to involve state—the processing of each record is dependent on how the former records are processed.
Note: Not all Kafka Streams topologies need to involve state. For example, if all the topology is doing is filtering messages, then the record processing is independent of how the former records have been processed.
The Streams DSL automatically creates state stores for you in operations like aggregate(), but the Processor API requires the developer to create the state store:
Once that’s done, we can access the state store when we initialize the processor using init.
Note: The Processor API is closer to the metal, and like most lower-level APIs, provides the developer more control while necessitating the management of details like state stores.
The init method, defined here in the demo on line 112, is called when Kafka Streams starts up. We want our instance to include any necessary info for task construction. In this case, we provide the state store context, then forward the new key-value pair to the downstream processors every second, using context.schedule. Note that we’d take out the line that prints to the console for a production app, but since the purpose of this application is to show how Kafka Streams works, we leave it in.
Our process method on line 123 contains the logic used to count the open and closed PRs and create the ratio.
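The counting logic itself can be modeled without Kafka on the classpath. What follows is a minimal plain-Java sketch, not the demo’s code: the class name PrRatioSketch, the HashMap standing in for the state store, and the fromEvents helper are all hypothetical, and the process and punctuate methods only mirror the roles described above. It assumes the ratio is reported as open divided by closed, and that it is undefined (NaN) until at least one closed PR has been seen.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the processor's logic; no Kafka Streams classes are used.
class PrRatioSketch {
    // Stand-in for the state store: maps a PR state ("open"/"closed") to its count.
    private final Map<String, Long> store = new HashMap<>();

    // Mirrors the role of process(): update the counts for one incoming event.
    void process(String prState) {
        if ("open".equals(prState) || "closed".equals(prState)) {
            store.merge(prState, 1L, Long::sum);
        }
        // Any other state is ignored in this sketch.
    }

    // Read the current count for a state, defaulting to zero.
    long count(String prState) {
        return store.getOrDefault(prState, 0L);
    }

    // Mirrors the role of the punctuation: read the store and report the ratio.
    double punctuate() {
        long closed = count("closed");
        if (closed == 0) {
            return Double.NaN; // assumption: ratio is undefined until a PR is closed
        }
        return (double) count("open") / closed;
    }

    // Hypothetical helper that feeds a batch of events through the sketch.
    static PrRatioSketch fromEvents(List<String> states) {
        PrRatioSketch sketch = new PrRatioSketch();
        states.forEach(sketch::process);
        return sketch;
    }

    public static void main(String[] args) {
        PrRatioSketch sketch = fromEvents(
                List.of("open", "closed", "open", "closed", "open", "closed", "closed"));
        // prints: open=3 closed=4 ratio=0.75
        System.out.println("open=" + sketch.count("open")
                + " closed=" + sketch.count("closed")
                + " ratio=" + sketch.punctuate());
    }
}
```

In the real processor, the map would be replaced by the state store obtained in init, and the ratio would be forwarded downstream rather than returned.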
main method
To activate the processor, we declare a new instance of KafkaStreams in the main method. We pass in the topology we’ve built, then run streams.start().
Kafka stores bytes. This makes it highly performant and has the side effect of enabling it to take in data in many different formats. But you’ll also need a serializer/deserializer every time you work with Kafka as a source or a sink. In the code we’ve featured so far, you’ll have noticed a custom serializer/deserializer: prStateCounterSerde. It’s created on line 59:
This uses a custom Serde, defined in serde/StreamsSerde.java, for the pull request state counter. Do take a look at the file to learn how it’s been implemented. For more on serialization, view this video from Confluent Developer’s Kafka Streams course.
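Because Kafka only sees bytes, a serde boils down to a pair of functions: one turns an object into a byte array, the other turns the bytes back into an object. The sketch below shows that round trip in plain Java. It is not the demo’s StreamsSerde, and the PrStateCounter type with its open and closed fields, as well as the fixed 16-byte layout, are assumptions made purely for illustration.

```java
import java.nio.ByteBuffer;

class SerdeRoundTripSketch {
    // Hypothetical value type: the two counts tracked for the ratio.
    static final class PrStateCounter {
        final long open;
        final long closed;

        PrStateCounter(long open, long closed) {
            this.open = open;
            this.closed = closed;
        }
    }

    // Serializer half: object -> bytes (two big-endian longs, 16 bytes total).
    static byte[] serialize(PrStateCounter counter) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(counter.open)
                .putLong(counter.closed)
                .array();
    }

    // Deserializer half: bytes -> object, reading fields in the order they were written.
    static PrStateCounter deserialize(byte[] bytes) {
        if (bytes == null || bytes.length != 2 * Long.BYTES) {
            throw new IllegalArgumentException("expected 16 bytes");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long open = buffer.getLong();
        long closed = buffer.getLong();
        return new PrStateCounter(open, closed);
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new PrStateCounter(3, 4));
        PrStateCounter back = deserialize(wire);
        // prints: 16 bytes -> open=3 closed=4
        System.out.println(wire.length + " bytes -> open=" + back.open + " closed=" + back.closed);
    }
}
```

In Kafka Streams, a pair like this is typically wrapped in a Serde (for example with Serdes.serdeFrom(serializer, deserializer)) so that it can be handed to the topology and the state store.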
That concludes the Streams portion of the project, but I’d love to hear the ways you’d push it further! One way to extend it is by adding a sink. Just as a Kafka Streams topology can have a sink, so can your entire Kafka application via a sink connector which automatically exports results. Below, I’ve fed the state topic into Elasticsearch, which provides a cool graphical representation of the data via its visualize library:
I mentioned earlier that you can also process real-time data with Flink. I’ve got another sample repository in the works! In the meantime, here are some other resources to check out:
The demo behind this blog post
Get started with the Kafka Streams 101 course
Dive deeper with Kafka Streams processing tutorials
Learn more about connectors with the Kafka Connect course
More Kafka demos for your perusal
A Flink SQL tutorial to get you started with Flink