Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now

Your Guide to the Apache Flink® Table API: An In-Depth Exploration

Written By

Apache Flink® offers a variety of APIs that provide users with significant flexibility in processing data streams. Among these, the Table API stands out as one of the most popular options. Its user-friendly design allows developers to express complex data processing logic in a clear and declarative manner, making it particularly appealing for those who want to efficiently manipulate data without getting bogged down in intricate implementation details.

At this year’s Current, we introduced support for the Flink Table API in Confluent Cloud for Apache Flink® to enable customers to use Java and Python for their stream processing workloads. The Flink Table API is also supported in Confluent Platform for Apache Flink®, which launched in limited availability and supports all Flink APIs out of the box.

This introduction highlights its capabilities, how it integrates with other Flink APIs, and provides practical examples to help you get started. Whether you are working with real-time data streams or static datasets, the Table API simplifies your workflow while maintaining high performance and flexibility. If you want to go deeper into the details of how Table API works, we encourage you to check out our Table API developer course.

What is the Table API?

The Apache Flink Table API is a unified relational API for both stream and batch processing. It offers a higher-level abstraction compared to the DataStream API, enabling developers to express complex processing logic in a declarative manner using a fluent API in Java or Python.

The Table API represents data in the form of tables, similar to those in relational databases. These tables can be created from various sources, such as Apache Kafka® topics, files, or other Flink DataStreams. The API allows you to perform operations on these tables, including filtering, joining, and aggregating data.

How the Table API relates to other Flink APIs

The Table API is one of several APIs available in the Apache Flink framework, each offering different levels of abstraction and capabilities:

  1. At the lowest level is the ProcessFunction, which is part of the DataStream API. These are primitive building blocks capable of implementing almost any operation by directly manipulating Flink's state backends and timer services. At this level, you write code that reacts to each event as it arrives, one at a time.

  2. The DataStream API, while including ProcessFunction, generally operates at a slightly higher level of abstraction. It provides building blocks like streams and windows, offering more structured ways to process data.

  3. The Table API sits at an even higher level of abstraction. In terms of the abstractions involved, it is roughly equivalent to Flink SQL; however, instead of writing SQL queries, you express your data processing logic using Java or Python code.

  4. At the highest level is Flink SQL, which provides a fully declarative interface for data processing using SQL syntax.

Our fully managed service supports Flink SQL and the Table API, while our on-premises offering supports all Flink APIs.

It's important to note that these APIs are interoperable. You're not limited to choosing just one; a single Flink application can utilize multiple APIs together, leveraging their respective strengths.

The Table API is closely integrated with Flink SQL, allowing you to seamlessly mix Table API and SQL within the same program. This flexibility enables you to use SQL for simpler operations while switching to the Table API when you need more programmatic control.

Moreover, the Table API can interoperate with the DataStream API. You can convert between Tables and DataStreams, which lets you combine the high-level operations of the Table API with the lower-level control of the DataStream API when needed.

In terms of implementation, it's worth noting that the Table API and Flink SQL are two sides of the same coin. They share the same underlying infrastructure, including the optimizer and planner that determine the operator topology. Importantly, the Table/SQL API is not built on top of the DataStream API; instead, both the DataStream API and the Table/SQL API are built on the same internal, low-level stream operator API.

This architecture allows Flink to provide a seamless experience across different levels of abstraction while maintaining high performance and flexibility.

Getting started with the Table API

The easiest way to get started with the Table API on Confluent Cloud is to use either the Java examples or the Python examples available on GitHub. When using Flink on Confluent Cloud, you can begin writing your business logic immediately, as all Confluent Cloud metadata is automatically available and ready to use. Table API on Confluent Cloud works with a client-side library that delegates Table API calls to Confluent’s REST API. You don’t need to create JARs or other artifacts with Confluent Cloud.

The Table API is an excellent choice for serverless deployments on Confluent Cloud. It provides a cloud-native experience, allowing you to focus solely on your business logic while Confluent Cloud manages the underlying infrastructure. With automatic scaling and continuous updates, the Table API ensures that your Flink applications run efficiently and securely, without the need to manage clusters or worry about version compatibility.

