
Dataflow Programming with Apache Flink and Apache Kafka


Recently, I got my hands dirty working with Apache Flink®. The experience was a little overwhelming. I have spent years working with streaming technologies but Flink was new to me and the resources online were rarely what I needed. Thankfully, I had access to some of the best Flink experts in the business to provide me with first-class advice, but not everyone has access to an expert when they need one. 

To share what I learned, I created the Building Flink Applications in Java course on Confluent Developer. It provides you with hands-on experience in building a Flink application from the ground up. I also wrote this blog post to walk through an example of how to do dataflow programming with Flink. I hope these two resources will make the experience less overwhelming for others.

One of the challenges I had while learning Flink was that many of the learning materials focused on small, unrealistic applications such as word counters and basic string processors. I wanted a more in-depth and realistic application to help me understand the bigger picture.

To that end, I built a small Java pipeline that consumes clickstream data from Apache Kafka®. Consuming clickstreams is something that many businesses might legitimately have a use for. It can also be generalized to consuming other types of streaming data. The pipeline uses a keyBy operation to group each clickstream record according to the request. It then separates those records into one-minute windows. Finally, it computes the number of successes and failures during each window.

The full code for this blog post can be found in the GitHub repository.

In this blog post you will learn how to:

  • Set up a Maven project

  • Create a connection to Kafka

  • Serialize and deserialize messages

  • Consume messages from a topic

  • Group messages by key and time window

  • Produce results to a separate Kafka topic

Creating the Flink project

I chose to build my project using Apache Maven. I won't go into the full Maven file, but I will highlight some key elements.

Flink Maven dependencies

There are three dependencies I need in my project. These include:

  • flink-streaming-java: Provides the Java libraries for the DataStream API

  • flink-json: Allows Flink to serialize and deserialize JSON records

  • flink-connector-kafka: Used to produce and consume data from Kafka topics

You can see the Maven dependencies below:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

Building a Flink uber JAR

When the code is sent to Flink, it must be packaged into a single uber JAR (aka fat JAR). This is done using the maven-shade-plugin. This configuration is included in the plugins section of your pom.xml file.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>org.apache.logging.log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>clickstream.ClickStreamJob</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

The clickstream record

The data for the job is fed from a Kafka topic named clickstream. Each entry in this topic consists of a JSON record that looks like this:

{
  "ip": "111.245.174.248",
  "userid": 24,
  "remote_user": "-",
  "time": "991",
  "_time": 991,
  "request": "GET /images/logo-small.png HTTP/1.1",
  "status": "404",
  "bytes": "4096",
  "referrer": "-",
  "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36"
}

I created this data using the Confluent Cloud Datagen Connector, selecting JSON as the output format, and clickstream as the data type.

To use this in my code, I need to deserialize the data. However, there is a lot of data that I don't need. Only the request field and the status field are needed to compute the successes and failures for each type of request. This allows me to simplify the data structure I use, which helps avoid data coupling. 

I built the class as a Java POJO (Plain Old Java Object). Flink has both internal and external serializers. The external serializer will be JSON. Internally, I will be using the Flink POJO serializer, which is one of the easiest and most efficient to use. Using POJOs for your objects is good practice in Flink.

Flink defines a POJO as having either public fields or public getters and setters for all private fields, as well as a default constructor. You can read more about the Flink POJO serializer here.

With these things in mind, this is what the ClickStreamRecord class looks like (the @JsonIgnoreProperties annotation tells the JSON deserializer to skip any fields in the record that the class doesn't define).

@JsonIgnoreProperties(ignoreUnknown = true)
public class ClickStreamRecord {
    private String request;
    private int status;

    public ClickStreamRecord() {
        status = 0;
        request = "";
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }
}

Note: If you are like me, you might have cringed at the fact that you have to create a mutable data structure. I would prefer it to be immutable (and my first version was), but unfortunately, immutable objects fall back to the much slower Kryo serializer. You are better off sticking with a standard POJO structure unless performance isn’t a key concern. Or you could plug in a different serializer if you prefer.
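To make the POJO rules above a little more concrete, here is a minimal sketch that checks them with plain Java reflection. It is only an approximation of the rules as described in this post (public class, public no-argument constructor, and either public fields or a getter and setter per field). It is not Flink's own type analysis, and the class names PojoRuleSketch, MutableRecord, and ImmutableRecord are hypothetical names I made up for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified approximation of the POJO rules described in this post.
// This is NOT Flink's actual type extraction logic.
final class PojoRuleSketch {

    // Returns true if the class appears to satisfy the rules listed above.
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields are accessible directly
            }
            if (!hasGetterAndSetter(type, field)) {
                return false;
            }
        }
        return true;
    }

    // Looks for public getX() and setX(value) methods (boolean "isX" getters are ignored here).
    private static boolean hasGetterAndSetter(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical mutable type that follows the rules.
    public static class MutableRecord {
        private String request = "";
        private int status;

        public MutableRecord() {}

        public String getRequest() { return request; }
        public void setRequest(String request) { this.request = request; }
        public int getStatus() { return status; }
        public void setStatus(int status) { this.status = status; }
    }

    // Hypothetical immutable type: no default constructor and no setter.
    public static class ImmutableRecord {
        private final String request;

        public ImmutableRecord(String request) { this.request = request; }

        public String getRequest() { return request; }
    }
}
```

In this sketch, looksLikePojo returns true for MutableRecord and false for ImmutableRecord, which mirrors the trade-off described in the note above.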

For more details on Flink Serializers, check out the corresponding video in the course.

The clickstream analytics

The goal of this application is to produce something like the following:

{
  "request": "GET /images/track.png HTTP/1.1",
  "totalCount": 20,
  "successes": 5,
  "failures": 15,
  "timestamp": 1689872409189
}

This record counts the total records, successes, and failures for each request. Because it is an infinite stream of data, I won't be able to do a final count. Instead, I will calculate these numbers once per minute.
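To illustrate what "once per minute" means in practice, here is a small sketch of the arithmetic that maps a timestamp to a one-minute bucket. It assumes windows are aligned to the epoch with no offset. The class and method names are hypothetical, and the timestamp is simply borrowed from the sample record above as an example value.

```java
// Sketch of how a timestamp maps to a one-minute tumbling window.
// Assumes windows are aligned to the epoch with no offset.
final class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the window (inclusive) that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // End of the window (exclusive) that contains the timestamp.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long timestamp = 1689872409189L; // example value only
        // prints 1689872400000 .. 1689872460000
        System.out.println(windowStart(timestamp) + " .. " + windowEnd(timestamp));
    }
}
```

Every record whose timestamp falls between the start (inclusive) and the end (exclusive) lands in the same bucket, so each request gets one set of counts per minute.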

As with the ClickStreamRecord, I need a corresponding POJO:

public class ClickStreamAnalytics {
    private String request;
    private long totalCount;
    private long successes;
    private long failures;
    private long timestamp;

    public ClickStreamAnalytics() {
        request = "";
        totalCount = 0;
        successes = 0;
        failures = 0;
        timestamp = 0;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public long getTotalCount() {
        return totalCount;
    }

    public void setTotalCount(long totalCount) {
        this.totalCount = totalCount;
    }

    public long getSuccesses() {
        return successes;
    }

    public void setSuccesses(long successes) {
        this.successes = successes;
    }

    public long getFailures() {
        return failures;
    }

    public void setFailures(long failures) {
        this.failures = failures;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

Creating a Flink entry point

With the data structures in place, it's time to start building the application.

Every Flink job starts with an entry point. This is a standard Java main method so it's simple to create.

public class ClickStreamJob {
    public static void main(String[] args) throws Exception {

    }
}

In addition, I need to tell Flink where to find the entry point. I can do this by specifying the mainClass in the Maven Shade Plugin:

<mainClass>clickstream.ClickStreamJob</mainClass>

This instructs Flink to look for a class named ClickStreamJob inside the clickstream package which is where I have defined my code. Once Flink finds the class it will execute its main method. Alternatively, I can specify the mainClass using the -c argument when I execute the job.

flink run -c clickstream.ClickStreamJob target/clickstream-0.1.jar

For more details on creating a Flink Job and its lifecycle, check out the course module video.

The stream execution environment

Every Flink job requires an ExecutionEnvironment. Think of this as the engine that drives the data stream. Since I am using the DataStream API I will create a StreamExecutionEnvironment inside the main method.

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

Loading the configuration

Because my job is consuming from Kafka, I need some Kafka configuration properties. I'll embed those in a file named consumer.properties. I can then load that file into a properties object like this:

Properties consumerConfig = new Properties();
try (InputStream stream = ClickStreamJob.class
    .getClassLoader()
    .getResourceAsStream("consumer.properties")
) {
    consumerConfig.load(stream);
}

I'll eventually be producing records for Kafka as well. I can use the same code to load the producer.properties into a separate object.

Note: In reality, these two configuration files are identical. I could have just done this once. However, it's a good practice to keep your producer and consumer properties separate because they may not always be the same.
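Since the same loading code is needed for both files, it could be pulled into a small helper. The following is a minimal sketch of that idea, not the code from the repository. The class name ConfigLoaderSketch and its method names are hypothetical. It also adds a check for a missing file, because getResourceAsStream returns null when the resource isn't on the classpath.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper for loading Kafka client settings from the classpath.
final class ConfigLoaderSketch {

    // Loads a properties file from the classpath and fails with a clear
    // message if the file is missing.
    static Properties loadFromClasspath(String resourceName) throws IOException {
        try (InputStream stream = ConfigLoaderSketch.class
            .getClassLoader()
            .getResourceAsStream(resourceName)
        ) {
            if (stream == null) {
                throw new IOException("Resource not found on classpath: " + resourceName);
            }
            return load(stream);
        }
    }

    // Reads key=value pairs from any input stream into a Properties object.
    static Properties load(InputStream stream) throws IOException {
        Properties properties = new Properties();
        properties.load(stream);
        return properties;
    }
}
```

With a helper like this, the two configurations could be loaded with ConfigLoaderSketch.loadFromClasspath("consumer.properties") and ConfigLoaderSketch.loadFromClasspath("producer.properties").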

Creating a Kafka source

Now that I have a configuration available for Kafka, I can use it to create a KafkaSource. This is the Kafka consumer. It will pull the records from the Kafka topic and push them downstream for further processing.

KafkaSource<ClickStreamRecord> source =
    KafkaSource.<ClickStreamRecord>builder()
        .setProperties(consumerConfig)
        .setTopics("clickstream")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
            new JsonDeserializationSchema<>(ClickStreamRecord.class)
        )
        .build();

Here, I am leveraging the consumer config that was created earlier, as well as specifying that it consumes from the clickstream topic.

I also set the StartingOffsets to earliest. This is to ensure that the application will collect any records that were created before it started up. Of course, you could go the other route and set it to latest. This would mean that it only processes records that are created after the application starts up. For the purpose of this example, the decision is rather arbitrary. I chose earliest so that the application would have data to process as soon as it started up. That made it easier for me to test.

The final thing I included is a JsonDeserializationSchema. This tells Flink that each record coming from Kafka will be in a JSON format and that those records can be deserialized to a ClickStreamRecord format.

For more details on creating a data source in Flink, watch the video on data sources.

Creating a watermark strategy

With my source in place, I now need to convert it into a DataStream. Before jumping into that, it's important to define a watermark strategy. Without a valid watermark strategy, Flink has no way of knowing that event time has advanced, so the event-time windows in my stream will never fire and I won't get any data. For this example, I will assume that my timestamps in Kafka are monotonically increasing and use the forMonotonousTimestamps strategy.

I will also define a TimestampAssigner. This is technically unnecessary for records coming from Kafka. Kafka records include a timestamp by default. However, for the purpose of this example, I want to demonstrate how it can be done, so I will include it and default to the timestamp from Kafka. In a real application, you could use the timestamp assigner to extract a different timestamp from the record or generate a new timestamp based on other logic.

WatermarkStrategy<ClickStreamRecord> watermarkStrategy =
    WatermarkStrategy
        .<ClickStreamRecord>forMonotonousTimestamps()
        .withTimestampAssigner(
            (record, timestamp) -> timestamp
        );

Watermarks are a difficult topic. I highly recommend reviewing the video on watermarks for a more detailed explanation.
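To build some intuition, here is a simplified, stand-alone model of how a watermark based on monotonically increasing timestamps behaves. It is a sketch of the idea only and doesn't use any Flink classes. The class MonotonousWatermarkSketch and its methods are hypothetical. It assumes the watermark trails the largest timestamp seen by one millisecond, and that a window can fire once the watermark reaches the last timestamp the window covers.

```java
// Simplified, stand-alone model of a watermark for ascending timestamps.
// It illustrates the concept and is not Flink's implementation.
final class MonotonousWatermarkSketch {
    // Start just above Long.MIN_VALUE so that subtracting 1 cannot underflow.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    // Record an event timestamp; the maximum never moves backwards.
    void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // The watermark trails the largest timestamp seen so far by one millisecond.
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    // A window [start, end) is complete once the watermark reaches end - 1.
    static boolean windowCanFire(long windowEndExclusiveMs, long watermarkMs) {
        return watermarkMs >= windowEndExclusiveMs - 1;
    }

    public static void main(String[] args) {
        MonotonousWatermarkSketch watermark = new MonotonousWatermarkSketch();
        watermark.onEvent(59_999L); // last millisecond of the first minute
        System.out.println(windowCanFire(60_000L, watermark.currentWatermark()));
        watermark.onEvent(60_000L); // first event of the next minute
        System.out.println(windowCanFire(60_000L, watermark.currentWatermark()));
    }
}
```

In this model, the first one-minute window only fires after an event from the next minute arrives. That is one reason a pipeline can appear to produce nothing if watermarks never advance.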

Creating a data stream from a Kafka source

Now, using the source and the watermarkStrategy, I can create the DataStream. I also give the source a name, clickstream-source, which will make it easier to identify in logs and the Flink dashboard.

DataStream<ClickStreamRecord> stream = env
    .fromSource(
        source,
        watermarkStrategy,
        "clickstream-source"
    );

Serializing records to a Kafka topic

With the DataStream in place, I could theoretically start consuming records, except I'd have nowhere to put them. I could log them, but that wouldn't be all that useful. Instead, I'd like to send them to another Kafka topic where other systems can use them.

To do that, I need to start by creating a KafkaRecordSerializationSchema. This will instruct Flink on how to convert the POJO ClickStreamAnalytics object into a JSON record that can be sent to Kafka.

KafkaRecordSerializationSchema<ClickStreamAnalytics> serializer =
    KafkaRecordSerializationSchema
        .<ClickStreamAnalytics>builder()
        .setTopic("clickstream-analytics")
        .setValueSerializationSchema(
            new JsonSerializationSchema<>()
        )
        .build();

In this code, I set the name of the topic to clickstream-analytics. I also passed a new JsonSerializationSchema which automatically converts the POJO to the corresponding JSON record.

Creating a Kafka sink

Now that I have a serializer, I can create a KafkaSink. The sink is the endpoint for the data. Think of it as a Kafka producer. It will produce Kafka records in a JSON format and send them to a Kafka broker.

KafkaSink<ClickStreamAnalytics> sink =
    KafkaSink.<ClickStreamAnalytics>builder()
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(serializer)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

I am using a producerConfig which would be created in the same way I created the consumerConfig earlier. I also provide the serializer. Finally, I provide a DeliveryGuarantee. Flink supports both AT_LEAST_ONCE and EXACTLY_ONCE guarantees. However, to use EXACTLY_ONCE, I would need to configure checkpointing. EXACTLY_ONCE is also unnecessary for this use case. The analytics I am producing don't need to be exact. So for now, I am going to stick with the AT_LEAST_ONCE guarantee for simplicity.

You can find a more detailed explanation of how to create a KafkaSink in the corresponding video on sinks.

Processing the clickstream

I've created both a Source and a Sink for the data. Now I just need to process it.

I am going to create a new class named ClickStreamFunction. It will extend the ProcessWindowFunction. Its job is to process ClickStreamRecords that have been grouped by the request and aggregated into windows that are one minute long.

For each window, it will calculate the total number of records, as well as how many succeeded and how many failed. It will send that aggregated record downstream.

public class ClickStreamFunction 
    extends ProcessWindowFunction<ClickStreamRecord, ClickStreamAnalytics, String, TimeWindow> {
    @Override
    public void process(
        String request,
        ProcessWindowFunction<ClickStreamRecord, ClickStreamAnalytics, String, TimeWindow>.Context context,
        Iterable<ClickStreamRecord> iterable,
        Collector<ClickStreamAnalytics> collector
    ) throws Exception {
        long total = 0;
        long successes = 0;
        long failures = 0;

        for(ClickStreamRecord record : iterable) {
            if(record.getStatus() >= 400)
                failures++;
            else
                successes++;

            total++;
        }

        ClickStreamAnalytics analytics = new ClickStreamAnalytics();
        analytics.setRequest(request);
        analytics.setTotalCount(total);
        analytics.setSuccesses(successes);
        analytics.setFailures(failures);
        analytics.setTimestamp(context.currentProcessingTime());
        collector.collect(analytics);
    }
}

Note: This code could be cleaned up a little by creating a second constructor for the ClickStreamAnalytics, but I opted to keep that class as simple as possible.

To better understand how this works, check out the video on windowing, as well as the video on transforming data.

Group by keys and apply windows

Now that I have defined my function, it's time to put everything together.

I am using the keyBy operation to group my records according to the request. Flink does a shuffle so that each of these groups can be processed as a different task (potentially on a different machine). After keyBy is complete, I use TumblingEventTimeWindows of one minute to aggregate the records. I then execute my ClickStreamFunction on the result, which will produce my ClickStreamAnalytics. Those analytics are sent to the sink to complete the stream.

stream
    .keyBy(ClickStreamRecord::getRequest)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ClickStreamFunction())
    .sinkTo(sink);
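If it helps to see the grouping without any Flink machinery, here is a plain Java sketch of what the pipeline computes conceptually: records are bucketed by request and by one-minute window, then counted. It runs on a finite list in a single thread, so it ignores watermarks, shuffling, and parallelism entirely. The class KeyedWindowSketch, the Click type, and the "request@windowStart" key format are all hypothetical choices made for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain Java illustration of "group by request, then by one-minute window".
// It only models the result; Flink does this continuously and in parallel.
final class KeyedWindowSketch {

    // Hypothetical stand-in for a clickstream record with an event timestamp.
    static final class Click {
        final String request;
        final int status;
        final long timestampMs;

        Click(String request, int status, long timestampMs) {
            this.request = request;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    // Returns {total, successes, failures} keyed by "request@windowStart".
    static Map<String, long[]> countPerKeyAndWindow(List<Click> clicks, long windowSizeMs) {
        Map<String, long[]> result = new TreeMap<>();
        for (Click click : clicks) {
            // Align the timestamp to the start of its window.
            long windowStart = click.timestampMs - Math.floorMod(click.timestampMs, windowSizeMs);
            String key = click.request + "@" + windowStart;
            long[] counts = result.computeIfAbsent(key, k -> new long[3]);
            counts[0]++;         // total
            if (click.status >= 400) {
                counts[2]++;     // failures
            } else {
                counts[1]++;     // successes
            }
        }
        return result;
    }
}
```

Two records for the same request that arrive in different minutes end up under different keys, which matches the one-result-per-request-per-window output that the Flink job sends to the sink.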

Execute the job

The only thing left to do is to tell Flink to execute the job. That is done using the StreamExecutionEnvironment that was created in the beginning. Calling the execute function and passing it a job name will do the trick.

env.execute("clickstream-job");

What's next?

With everything in place, I executed my job using Flink and inspected the results where I saw that my analytics were being produced as I expected.

However, I've only scratched the surface here. I haven't looked at managing state, or creating branching flows, nor have I explored any individual topic in depth. If you want more details on these and other topics, you can find them in the Building Flink Applications in Java course. This course will allow you to further practice your skills by building a Flink application from the ground up. And don’t forget to check out the GitHub repository for this post if you want to try the code yourself.


  • Wade Waldron has been a software developer since 2005. He has worked on video games, backend microservices, ETL pipelines, IoT systems, and more. He is an advocate for test-driven development, domain-driven design, microservice architecture, and event-driven systems. Today, Wade works as a Staff Software Practice Lead at Confluent where he shows people how to build modern data streaming applications.
