Recently, I got my hands dirty working with Apache Flink®. The experience was a little overwhelming. I have spent years working with streaming technologies but Flink was new to me and the resources online were rarely what I needed. Thankfully, I had access to some of the best Flink experts in the business to provide me with first-class advice, but not everyone has access to an expert when they need one.
To share what I learned, I created the Building Flink Applications in Java course on Confluent Developer. It provides you with hands-on experience in building a Flink application from the ground up. I also wrote this blog post to walk through an example of how to do dataflow programming with Flink. I hope these two resources will make the experience less overwhelming for others.
One of the challenges I had while learning Flink was that many of the learning materials focused on small, unrealistic applications such as word counters and basic string processors. I would prefer a more in-depth and realistic application to help me understand the bigger picture.
To that end, I built a small Java pipeline that consumes clickstream data from Apache Kafka®. Consuming clickstreams is something that many businesses might legitimately have a use for. It can also be generalized to consuming other types of streaming data. The pipeline uses a keyBy operation to group each clickstream record according to the request. It then separates those records into one-minute windows. Finally, it computes the number of successes and failures during each window.
The full code for this blog post can be found in the GitHub repository.
In this blog post you will learn how to:
Set up a Maven project
Create a connection to Kafka
Serialize and deserialize messages
Consume messages from a topic
Group messages by key and time window
Produce results to a separate Kafka topic
I chose to build my project using Apache Maven. I won't go into the full Maven file, but I will highlight some key elements.
There are three dependencies I need in my project:
flink-streaming-java: Provides the Java libraries for the Datastream API
flink-json: Allows Flink to serialize and deserialize JSON records
flink-connector-kafka: Used to produce and consume data from Kafka topics
You can see the Maven dependencies below:
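A sketch of what those dependencies might look like in the pom.xml, assuming a flink.version property is defined elsewhere in the file (in newer Flink releases, flink-connector-kafka is versioned independently of Flink itself):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${flink.version}</version>
</dependency>
```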
When the code is sent to Flink, it must be packaged into a single uber JAR (aka fat JAR). This is done using the maven-shade-plugin. This configuration is included in the plugins section of your pom.xml file.
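A minimal sketch of the plugin declaration (the version number here is illustrative; use whatever is current for your build):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```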
The data for the job is fed from a Kafka topic named clickstream. Each entry in this topic consists of a JSON record that looks like this:
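The field names below follow the shape the Datagen connector produces for clickstream data; the values are illustrative:

```json
{
  "ip": "111.245.174.248",
  "userid": 24,
  "remote_user": "-",
  "time": "11201",
  "_time": 11201,
  "request": "GET /images/track.png HTTP/1.1",
  "status": "405",
  "bytes": "4006",
  "referrer": "-",
  "agent": "Mozilla/5.0"
}
```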
I created this data using the Confluent Cloud Datagen Connector, selecting JSON as the output format, and clickstream as the data type.
To use this in my code, I need to deserialize the data. However, there is a lot of data that I don't need. Only the request field and the status field are needed to compute the successes and failures for each type of request. This allows me to simplify the data structure I use, which helps avoid data coupling.
I built the class as a Java POJO (Plain Old Java Object). Flink has both internal and external serializers. The external serializer will be JSON. Internally, I will be using the Flink POJO serializer which is one of the easiest and most efficient to use. Using POJOs for your objects is good practice in Flink.
Flink defines a POJO as having either public fields or public getters and setters for all private fields, as well as a default constructor. You can read more about the Flink POJO serializer here.
With these things in mind, this is what the ClickStreamRecord class looks like (the @JsonIgnoreProperties annotation indicates that some of the fields in the JSON are ignored).
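A sketch of the class, keeping only the two fields the job needs. Depending on your Flink version, the annotation may need to come from Flink's shaded Jackson package (org.apache.flink.shaded.jackson2...) rather than from Jackson directly:

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Only the request and status fields are kept; everything else in the
// JSON record is dropped thanks to ignoreUnknown = true.
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClickStreamRecord {
    private String request;
    private int status;

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}
```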
Note: If you are like me, you might have cringed at the fact that you have to create a mutable data structure. I would prefer it to be immutable (and my first version was), but unfortunately, immutable objects fall back to the much slower Kryo serializer. You are better off sticking with a standard POJO structure unless performance isn’t a key concern. Or you could plug in a different serializer if you prefer.
For more details on Flink Serializers, check out the corresponding video in the course.
The goal of this application is to produce something like the following:
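The exact field names and values below are illustrative, matching the analytics class sketched later in the post:

```json
{
  "request": "GET /site/user_status.html HTTP/1.1",
  "totalCount": 247,
  "successCount": 201,
  "failureCount": 46
}
```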
This record counts the total records, successes, and failures for each request. Because it is an infinite stream of data, I won't be able to do a final count. Instead, I will calculate these numbers once per minute.
As with the ClickStreamRecord, I need a corresponding POJO:
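A sketch of what the analytics POJO might look like (the field names are assumptions consistent with the example record above):

```java
// Aggregated result for a single request over a one-minute window.
// A plain POJO (public getters/setters plus a default constructor)
// so Flink can use its efficient POJO serializer internally.
public class ClickStreamAnalytics {
    private String request;
    private long totalCount;
    private long successCount;
    private long failureCount;

    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public long getTotalCount() { return totalCount; }
    public void setTotalCount(long totalCount) { this.totalCount = totalCount; }
    public long getSuccessCount() { return successCount; }
    public void setSuccessCount(long successCount) { this.successCount = successCount; }
    public long getFailureCount() { return failureCount; }
    public void setFailureCount(long failureCount) { this.failureCount = failureCount; }
}
```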
With the data structures in place, it's time to start building the application.
Every Flink job starts with an entry point. This is a standard Java main method so it's simple to create.
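The class and package names here (clickstream.ClickStreamJob) are the ones referenced later in the post; the method body is filled in over the remaining sections:

```java
public class ClickStreamJob {
    public static void main(String[] args) throws Exception {
        // The environment, source, sink, and stream topology
        // will be built up inside this method.
    }
}
```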
In addition, I need to tell Flink where to find the entry point. I can do this by specifying the mainClass in the Maven Shade Plugin:
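This goes inside the configuration section of the maven-shade-plugin shown earlier; a sketch of the relevant fragment:

```xml
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>clickstream.ClickStreamJob</mainClass>
  </transformer>
</transformers>
```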
This instructs Flink to look for a class named ClickStreamJob inside the clickstream package, which is where I have defined my code. Once Flink finds the class, it will execute its main method. Alternatively, I can specify the mainClass using the -c argument when I execute the job.
For more details on creating a Flink Job and its lifecycle, check out the course module video.
Every Flink job requires an ExecutionEnvironment. Think of this as the engine that drives the data stream. Since I am using the DataStream API, I will create a StreamExecutionEnvironment inside the main method.
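A one-line sketch of that, inside main:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```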
Because my job is consuming from Kafka, I need some Kafka configuration properties. I'll embed those in a file named consumer.properties. I can then load that file into a properties object like this:
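A sketch of the loading logic as a small helper (the class and method names here are hypothetical; the file is assumed to live on the classpath, e.g. in src/main/resources):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertyLoader {
    // Loads a .properties file from the classpath into a Properties object.
    public static Properties loadProperties(String resourceName) throws IOException {
        Properties properties = new Properties();
        try (InputStream stream = PropertyLoader.class
                .getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (stream == null) {
                throw new IOException("Unable to find " + resourceName + " on the classpath");
            }
            properties.load(stream);
        }
        return properties;
    }
}
```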
I'll eventually be producing records for Kafka as well. I can use the same code to load the producer.properties into a separate object.
Note: In reality, these two configuration files are identical. I could have just done this once. However, it's a good practice to keep your producer and consumer properties separate because they may not always be the same.
Now that I have a configuration available for Kafka, I can use it to create a KafkaSource. This is the Kafka consumer. It will pull the records from the Kafka topic and push them downstream for further processing.
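A sketch of the source, assuming the consumerConfig properties object from the previous step:

```java
KafkaSource<ClickStreamRecord> source = KafkaSource.<ClickStreamRecord>builder()
    .setProperties(consumerConfig)
    .setTopics("clickstream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new JsonDeserializationSchema<>(ClickStreamRecord.class))
    .build();
```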
Here, I am leveraging the consumer config that was created earlier, as well as specifying that it consumes from the clickstream topic.

I also set the starting offsets to earliest. This ensures that the application will collect any records that were created before it started up. Of course, you could go the other route and set it to latest. This would mean that it only processes records that are created after the application starts up. For the purpose of this example, the decision is rather arbitrary. I chose earliest so that the application would have data to process as soon as it started up. That made it easier for me to test.

The final thing I included is a JsonDeserializationSchema. This tells Flink that each record coming from Kafka will be in a JSON format and that those records can be deserialized to a ClickStreamRecord.
For more details on creating a data source in Flink, watch the video on data sources.
With my source in place, I now need to convert it into a DataStream. Before jumping into that, it's important to define a watermark strategy. Without a valid watermark strategy, Flink won't be able to create the windows that I need in my stream and I won't get any data. For this example, I will assume that my timestamps in Kafka are monotonically increasing and use the forMonotonousTimestamps strategy.
I will also define a TimestampAssigner. This is technically unnecessary for records coming from Kafka. Kafka records include a timestamp by default. However, for the purpose of this example, I want to demonstrate how it can be done, so I will include it and default to the timestamp from Kafka. In a real application, you could use the timestamp assigner to extract a different timestamp from the record or generate a new timestamp based on other logic.
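A sketch of the strategy; the assigner lambda simply returns the timestamp Kafka already attached to the record:

```java
WatermarkStrategy<ClickStreamRecord> watermarkStrategy = WatermarkStrategy
    .<ClickStreamRecord>forMonotonousTimestamps()
    .withTimestampAssigner((record, kafkaTimestamp) -> kafkaTimestamp);
```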
Watermarks are a difficult topic. I highly recommend reviewing the video on watermarks for a more detailed explanation.
Now, using the source and the watermarkStrategy, I can create the DataStream. I also include a name for the source, clickstream-source, which will make it easier to identify in logs and the Flink dashboard.
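Putting those pieces together might look like this:

```java
DataStream<ClickStreamRecord> stream = env.fromSource(
    source,
    watermarkStrategy,
    "clickstream-source"
);
```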
With the DataStream in place, I could theoretically start consuming records, except I'd have nowhere to put them. I could log them, but that wouldn't be all that useful. Instead, I'd like to send them to another Kafka topic where other systems can use them.
To do that, I need to start by creating a KafkaRecordSerializationSchema. This will instruct Flink on how to convert the POJO ClickStreamAnalytics object into a JSON record that can be sent to Kafka.
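A sketch of the serializer:

```java
KafkaRecordSerializationSchema<ClickStreamAnalytics> serializer =
    KafkaRecordSerializationSchema.<ClickStreamAnalytics>builder()
        .setTopic("clickstream-analytics")
        .setValueSerializationSchema(new JsonSerializationSchema<>())
        .build();
```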
In this code, I set the name of the topic to clickstream-analytics. I also passed a new JsonSerializationSchema, which automatically converts the POJO to the corresponding JSON record.
Now that I have a serializer, I can create a KafkaSink. The sink is the endpoint for the data. Think of it as a Kafka producer. It will produce Kafka records in a JSON format and send them to a Kafka broker.
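A sketch of the sink, assuming the producerConfig and serializer objects created above:

```java
KafkaSink<ClickStreamAnalytics> sink = KafkaSink.<ClickStreamAnalytics>builder()
    .setKafkaProducerConfig(producerConfig)
    .setRecordSerializer(serializer)
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
```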
I am using a producerConfig, which would be created in the same way I created the consumerConfig earlier. I also provide the serializer. Finally, I provide a DeliveryGuarantee. Flink supports both AT_LEAST_ONCE and EXACTLY_ONCE guarantees. However, to use EXACTLY_ONCE, I would need to configure checkpointing. EXACTLY_ONCE is also unnecessary for this use case. The analytics I am producing don't need to be exact. So for now, I am going to stick with the AT_LEAST_ONCE guarantee for simplicity.
You can find a more detailed explanation of how to create a KafkaSink in the corresponding video on sinks.
I've created both a Source and a Sink for the data. Now I just need to process it.
I am going to create a new class named ClickStreamFunction. It will extend the ProcessWindowFunction. Its job is to process ClickStreamRecords that have been grouped by the request and aggregated into windows that are one minute long.
For each window, it will calculate the total number of records, as well as how many succeeded and how many failed. It will send that aggregated record downstream.
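A sketch of the function. Treating any HTTP status of 400 or above as a failure is my assumption; adjust the predicate to whatever definition of success fits your data:

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ClickStreamFunction extends
        ProcessWindowFunction<ClickStreamRecord, ClickStreamAnalytics, String, TimeWindow> {

    @Override
    public void process(
            String request,
            Context context,
            Iterable<ClickStreamRecord> records,
            Collector<ClickStreamAnalytics> collector) {
        long total = 0;
        long successes = 0;
        long failures = 0;

        // Tally every record that landed in this one-minute window.
        for (ClickStreamRecord record : records) {
            total++;
            if (record.getStatus() >= 400) {
                failures++;
            } else {
                successes++;
            }
        }

        ClickStreamAnalytics analytics = new ClickStreamAnalytics();
        analytics.setRequest(request);
        analytics.setTotalCount(total);
        analytics.setSuccessCount(successes);
        analytics.setFailureCount(failures);
        collector.collect(analytics);
    }
}
```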
Note: This code could be cleaned up a little by creating a second constructor for the ClickStreamAnalytics, but I opted to keep that class as simple as possible.
Now that I have defined my function, it's time to put everything together.
I am using the keyBy operation to group my records according to the request. Flink does a shuffle so that each of these groups can be processed as a different task (potentially on a different machine). After keyBy is complete, I use a TumblingEventTimeWindow of one minute to aggregate the records. I then execute my ClickStreamFunction on the result, which will produce my ClickStreamAnalytics. Those analytics are sent to the sink to complete the stream.
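The steps above can be sketched as a single chain over the stream:

```java
stream
    .keyBy(ClickStreamRecord::getRequest)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ClickStreamFunction())
    .sinkTo(sink);
```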
The only thing left to do is to tell Flink to execute the job. That is done using the StreamExecutionEnvironment that was created in the beginning. Calling the execute function and passing it a job name will do the trick.
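For example (the job name here is an arbitrary choice):

```java
env.execute("clickstream-analytics-job");
```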
With everything in place, I executed my job using Flink and inspected the results where I saw that my analytics were being produced as I expected.
However, I've only scratched the surface here. I haven't looked at managing state, or creating branching flows, nor have I explored any individual topic in depth. If you want more details on these and other topics, you can find them in the Building Flink Applications in Java course. This course will allow you to further practice your skills by building a Flink application from the ground up. And don’t forget to check out the GitHub repository for this post if you want to try the code yourself.
Consume Apache Kafka Messages using Apache Flink and Java: Watch a quick video from Wade Waldron where he walks you through a complete example of how to consume Kafka messages using Flink and Java.
Learn the basics of building Apache Flink applications in Java in our course on Confluent Developer.
Get Apache Kafka and Flink news delivered to your inbox biweekly or read the latest editions on Confluent Developer!