To start using the Table API, you first need to create a TableEnvironment. This serves as the entry point for creating tables and executing queries. Here’s a simple example in Java:

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiExample {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment env = TableEnvironment.create(settings);

        // Create a source table using datagen connector
        Table transactions =
                env.from(
                        TableDescriptor.forConnector("datagen")
                                .schema(
                                        Schema.newBuilder()
                                                .column("id", DataTypes.INT())
                                                .column("amount", DataTypes.DOUBLE())
                                                .column("ts", DataTypes.TIMESTAMP(3))
                                                .watermark("ts", "ts - INTERVAL '5' SECOND")
                                                .build())
                                .option("rows-per-second", "5")
                                .option("fields.id.kind", "sequence")
                                .option("fields.id.start", "1")
                                .option("fields.id.end", "1000")
                                .option("fields.amount.min", "1.0")
                                .option("fields.amount.max", "2000.0")
                                .build());

        // Perform operations
        Table result =
                transactions
                        // Filter for transactions with amount greater than 1000
                        .filter($("amount").isGreater(1000))
                        // Group the filtered transactions by id
                        .groupBy($("id"))
                        // For each group, select the id and sum of amounts
                        .select($("id"), $("amount").sum().as("total_amount"));

        // Print the results
        result.execute().print();
    }
}

This example demonstrates how to create a source table, perform operations using the Table API, and print the results to screen.

Key features and operations in the Table API

The Table API provides a rich set of features and operations that enable you to express complex data processing logic. While many of these features are also available in Flink SQL, the Table API presents them in a programmatic form, allowing for greater flexibility and integration with Java or Python code. Let's explore some of the key features and operations you can perform using the Table API.

Programmatic API – Java and Python

One of the key advantages of the Table API is its programmatic nature. While Flink SQL offers a declarative approach using ANSI SQL, the Table API enables you to express complex data processing logic through a fluent API in Java or Python. This provides greater flexibility and control over your data processing pipeline.

Here’s the previous example rewritten in Python:

from pyflink.table import (
    EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor, Schema
)
from pyflink.table.expressions import col

# Create a TableEnvironment in streaming mode
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Create a source table using datagen connector
source = TableDescriptor.for_connector("datagen") \
    .schema(
        Schema.new_builder()
        .column("id", DataTypes.INT())
        .column("amount", DataTypes.DOUBLE())
        .column("ts", DataTypes.TIMESTAMP(3))
        .build()
    ) \
    .option("rows-per-second", "5") \
    .option("fields.id.kind", "sequence") \
    .option("fields.id.start", "1") \
    .option("fields.id.end", "1000") \
    .option("fields.amount.min", "1.0") \
    .option("fields.amount.max", "2000.0") \
    .build()

transactions = t_env.from_descriptor(source)

# Perform operations
result = transactions \
    .filter(col("amount") > 1000) \
    .group_by(col("id")) \
    .select(
        col("id"),
        col("amount").sum.alias("total_amount")
    )

# Execute and print the results
result.execute().print()

Unit testing Table API applications

Unit testing is crucial for ensuring the correctness of your Flink Table API applications. Here’s an example of how to write a unit test for the previous Table API example. In this unit test, we extracted the transaction processing logic into a separate TransactionProcessor class. This class can then be used in both the main TableApiExample and the TableApiExampleTest, ensuring that the same code is being tested and used in production.

import org.apache.flink.table.api.*;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.*;
import static org.junit.jupiter.api.Assertions.*;

public class TableApiExampleTest {

