
Unlocking Data Insights with Confluent Tableflow: Querying Apache Iceberg™️ Tables with Jupyter Notebooks


What if you could analyze real-time and historical data with just a few clicks and minimal code? Whether you're a data scientist, engineer, or analyst working in Python, you don't need to be an Apache Kafka® expert to unlock the power of streaming analytics. In this blog, we'll walk you through integrating Confluent Tableflow with Trino, which will enable you to query and visualize Apache Iceberg tables effortlessly in Jupyter Notebooks. By the end, you'll see how easy it is to work with streaming data using familiar SQL queries—no complex setup required.

What Is Tableflow?

In the modern data landscape, businesses struggle to bridge the gap between real-time event streaming and analytical workloads. Confluent Tableflow provides a powerful solution by enabling the seamless integration of streaming and batch data, using Apache Iceberg tables. With Tableflow, organizations can persist and structure streaming data into Iceberg tables stored in cloud object storage, ensuring efficient, scalable, and cost-effective analytics. This allows users to perform complex queries on streaming data without requiring extensive ETL processes.

Now generally available, Tableflow is designed to unify real-time and historical analytics, enabling data teams to leverage the best of both worlds. Companies can retain high-fidelity event streams while also benefiting from the efficiency of structured analytical queries, making it easier to derive insights from their Confluent Cloud data.

Tableflow supports integration with various query engines and data catalogs, such as Trino, Apache Spark™️, DuckDB and Amazon SageMaker Lakehouse. This allows users to seamlessly connect their Apache Iceberg™️ tables with SQL-based engines and machine learning platforms, ensuring broad compatibility for different analytical needs. A great example of this is how Amazon SageMaker Lakehouse leverages Tableflow to enable machine learning and advanced analytics on streaming data. For more details, see this blog.

Expanding Data Analytics With Trino

Trino is an open-source, distributed, SQL query engine designed for interactive and large-scale data analysis. It enables users to query data from multiple sources, including object storage, relational databases, and data lakes, using a single SQL interface. Trino supports ANSI SQL and is optimized for performance, making it a preferred choice for querying Iceberg tables stored in Confluent Tableflow.

With Trino, users can execute fast SQL queries over data stored in cloud environments, removing the need for time-consuming data migrations. Its ability to handle federated queries allows for seamless integration with Confluent Tableflow, making it easier to extract meaningful insights from real-time and historical data.

Taking Tableflow Further: A Hands-On Approach

We have documentation on integrating Tableflow with Trino, which showcases how to query Apache Iceberg tables using SQL. While the docs demonstrate how to connect Trino with Tableflow, they primarily focus on SQL-based queries via the command line interface.

This blog takes the next step by providing a more hands-on, visual approach using Jupyter Notebooks. In conjunction with Python’s Pandas library, it shows you how to query Iceberg tables directly from Jupyter, enabling you to explore and visualize your data interactively. This approach simplifies the workflow for data scientists, engineers, and analysts who prefer a Python-native environment for their analytics needs.

We are about to bring data analytics to life with an intuitive, low-code approach!

Getting Started: Step-By-Step Setup

To complete these steps, you’ll need access to Confluent Cloud. If you don’t have the required permissions, ask your Confluent Cloud administrator for assistance. The credentials required are the API key and secret, Confluent Cloud organizational ID, environment ID, cluster ID, topic names, and region. Once you have these, you’ll have everything needed to set up Trino and run your SQL queries.

Confluent Cloud Account

The first step is to sign in to your Confluent Cloud account. If you don’t have an account yet, you can sign up for a free trial here. New users receive $400 in credits to use within the first 30 days.

Once logged in, follow these steps:

  1. Create a Kafka cluster: Set up a Kafka cluster within your Confluent Cloud account.

  2. Create Kafka topics: Create the two topics used in this demo, tableflow-stock-trades and tableflow-users.

  3. Set up sample producers: To start sending messages to your topics, set up two sample Kafka producers using the Datagen Source Connector for Confluent Cloud. For this demo, configure the sample producers as follows:

    1. Stock trades: quickstart = STOCK_TRADES

    2. Users: quickstart = USERS

For detailed instructions on these steps, refer to the Confluent Cloud Quick Start guide.
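
For readers who prefer to script this step, the two Datagen connectors can be described as plain configuration dictionaries. The sketch below assumes the property names commonly used by the fully managed Datagen Source connector (connector.class, kafka.topic, quickstart, output.data.format). The connector names and the placeholder credentials are hypothetical, and the Kafka API key here is a cluster key, not the Tableflow key created later, so check everything against the connector documentation before use.

```python
import json


def datagen_config(name, topic, quickstart,
                   api_key="<kafka-api-key>", api_secret="<kafka-api-secret>"):
    """Build a config dict for one Datagen Source connector (illustrative sketch)."""
    return {
        "name": name,                       # hypothetical connector name
        "connector.class": "DatagenSource",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": api_key,           # placeholder, not a real key
        "kafka.api.secret": api_secret,     # placeholder, not a real secret
        "kafka.topic": topic,
        "quickstart": quickstart,           # STOCK_TRADES or USERS in this demo
        "output.data.format": "AVRO",
        "tasks.max": "1",
    }


connectors = [
    datagen_config("datagen-stock-trades", "tableflow-stock-trades", "STOCK_TRADES"),
    datagen_config("datagen-users", "tableflow-users", "USERS"),
]

# Print the configs as JSON so they can be reviewed or pasted elsewhere
print(json.dumps(connectors, indent=2))
```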

Data Producers

To demonstrate how Tableflow works, we will enable it on two Kafka topics, each fed by its own Datagen connector and serialized with Avro. The first topic, tableflow-stock-trades, holds stock trades; the second, tableflow-users, holds users.

Enabling Tableflow

Now that we have our two topics created (tableflow-stock-trades and tableflow-users), we can enable Tableflow on them. To do this:

  1. Navigate to your Topics section in Confluent Cloud.

  2. On the right-hand side of each topic, you will see an option to Enable Tableflow.

Let’s walk through enabling Tableflow together on one of the topics. Follow these steps:

  1. Click on Enable Tableflow for the topic tableflow-stock-trades.

  2. Select the option Use Confluent storage.

  3. That’s it! The Tableflow status may show as Pending for up to 15 minutes.

  4. Once the Tableflow status is set to Syncing for the corresponding topics, you are good to go!

  5. Every time Tableflow updates a table, it creates a snapshot, a versioned state of the table that allows time-travel queries. By default, when enabling Tableflow, snapshot expiration is set to Infinite, and failure mode is set to Suspend. With infinite expiration, all snapshots are retained indefinitely, allowing users to query historical data at any point. Suspend failure mode ensures that if a record fails to be materialized, Tableflow pauses processing rather than skipping the record, preventing potential data inconsistencies. To change this, select the topic where Tableflow is enabled and click on the Configuration tab, then click on Edit settings under the Tableflow box.

  6. For this demo, let’s change the failure strategy to Skip. In the Tableflow edit settings section, set Mode to Skip and click on Save changes.
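
Because each Tableflow update creates a snapshot, time-travel queries become possible once Trino is connected later in this walkthrough. The sketch below only builds the SQL text. It assumes the Trino Iceberg connector's "$snapshots" metadata table and its FOR TIMESTAMP AS OF clause; whether every Tableflow-managed table exposes these through the REST catalog is an assumption to verify. The helper names are hypothetical.

```python
from datetime import datetime, timezone


def snapshots_sql(schema: str, table: str) -> str:
    """SQL listing the snapshots of a table via the Iceberg metadata table."""
    return (
        f'SELECT snapshot_id, committed_at FROM "{schema}"."{table}$snapshots" '
        "ORDER BY committed_at"
    )


def as_of_sql(schema: str, table: str, when: datetime) -> str:
    """SQL reading a table as it was at `when` (must be timezone-aware)."""
    ts = when.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return (
        f'SELECT * FROM "{schema}"."{table}" '
        f"FOR TIMESTAMP AS OF TIMESTAMP '{ts} UTC'"
    )


# Example: build both statements for the stock trades topic
when = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(snapshots_sql("lkc-xxxxx", "tableflow-stock-trades"))
print(as_of_sql("lkc-xxxxx", "tableflow-stock-trades", when))
```

Either string could then be passed to cursor.execute() in the notebook.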

Setting up Trino

Since Trino will be running inside Docker, make sure Docker is installed on your machine. You can get it from the official Docker installation guide. For more details on Docker and Docker Compose, refer to the official documentation.

Trino requires a catalog to define how it connects to various data sources. In this case, we will use a REST catalog, which allows Trino to communicate with Tableflow using standard REST API calls. Introduced in Apache Iceberg 0.14.0, the REST catalog provides a scalable alternative to Hive metastore catalogs, simplifying metadata management in cloud-native environments. This enables real-time access to Iceberg tables in Tableflow for both streaming and analytical workloads. For more details, refer to the Iceberg GitHub repository and the official documentation.

To configure the REST catalog for Trino, create a catalog folder in the same directory where the Docker Compose file is located. Inside this folder, create a file named demo-tableflow.properties with the following content:

connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.oauth2.credential=<api-key>:<api-secret>
iceberg.rest-catalog.security=OAUTH2
iceberg.security=read_only

iceberg.rest-catalog.uri=https://tableflow.<cc_region>.aws.confluent.cloud/iceberg/catalog/organizations/<org_id>/environments/<env_id>

iceberg.rest-catalog.vended-credentials-enabled=true
fs.native-s3.enabled=true
s3.region=<cc_region>

If multiple catalogs are needed, for example when topics from another Kafka cluster in a different Confluent Cloud environment are being materialized as Iceberg, additional .properties files can be created within the catalog folder. Each catalog should have its own dedicated configuration file, allowing Trino to connect to different Iceberg catalogs as required.

The next step is to fill out the placeholders <...> with the appropriate values:

  1. For <api-key>:<api-secret>, in Confluent Cloud, click Tableflow in the left-side menu and then on Create/View API keys under the API access box.

  2. Click on + Add API Key.

  3. The API key can be created for your current user account, or you can select a service account that has the required permissions. For this example, let’s use the account logged into Confluent Cloud, so select My account and click Next.

  4. Select Tableflow and click Next.

  5. Enter a Name and Description for the Tableflow user account credentials, then click Create API key.

  6. Take note of the Key and Secret and replace the placeholders <api-key>:<api-secret> accordingly.

  7. Next, we need to configure the URI for the Tableflow REST Catalog: https://tableflow.<cc_region>.aws.confluent.cloud/iceberg/catalog/organizations/<org_id>/environments/<env_id>

  8. Let’s go one by one:

    1. <cc_region> is where your Kafka cluster is deployed. Click on Cluster Settings under your Kafka cluster page, where the region is shown in the Cloud details box.

    2. Also, take note of the Cluster ID (lkc-xxxxx), in the Identification box, as that defines the schema.

    3. <org_id> is the Organization ID of your Confluent Cloud Account. For that, click on the hamburger menu (top-right corner) and select Organization settings.

    4. The Organization ID will be shown in your Confluent Cloud account, as below (UUID formatted).

    5. Finally, for the <env_id>, this is the Confluent Cloud Environment ID where your Kafka cluster is provisioned. On the left-side menu, select Environments and click on the environment where your Kafka cluster is located. The Environment ID will be displayed on the right side, and it will be in the format env-xxxxx.
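
Filling in the placeholders by hand is error-prone, so the properties file can also be rendered from the collected values. The following is a minimal sketch that reuses the exact keys from the file shown earlier; the function names are hypothetical and the region value us-east-1 is only an example.

```python
from pathlib import Path

# Same keys as demo-tableflow.properties, with placeholders as format fields
TEMPLATE = """\
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.oauth2.credential={api_key}:{api_secret}
iceberg.rest-catalog.security=OAUTH2
iceberg.security=read_only
iceberg.rest-catalog.uri=https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog/organizations/{org_id}/environments/{env_id}
iceberg.rest-catalog.vended-credentials-enabled=true
fs.native-s3.enabled=true
s3.region={region}
"""


def render_catalog_properties(api_key, api_secret, region, org_id, env_id):
    """Return the catalog properties text with all placeholders filled in."""
    return TEMPLATE.format(api_key=api_key, api_secret=api_secret,
                           region=region, org_id=org_id, env_id=env_id)


def write_catalog_file(text, folder="catalog", name="demo-tableflow.properties"):
    """Write the properties text under the catalog folder and return the path."""
    path = Path(folder) / name
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
    return path


# Example values only; replace them with your own before writing the file
props = render_catalog_properties(
    api_key="<api-key>", api_secret="<api-secret>",
    region="us-east-1", org_id="<org_id>", env_id="env-xxxxx",
)
print(props)
```

Calling write_catalog_file(props) from the directory that will hold the Docker Compose file would create catalog/demo-tableflow.properties.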

Running Trino With Docker

Once Docker is installed and set up, you can run Trino with a single docker run command. Docker Compose is optional.

Option 1: Using docker run (recommended for this demo)

Since this demo only requires Trino, you can run it directly using the docker run command:

docker run -d \
  --rm \
  --name trino \
  -p 8080:8080 \
  -v "$(pwd)/catalog:/etc/trino/catalog" \
  trinodb/trino:latest

Option 2: Using Docker Compose (optional)

If you prefer to use Docker Compose, create a docker-compose.yml file with the following content:

services:
  trino:
    image: trinodb/trino:latest
    ports:
      - 8080:8080
    volumes:
      - ./catalog:/etc/trino/catalog

Then, after filling in the placeholders in demo-tableflow.properties, start the container with:

docker compose up -d

When run for the first time, the output should look similar to the following:

[+] Running 11/11
 ✔ trino Pulled                                                             29.5s 
   ✔ f57f672cc10f Pull complete                                              0.9s 
   ✔ 81a72d3fda32 Pull complete                                              0.9s 
   ✔ 7f4b5e81246a Pull complete                                              6.3s 
   ✔ 2f1a7d07b64f Pull complete                                              7.1s 
   ✔ 0b9be062d509 Pull complete                                              7.1s 
   ✔ 06fb3fa956bc Pull complete                                              7.2s 
   ✔ df70543f3320 Pull complete                                              7.2s 
   ✔ 8180fe82f9d8 Pull complete                                             27.2s 
   ✔ 98f4164e9920 Pull complete                                             27.3s 
   ✔ f5be4c0952f3 Pull complete                                             27.3s 
[+] Running 2/2
 ✔ Network trino-tableflow_default    Created                                0.0s 
 ✔ Container trino-tableflow-trino-1  Started                                1.1s

Once the process completes, your Trino container should be up and running. To verify that, open your browser and go to http://localhost:8080/ui (username is “admin”, no password is required). This will take you to the Trino UI, where you can check the status of your queries and manage the Trino environment.
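
The same check can be scripted. The sketch below assumes Trino's /v1/info endpoint, which returns a JSON document that includes a starting flag; the helper names are hypothetical. The example at the end uses a canned response so the snippet runs without a server.

```python
import json
from urllib.request import urlopen


def is_ready(info: dict) -> bool:
    """True once the coordinator reports that it has finished starting."""
    return not info.get("starting", True)


def fetch_info(base_url="http://localhost:8080", timeout=5):
    """Fetch the server info document from a running Trino coordinator."""
    with urlopen(f"{base_url}/v1/info", timeout=timeout) as resp:
        return json.load(resp)


# Canned response for illustration; use fetch_info() against a live container
sample = {"coordinator": True, "starting": False}
print(is_ready(sample))
```

With the container running, is_ready(fetch_info()) can be polled until it returns True before issuing queries.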

Installing Required Python Libraries

To continue with the demo, you will need the trino, pandas and matplotlib Python libraries. If you don’t have them installed, you can easily install them using pip or another package manager of your choice, such as conda.

Run the following command to install all three libraries (if you prefer to keep dependencies isolated, you can create a virtual environment before installing the libraries):

python3 -m pip install trino pandas matplotlib

Creating the Jupyter Notebook

To continue with the demo, we will use a Jupyter Notebook. You can use any tool of your choice to create the notebook, but for this demo, we will use the Jupyter Extension for Visual Studio Code. You can install it from the Visual Studio Code Marketplace. In the first cell, import the required libraries:

import trino
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from trino.auth import BasicAuthentication

Now, let’s connect to our Trino cluster. The catalog name matches the name of the properties file created under the catalog folder, without the .properties extension. In this case, it is demo-tableflow.

# Connect to Trino
conn = trino.dbapi.connect(
    host="localhost",
    port=8080,
    user="admin",
    catalog="demo-tableflow",
    #schema=...,  # Not required as the schema will be specified when querying Trino
    #auth=BasicAuthentication("admin", "your_password_here"),  # Authenticate using basic authentication, but not required for this demo
)

cursor = conn.cursor()

This will create a connection to your Trino cluster running on localhost and allow you to query data from the demo-tableflow catalog.
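
A quick way to confirm that the catalog is reachable is to list what it exposes. This is a small sketch using the standard SHOW SCHEMAS and SHOW TABLES statements; the helper names are hypothetical, and the cluster ID in the usage comments is the example one used in this post.

```python
def list_schemas(cursor):
    """Return the schema names visible in the connection's catalog."""
    cursor.execute("SHOW SCHEMAS")
    return [row[0] for row in cursor.fetchall()]


def list_tables(cursor, schema):
    """Return the table names in one schema (a Confluent Cloud cluster ID)."""
    cursor.execute(f'SHOW TABLES FROM "{schema}"')
    return [row[0] for row in cursor.fetchall()]


# In the notebook, with the cursor created above:
# print(list_schemas(cursor))
# print(list_tables(cursor, "lkc-vryp0n"))
```

If the Tableflow-enabled topics do not appear in the table list, the Tableflow status and the catalog properties are the first things to recheck.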

We can now issue SQL queries to visualize the data. Let’s take a look at the tableflow-users table. Since we specified the catalog name when connecting to Trino, the table name follows the format <schema>.<table_name>, where the schema is the Confluent Cloud cluster ID (e.g., lkc-vryp0n), and the table name is the Kafka topic name (e.g., tableflow-users). If no catalog is specified when connecting to Trino, the table name format would be <catalog>.<schema>.<table_name>.
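
Cluster IDs and topic names contain hyphens, so each part of the table name has to be wrapped in double quotes. A small helper, sketched here with hypothetical function names, builds the quoted name in either format:

```python
def quote_ident(name: str) -> str:
    """Wrap an identifier in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def qualified_table(schema: str, table: str, catalog: str = None) -> str:
    """Build <schema>.<table_name>, or <catalog>.<schema>.<table_name> if a catalog is given."""
    parts = [schema, table] if catalog is None else [catalog, schema, table]
    return ".".join(quote_ident(part) for part in parts)


print(qualified_table("lkc-vryp0n", "tableflow-users"))
print(qualified_table("lkc-vryp0n", "tableflow-users", catalog="demo-tableflow"))
```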

It’s important to mention that the Datagen source connector generates a continuous stream of records for each user. To simplify the results, we will query only the latest record for each user (topic tableflow-users).

# Run a query and fetch results
# Table name is "<Confluent_cloud_clusterID>"."<kafka_topic_name>"
cursor.execute("""
    WITH latest_users AS (
        SELECT *,
        ROW_NUMBER() OVER (PARTITION BY userid ORDER BY "$$timestamp" DESC) AS rn
        FROM "lkc-vryp0n"."tableflow-users"
    )
    SELECT * 
    FROM latest_users
    WHERE rn = 1
    ORDER BY userid""")

rows = cursor.fetchall()

# Get column names from cursor.description
columns = [desc[0] for desc in cursor.description]

# Convert to Pandas DataFrame
df_users = pd.DataFrame(rows, columns=columns)

# Decode "key"
df_users['key'] = df_users['key'].str.decode('utf-8')

# Display DataFrame
df_users

Tableflow automatically adds metadata to each record, including the Kafka topic name, partition where the message was stored, headers, leader epoch, offset, timestamp, and the raw key and value. In this case, since the values are Avro-encoded, the raw value will contain the actual Avro encoding. Please note, fields prefixed with $$ are cropped out in the table for brevity.
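
To hide the metadata columns when displaying a DataFrame, one option is to filter on the $$ prefix. This is a minimal sketch; the helper name is hypothetical and the sample frame only stands in for a real query result.

```python
import pandas as pd


def hide_metadata(df: pd.DataFrame, keep=()) -> pd.DataFrame:
    """Drop columns prefixed with $$, except those listed in `keep`."""
    cols = [c for c in df.columns if not c.startswith("$$") or c in keep]
    return df[cols]


# Stand-in data with two metadata columns
sample = pd.DataFrame(
    [["User_1", 42, 1700000000000]],
    columns=["userid", "$$offset", "$$timestamp"],
)
print(hide_metadata(sample, keep=("$$timestamp",)))
```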

Let’s take a look at the latest stock trade rows (topic tableflow-stock-trades). This time, however, we’ll filter out the columns prefixed with $$ except $$timestamp.
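
The query for this step is not reproduced here, so the following is a minimal sketch of one way to build the df_stock DataFrame used later in this post. It assumes the STOCK_TRADES quickstart fields (side, quantity, symbol, price, account, userid); the helper name and the LIMIT value are hypothetical.

```python
import pandas as pd

# Select the business columns plus "$$timestamp", newest rows first
STOCK_TRADES_SQL = """
    SELECT side, quantity, symbol, price, account, userid, "$$timestamp"
    FROM "lkc-vryp0n"."tableflow-stock-trades"
    ORDER BY "$$timestamp" DESC
    LIMIT 1000  -- hypothetical cap; remove it to load every row
"""


def query_to_dataframe(cursor, sql: str) -> pd.DataFrame:
    """Run a query on a DB-API cursor and return the result as a DataFrame."""
    cursor.execute(sql)
    rows = cursor.fetchall()
    # Column names come from cursor.description, as in the previous query
    columns = [desc[0] for desc in cursor.description]
    return pd.DataFrame(rows, columns=columns)


# In the notebook, with the Trino cursor created earlier:
# df_stock = query_to_dataframe(cursor, STOCK_TRADES_SQL)
# df_stock
```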

Finally, let’s join and aggregate both tables to calculate the total net sales per user. While this is done via a SQL query, the previous queries have already loaded pandas DataFrames into memory, so you could achieve the same result using pandas directly. Choose the approach that best fits your use case.

# Table name is "<Confluent_cloud_clusterID>"."<kafka_topic_name>"
cursor.execute("""
    WITH latest_users AS (
        SELECT *,
        ROW_NUMBER() OVER (PARTITION BY userid ORDER BY "$$timestamp" DESC) AS rn
        FROM "lkc-vryp0n"."tableflow-users"
    ),
    totals AS (
        SELECT
            u.userid,
            u.regionid,
            u.gender,
            SUM(
            CASE
                WHEN s.side = 'BUY' THEN (-1 * s.quantity * s.price) / 100.
                WHEN s.side = 'SELL' THEN (s.quantity * s.price) / 100.
                ELSE 0
            END
            ) AS net_total_raw
        FROM latest_users AS u
        LEFT JOIN "lkc-vryp0n"."tableflow-stock-trades" AS s
        ON s.userid = u.userid
        WHERE u.rn = 1
        GROUP BY u.userid, u.regionid, u.gender
    )
    -- Sort on the numeric total; sorting on the formatted string would be lexicographic
    SELECT userid, regionid, gender, format('%,.2f', net_total_raw) AS net_total
    FROM totals
    ORDER BY net_total_raw DESC""")

rows = cursor.fetchall()

# Get column names from cursor.description
columns = [desc[0] for desc in cursor.description]

# Convert to Pandas DataFrame
df_merged = pd.DataFrame(rows, columns=columns)

# Display DataFrame
df_merged
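
For comparison, here is a sketch of the same aggregation done in pandas rather than SQL. It assumes df_users holds one row per user with userid, regionid, and gender columns, and that df_stock holds the trade rows with side, quantity, price, and userid; the function name is hypothetical, and the result keeps net_total numeric instead of formatting it as text.

```python
import pandas as pd


def net_total_per_user(df_users: pd.DataFrame, df_stock: pd.DataFrame) -> pd.DataFrame:
    """Join users to their trades and sum the signed trade amounts per user."""
    trades = df_stock.copy()
    # BUY counts as negative, SELL as positive, anything else as zero
    sign = trades["side"].map({"SELL": 1, "BUY": -1}).fillna(0)
    # Price is divided by 100, as in the SQL query
    trades["net"] = sign * trades["quantity"] * trades["price"] / 100
    totals = trades.groupby("userid", as_index=False)["net"].sum()
    # Left join keeps users without trades, who get a total of zero
    merged = df_users[["userid", "regionid", "gender"]].merge(totals, on="userid", how="left")
    merged["net_total"] = merged["net"].fillna(0.0)
    return (
        merged.drop(columns="net")
        .sort_values("net_total", ascending=False)
        .reset_index(drop=True)
    )


# Stand-in data for illustration
users = pd.DataFrame(
    [["User_1", "Region_1", "MALE"], ["User_2", "Region_2", "FEMALE"], ["User_3", "Region_3", "OTHER"]],
    columns=["userid", "regionid", "gender"],
)
stock = pd.DataFrame(
    [["User_1", "SELL", 10, 500], ["User_1", "BUY", 2, 1000], ["User_2", "BUY", 1, 100]],
    columns=["userid", "side", "quantity", "price"],
)
result = net_total_per_user(users, stock)
print(result)
```

In the notebook, net_total_per_user(df_users, df_stock) would apply the same logic to the DataFrames already in memory.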

Lastly, let’s do something cool! Let’s create a stacked bar chart to visualize net sales per user, with each stock symbol represented as a different color in the legend. The x-axis will show user IDs, while the y-axis will represent net sales values, formatted in dollars with thousand separators for readability (purchases are negative values, sales are positive values). This way, we can quickly spot trading trends, see which users are most active, and understand their portfolio compositions at a glance.

# Calculate net sales per transaction in dollars (price is divided by 100, as in the SQL query above)
df_stock["value"] = df_stock["quantity"] * df_stock["price"] / 100
# Sales are positive, purchases are negative
df_stock["value"] = df_stock["value"].where(df_stock["side"] == "SELL", -df_stock["value"])

# Group by userid and symbol, then sum the values
df_summary = df_stock.groupby(["userid", "symbol"])["value"].sum().unstack(fill_value=0)

# Plot the stacked bar chart
ax = df_summary.plot(kind="bar", stacked=True, figsize=(12, 6), colormap="tab10")

# Format y-axis as dollar values with thousand separators
ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda x, _: f'${x:,.0f}'))

plt.xlabel("User ID")
plt.ylabel("Net Sales Value")
plt.title("Net Sales per User ID (Stacked by Symbol)")
plt.xticks(rotation=45)
plt.legend(title="Symbol", bbox_to_anchor=(1.05, 1), loc="upper left")
plt.grid(axis="y", linestyle="--", alpha=0.7)

plt.show()

The sky’s the limit! Users can run queries as they wish, and Tableflow will continuously update the Iceberg tables. The results can be displayed as tables or visualized as charts (like the Matplotlib chart we just created)—offering endless possibilities for data exploration and analysis.

Stopping and Removing Trino

Once you’re done with the demo, you can stop and remove the Trino container to free up resources on your machine.

If you used docker run:

docker stop trino 

If you used Docker Compose:

docker compose down

Both commands will clean up the running container and any associated resources.

Wrapping It All Up

In this blog, we integrated Confluent Tableflow with Trino and used a Jupyter Notebook to query and visualize data from Apache Iceberg tables. We set up Kafka topics, enabled Tableflow, configured a REST catalog, and connected to Trino from Python. This setup allowed us to work with real-time data and aggregate insights like total net sales per user. Tableflow also supports schema evolution, making it straightforward to adapt to changes in your data structure without breaking downstream queries. Together, these tools let you analyze both streaming and historical data with familiar SQL and Python.

Ready to take your data analysis to the next level? Explore Confluent Tableflow, Trino, and Apache Iceberg today to unlock the power of real-time and historical data at scale. Get started now!


Apache®, Apache Kafka®, Kafka®, Apache Iceberg™️, Iceberg™️, Apache Spark™️, are either registered trademarks or trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.

  • Italo Nesi is a Sr. Solutions Engineer at Confluent, bringing a wealth of over 30 years of experience in various roles such as software engineer, solutions engineer/architect, pre-sales engineer, full stack developer, IoT developer/architect, and a passionate home automation hobbyist. He possesses a strong penchant for building innovative solutions rather than starting from scratch, leveraging existing tools and technologies to deliver efficient and effective results for the core business. His expertise lies in combining his technical prowess with a practical approach, ensuring optimal outcomes while avoiding unnecessary reinvention of the wheel. He holds a bachelor’s degree in electronics engineering from the Federal University of Rio Grande do Norte/Brazil, an MBA from the Federal University of Rio de Janeiro/Brazil (COPPEAD), and an executive master’s degree in International Supply Chain Management from Université Catholique de Louvain/Belgium.
