Recently, I got my hands dirty working with Apache Flink®. The experience was a little overwhelming. I have spent years working with streaming technologies, but Flink was new to me, and the resources online were rarely what I needed. Thankfully, I had access to some of the best Flink experts in the business to provide me with first-class advice, but not everyone has access to an expert when they need one.
To share what I learned, I created the Building Flink Applications in Java course on Confluent Developer. It provides you with hands-on experience in building a Flink application from the ground up. I also wrote this blog post to walk through an example of how to do dataflow programming with Flink. I hope these two resources will make the experience less overwhelming for others.
One of the challenges I had while learning Flink was that many of the learning materials focused on small, unrealistic applications such as word counters and basic string processors. I wanted a more in-depth and realistic application to help me understand the bigger picture.
To that end, I built a small Java pipeline that consumes clickstream data from Apache Kafka®. Consuming clickstreams is something that many businesses might legitimately have a use for. It can also be generalized to consuming other types of streaming data. The pipeline uses a keyBy operation to group each clickstream record according to its request. It then separates those records into one-minute windows. Finally, it computes the number of successes and failures during each window.
The full code for this blog post can be found in the GitHub repository.
In this blog post you will learn how to:
Set up a Maven project
Create a connection to Kafka
Serialize and deserialize messages
Consume messages from a topic
Group messages by key and time window
Produce results to a separate Kafka topic
I chose to build my project using Apache Maven. I won't go into the full Maven file, but I will highlight some key elements.
My project needs three dependencies:
flink-streaming-java: Provides the Java libraries for the DataStream API
flink-json: Allows Flink to serialize and deserialize JSON records
flink-connector-kafka: Used to produce and consume data from Kafka topics
The exact dependency declarations, including versions, are in the project's pom.xml in the GitHub repository.
When the code is sent to Flink, it must be packaged into a single uber JAR (aka fat JAR). This is done using the maven-shade-plugin, which is configured in the plugins section of the pom.xml file.
The data for the job is fed from a Kafka topic named clickstream. Each entry in this topic consists of a single JSON record.
I created this data using the Confluent Cloud Datagen Connector, selecting JSON as the output format and clickstream as the data type.
To use this in my code, I need to deserialize the data. However, there is a lot of data that I don't need. Only the request field and the status field are needed to compute the successes and failures for each type of request. This allows me to simplify the data structure I use, which helps avoid data coupling.
I built the class as a Java POJO (Plain Old Java Object). Flink uses two kinds of serialization here. External serialization, used when reading records from and writing them to Kafka, will be JSON. Internal serialization, used when Flink moves records between operators, will be handled by the Flink POJO serializer, which is one of the easiest and most efficient to use. Using POJOs for your objects is good practice in Flink.
Flink treats a class as a POJO if it is public, has a public no-argument constructor, and every field is either public or accessible through public getters and setters. You can read more about the Flink POJO serializer in the Flink documentation.
With these things in mind, I wrote the ClickStreamRecord class with just the request and status fields. It is annotated with @JsonIgnoreProperties, which tells the JSON deserializer to ignore the other fields in the record.
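The class itself isn't reproduced in this post, but a minimal sketch that follows Flink's POJO rules might look like the one below. It assumes status is an integer HTTP status code, and it leaves out the Jackson annotation (typically written as @JsonIgnoreProperties(ignoreUnknown = true)) so that it compiles with the JDK alone. Treat it as an illustration; the actual class is in the GitHub repository.

```java
/**
 * Sketch of a Flink-friendly POJO holding only the clickstream fields the job reads.
 * The real class would also carry Jackson's @JsonIgnoreProperties annotation so the
 * JSON deserializer skips every other field; it is omitted here to stay JDK-only.
 */
public class ClickStreamRecord {
    // Private fields are allowed because each one has a public getter and setter.
    private String request;
    private int status; // assumption: the HTTP status is stored as an integer

    // Flink's POJO serializer requires a public no-argument constructor.
    public ClickStreamRecord() {}

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "ClickStreamRecord{request='" + request + "', status=" + status + "}";
    }
}
```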
Note: If you are like me, you might have cringed at having to create a mutable data structure. I would prefer it to be immutable (and my first version was), but unfortunately, immutable objects don't meet Flink's POJO rules and fall back to the much slower Kryo serializer. Unless performance isn't a concern, you are better off sticking with a standard POJO structure or plugging in a different serializer.
For more details on Flink Serializers, check out the corresponding video in the course.
The goal of this application is to produce a record that counts the total records, successes, and failures for each request. Because the input is an infinite stream of data, I won't be able to do a final count. Instead, I will calculate these numbers once per minute.
As with the ClickStreamRecord, I need a corresponding POJO, named ClickStreamAnalytics.
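A sketch of that POJO, following the same rules, could look like this. The field names totalCount, successCount, and failureCount are hypothetical, chosen for illustration rather than taken from the original code.

```java
/**
 * Sketch of the per-request, per-minute analytics record produced by the job.
 * Field names are illustrative; it follows the same POJO rules as ClickStreamRecord.
 */
public class ClickStreamAnalytics {
    private String request;
    private long totalCount;   // all records for this request in the window
    private long successCount; // records counted as successes
    private long failureCount; // records counted as failures

    // Public no-argument constructor for Flink's POJO serializer.
    public ClickStreamAnalytics() {}

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public long getTotalCount() {
        return totalCount;
    }

    public void setTotalCount(long totalCount) {
        this.totalCount = totalCount;
    }

    public long getSuccessCount() {
        return successCount;
    }

    public void setSuccessCount(long successCount) {
        this.successCount = successCount;
    }

    public long getFailureCount() {
        return failureCount;
    }

    public void setFailureCount(long failureCount) {
        this.failureCount = failureCount;
    }
}
```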
With the data structures in place, it's time to start building the application.
Every Flink job starts with an entry point. This is a standard Java main method, so it's simple to create.
In addition, I need to tell Flink where to find the entry point. I can do this by specifying the mainClass in the Maven Shade Plugin configuration.
This instructs Flink to look for a class named ClickStreamJob inside the clickstream package, which is where I have defined my code. Once Flink finds the class, it will execute its main method. Alternatively, I can specify the main class using the -c argument when I execute the job.
For more details on creating a Flink Job and its lifecycle, check out the course module video.
Every Flink job requires an ExecutionEnvironment. Think of this as the engine that drives the data stream. Since I am using the DataStream API, I will create a StreamExecutionEnvironment inside the main method.
Because my job is consuming from Kafka, I need some Kafka configuration properties. I'll embed those in a file named consumer.properties, which I can then load into a Properties object.
I'll eventually be producing records to Kafka as well, so I can use the same code to load producer.properties into a separate object.
Note: In reality, these two configuration files are identical. I could have just done this once. However, it's a good practice to keep your producer and consumer properties separate because they may not always be the same.
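Loading the files is plain Java. The sketch below assumes consumer.properties and producer.properties are bundled on the classpath (for example, in src/main/resources, so they end up inside the JAR); the ConfigLoader class and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.Properties;

/** Hypothetical helper for loading Kafka client settings from .properties files. */
public final class ConfigLoader {
    private ConfigLoader() {}

    /** Loads a .properties file packaged on the classpath, e.g. "consumer.properties". */
    public static Properties loadFromClasspath(String resourceName) throws IOException {
        try (InputStream stream =
                ConfigLoader.class.getClassLoader().getResourceAsStream(resourceName)) {
            // getResourceAsStream returns null rather than throwing when the file is missing.
            if (stream == null) {
                throw new IOException("Resource not found on classpath: " + resourceName);
            }
            Properties props = new Properties();
            props.load(stream);
            return props;
        }
    }

    /** Parses properties from any Reader, which makes the loader easy to test. */
    public static Properties load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }
}
```

In main, this would be called once as ConfigLoader.loadFromClasspath("consumer.properties") and once for producer.properties, giving two independent Properties objects that can diverge later.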
Now that I have a configuration available for Kafka, I can use it to create a KafkaSource. This is the Kafka consumer. It will pull the records from the Kafka topic and push them downstream for further processing.
The source uses the consumer config that was created earlier and consumes from the clickstream topic.
I also set the starting offsets to earliest. This ensures that the application will collect any records that were created before it started up. Of course, you could go the other route and set it to latest, which means it only processes records that are created after the application starts up. For the purpose of this example, the decision is rather arbitrary. I chose earliest so that the application would have data to process as soon as it started up, which made it easier for me to test.
The final thing I included is a JsonDeserializationSchema. This tells Flink that each record coming from Kafka will be in JSON format and can be deserialized into a ClickStreamRecord.
For more details on creating a data source in Flink, watch the video on data sources.
With my source in place, I now need to convert it into a DataStream. Before jumping into that, it's important to define a watermark strategy. Without a valid watermark strategy, Flink can't tell when an event-time window is complete, so the windows I need would never fire and I wouldn't get any data. For this example, I will assume that my timestamps in Kafka are monotonically increasing and use the forMonotonousTimestamps strategy.
I will also define a TimestampAssigner. This is technically unnecessary for records coming from Kafka, because Kafka records include a timestamp by default. However, for the purpose of this example, I want to demonstrate how it can be done, so I will include it and default to the timestamp from Kafka. In a real application, you could use the timestamp assigner to extract a different timestamp from the record or generate a new timestamp based on other logic.
Watermarks are a difficult topic. I highly recommend reviewing the video on watermarks for a more detailed explanation.
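To make the idea more concrete, here is a plain-Java model (not Flink code) of what a monotonous-timestamp strategy does. Flink's forMonotonousTimestamps emits a watermark equal to the highest timestamp seen so far minus one millisecond, and an event-time window covering [start, end) fires once the watermark reaches end - 1. The WatermarkModel class is hypothetical, and it ignores details such as Flink emitting watermarks periodically rather than on every event.

```java
/**
 * Plain-Java model of a monotonous-timestamp watermark strategy and of the
 * point at which an event-time window is allowed to fire.
 */
public class WatermarkModel {
    private long maxTimestamp = Long.MIN_VALUE;

    /** Mirrors a timestamp assigner that simply keeps the Kafka record timestamp. */
    public long extractTimestamp(long kafkaRecordTimestamp) {
        return kafkaRecordTimestamp;
    }

    /** Tracks the highest event timestamp seen so far. */
    public void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Watermark for monotonous timestamps: highest timestamp seen, minus 1 ms. */
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    /** A window [start, end) can fire once the watermark reaches its last millisecond. */
    public boolean canFire(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }
}
```

In this model, the window [0, 60000) stays open after an event at 59,999 ms and only fires once an event at 60,000 ms or later arrives. With monotonous timestamps, results for a window therefore depend on newer data showing up.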
Now, using the source and the watermarkStrategy, I can create the DataStream. I also give the source the name clickstream-source, which will make it easier to identify in logs and the Flink dashboard.
With the DataStream in place, I could theoretically start consuming records, except I'd have nowhere to put them. I could log them, but that wouldn't be all that useful. Instead, I'd like to send them to another Kafka topic where other systems can use them.
To do that, I need to start by creating a KafkaRecordSerializationSchema. This will instruct Flink on how to convert a ClickStreamAnalytics POJO into a JSON record that can be sent to Kafka.
The schema sets the destination topic to clickstream-analytics and uses a new JsonSerializationSchema, which automatically converts the POJO to the corresponding JSON record.
Now that I have a serializer, I can create a KafkaSink. The sink is the endpoint for the data. Think of it as a Kafka producer. It will produce Kafka records in JSON format and send them to a Kafka broker.
I am using a producerConfig, which would be created in the same way I created the consumerConfig earlier. I also provide the serializer. Finally, I provide a DeliveryGuarantee. Flink supports both AT_LEAST_ONCE and EXACTLY_ONCE guarantees. However, to use EXACTLY_ONCE, I would need to configure checkpointing as well as a transactional ID prefix for the sink. EXACTLY_ONCE is also unnecessary for this use case, because the analytics I am producing don't need to be exact. So for now, I am going to stick with the AT_LEAST_ONCE guarantee for simplicity.
You can find a more detailed explanation of how to create a KafkaSink in the corresponding video on sinks.
I've created both a source and a sink for the data. Now I just need to process it.
I am going to create a new class named ClickStreamFunction. It will extend the ProcessWindowFunction. Its job is to process ClickStreamRecords that have been grouped by request and collected into windows that are one minute long.
For each window, it will calculate the total number of records, as well as how many succeeded and how many failed. It will send that aggregated record downstream.
Note: This code could be cleaned up a little by adding a second constructor to ClickStreamAnalytics, but I opted to keep that class as simple as possible.
To better understand how this works, check out the video on windowing, as well as the video on transforming data.
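In Flink, ClickStreamFunction would extend ProcessWindowFunction<ClickStreamRecord, ClickStreamAnalytics, String, TimeWindow> and do its work in process(String key, Context context, Iterable<ClickStreamRecord> elements, Collector<ClickStreamAnalytics> out). The counting logic inside that method can be modeled in plain Java, as sketched below. The Click and Counts records are simplified stand-ins for the two POJOs, and treating any status below 400 as a success is an assumption, not necessarily the rule the original code uses.

```java
/**
 * Plain-Java model of the per-window counting done by ClickStreamFunction.
 * In Flink, the loop would live in ProcessWindowFunction.process(...) and the
 * result would be emitted with out.collect(...) instead of being returned.
 */
public class WindowCountModel {
    /** Stand-in for ClickStreamRecord (only the fields the job reads). */
    public record Click(String request, int status) {}

    /** Stand-in for ClickStreamAnalytics. */
    public record Counts(String request, long total, long successes, long failures) {}

    /** Assumption for this sketch: any HTTP status below 400 counts as a success. */
    static boolean isSuccess(int status) {
        return status < 400;
    }

    /** Counts every click that shares one key (request) within a single window. */
    public static Counts process(String request, Iterable<Click> elements) {
        long total = 0;
        long successes = 0;
        long failures = 0;
        for (Click click : elements) {
            total++;
            if (isSuccess(click.status())) {
                successes++;
            } else {
                failures++;
            }
        }
        return new Counts(request, total, successes, failures);
    }
}
```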
Now that I have defined my function, it's time to put everything together.
I am using the keyBy operation to group my records according to the request. Flink shuffles the records so that all records with the same request are routed to the same parallel task (potentially on a different machine), which lets different requests be processed in parallel. After keyBy is complete, I assign one-minute windows with TumblingEventTimeWindows to aggregate the records. I then apply my ClickStreamFunction to the result, which produces my ClickStreamAnalytics. Those analytics are sent to the sink to complete the stream.
The only thing left to do is to tell Flink to execute the job. That is done using the StreamExecutionEnvironment that was created in the beginning. Calling its execute method and passing it a job name will do the trick.
With everything in place, I executed my job using Flink and inspected the results, which showed that my analytics were being produced as I expected.
However, I've only scratched the surface here. I haven't looked at managing state or creating branching flows, nor have I explored any individual topic in depth. If you want more details on these and other topics, you can find them in the Building Flink Applications in Java course. This course will allow you to further practice your skills by building a Flink application from the ground up. And don’t forget to check out the GitHub repository for this post if you want to try the code yourself.