    @Test
    public void testTransactionAggregation() throws Exception {
        // Create a TableEnvironment for unit testing
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment env = TableEnvironment.create(settings);

        // Create a source table with test data
        DataType schema =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("amount", DataTypes.DOUBLE()),
                        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));

        Table transactions =
                env.fromValues(
                        schema,
                        Row.of(1, 500.0, "2023-01-01 10:00:00"),
                        Row.of(1, 1500.0, "2023-01-01 11:00:00"),
                        Row.of(2, 2000.0, "2023-01-01 12:00:00"),
                        Row.of(2, 1000.0, "2023-01-01 13:00:00"));

        // Perform operations (extracted into separate class, so that the same code is being tested
        // and used in production
        Table result = TransactionProcessor.processTransactions(transactions);

        // Convert the result to a List for verification
        try (CloseableIterator<Row> iterator = result.execute().collect()) {
            List<Row> results = new ArrayList<>();
            iterator.forEachRemaining(results::add);

            // Verify the results
            assertEquals(2, results.size());
            assertTrue(results.contains(Row.of(1, 1500.0)));
            assertTrue(results.contains(Row.of(2, 2000.0)));
        }
    }
}
public class TransactionProcessor {
    public static Table processTransactions(Table transactions) {
        return transactions
                .filter($("amount").isGreater(1000))
                .groupBy($("id"))
                .select($("id"), $("amount").sum().as("total_amount"));
    }
}

Table API operations

The Table API provides a rich set of operations that can be performed on tables, allowing you to transform and analyze your data in various ways. These operations use expressions to manipulate columns and rows. Here are some examples of common operations:

Basic column selection and renaming

import static org.apache.flink.table.api.Expressions.*;

Table result = transactions.select(
    $("id").as("transaction_id"),
    $("amount").as("transaction_amount"),
    $("ts").as("transaction_time")
);

Mathematical and string operations

import static org.apache.flink.table.api.Expressions.*;

Table result = transactions
    .select(
        $("id"),
        $("amount").multiply(2).as("doubled_amount"),
        $("product").upperCase().as("PRODUCT")
    );

Conditional expressions

Table result = transactions.select(
    $("id"),
    $("amount"),
    $("amount").isGreater(1000).then("high_value", "normal").as("transaction_type")
);

Adding a new column based on existing ones

Table result = transactions
    .addColumns(($("amount").multiply(2)).as("doubled_amount"));

Combining multiple operations

Table result = transactions.select(
    $("id"),
    $("amount"),
    $("ts"),
    $("amount")
        .isGreater(1000)
        .then("high_value", "normal")
        .as("trans
action_type")
);

Set operations

The Table API supports set operations such as union, intersection, and except:

Table result = table1
    .union(table2)
    .except(table3)
    .intersect(table4);

Joins and aggregations

Table orders = env.from("orders");
Table products = env.from("products");

Table result = orders
    .join(products, $("orders.product_id").isEqual($("products.id")))
    .groupBy($("products.category"), $("orders.customer_id"))
    .select($("category"), $("customer_id"), $("orders.amount").sum().as("total_amount"));

Windows

The Table API offers powerful windowing capabilities for both batch and stream processing:

.window(Tumble.over(lit(10).seconds()).on($("ts")).as("window_10s"))
                .groupBy($("id"), $("window_10s"))
                .select(
                        $("id"),
                        $("window_10s").start().as("window_start"),
                        $("amount").sum().as("total_amount"));

This example creates a tumbling window of 10 seconds based on the ts column and calculates the sum of the amount for each id within each window.

User-defined functions (UDFs)

The Table API allows you to extend its functionality by defining custom logic using user-defined functions (UDFs), which are available for early access in Java for our fully managed service. Several types of UDFs are available:

  1. Scalar functions: 

    • These functions take scalar values as input and return a single scalar value. 

    • Example: A function that doubles a value from the amount column.

  2. Table functions (UDTFs):

    • Table functions transform scalar values into multiple rows as output. They are useful for splitting a single value into multiple rows. 

    • Example: A function that splits a string into multiple rows, one for each word.

  3. Aggregate functions (UDAGGs): 

    • Aggregate functions process multiple input rows to produce a single scalar value. This is useful for operations like summing, averaging, or computing custom metrics across multiple rows. 

    • Example: A function that calculates the cumulative sum of a column for each group of rows.

  4. Table aggregate functions (UDTAGGs): 

    • Table aggregate functions are similar to aggregate functions, but instead of returning a single scalar value, they return multiple rows based on the aggregation. 

    • Example: A function that computes a running top-3 list of values for each group.

  5. Async table functions:

    • These are specialized functions designed for asynchronous operations, such as performing lookups from an external system in a non-blocking manner. 

    • Example: A function that performs an asynchronous lookup for user details from an external database.

Now, let’s look at an example of a user-defined aggregate function (UDAGG) that tracks the total sum of all previous orders by the same ID. This function is useful for computing running totals over time for each unique ID.

UDAGG example:

import org.apache.flink.table.functions.AggregateFunction;

// The accumulator keeps track of the current total sum for each id
public class RunningTotal extends AggregateFunction<Double, RunningTotal.Accumulator> {

    // Inner class to hold the accumulator state
    public static class Accumulator {
        public double sum = 0.0;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public Double getValue(Accumulator accumulator) {
        return accumulator.sum;
    }

    // This method adds a new amount to the current total sum
    public void accumulate(Accumulator accumulator, Double amount) {
        if (amount != null) {
            accumulator.sum += amount;
        }
    }
}

Invocation in Table API:

// Register the UDAGG
RunningTotal runningTotal = new RunningTotal();
env.createTemporarySystemFunction("RunningTotal", runningTotal);

Table result =
        transactions
                // Group the transactions by id
                .groupBy($("id"))
                // Apply the running total UDAGG
                .select($("id"), call("RunningTotal", $("amount")).as("total_amount"));

Streaming vs. batch in the Table API

Like Flink SQL, the Table API supports both streaming and batch processing modes. In our fully managed Flink service, batch processing mode will be supported in the coming months. The API itself remains the same whether you're working with bounded (finite) or unbounded (infinite) data. This unified API allows you to write your data processing logic once and apply it to both batch and streaming scenarios.

In streaming mode, the Table API processes continuous streams of data in real time. It supports event time processing and sophisticated windowing operations, making it ideal for scenarios like real-time analytics or continuous ETL.

In batch mode, the Table API processes static datasets. This is suitable for scenarios where you have a finite set of data to process, such as daily or weekly report generation.

The main differences between streaming and batch mode in the Table API are:

  1. Input processing: In streaming queries, the input is processed in real time as it arrives. The entire pipeline runs continuously to handle new data. In contrast, batch queries process finite input data in stages, running only as needed.

  2. Result production: Streaming queries produce continuous results as new data arrives, while batch queries yield a single final result.

  3. Time attributes: In streaming mode, you often work with event time, whereas in batch mode, time attributes are treated as regular columns.

  4. Optimizations: The Flink optimizer may apply different optimizations depending on whether you are in streaming or batch mode.

It's important to note that bounded tables can be processed in both streaming and batch modes, while unbounded tables can only be processed in streaming mode.

To switch between streaming and batch modes, simply change the environment settings when creating your TableEnvironment:

// For streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();

// For batch mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment env = TableEnvironment.create(settings);

By offering a unified API for both streaming and batch processing, Flink's Table API provides a flexible and powerful tool for a wide range of data processing scenarios.

Ready to put the Flink Table API into practice?

The Apache Flink Table API is a powerful tool for both stream and batch processing, offering a unified relational interface that simplifies complex data operations. With its higher-level abstraction compared to the DataStream API, developers can express processing logic clearly using Java or Python, allowing for greater flexibility and control across various data tasks.

By integrating seamlessly with Flink SQL and the DataStream API, the Table API supports a multifaceted approach to data management, enabling users to leverage the strengths of each API as needed. It provides essential features such as windowing capabilities, set operations, and user-defined functions, making it suitable for handling both continuous streams of data and static datasets efficiently.

To explore the Table API further, try our Table API developer course. Additionally, for a hands-on demo, check out our upcoming Q3 2024 Confluent Launch webinar, where you'll learn about the latest Flink enhancements, including Table API support, and other exciting new features for Confluent Cloud. Overall, the Flink Table API equips developers with the tools to build scalable, flexible, and efficient applications, enhancing your ability to manage data workflows effectively.

  • Martijn Visser is Group Product Manager at Confluent and PMC member and committer for the Apache Flink project. He works with the open source community on user-facing features such as the Table/SQL and DataStream API, connectors, and formats. Prior to joining Confluent, he worked as product manager at Immerok, which was acquired by Confluent.

    He has also worked as product manager at Ververica, where he was responsible for product development on Apache Flink and Ververica Platform, and as product lead at ING, where he was responsible for ING's Streaming Data and Engagement Platform.

Did you like this blog post? Share it now