What if you could analyze real-time and historical data with just a few clicks and minimal code? Whether you're a data scientist, engineer, or analyst working in Python, you don't need to be an Apache Kafka® expert to unlock the power of streaming analytics. In this blog, we'll walk you through integrating Confluent Tableflow with Trino, which will enable you to query and visualize Apache Iceberg tables effortlessly in Jupyter Notebooks. By the end, you'll see how easy it is to work with streaming data using familiar SQL queries—no complex setup required.
In the modern data landscape, businesses struggle to bridge the gap between real-time event streaming and analytical workloads. Confluent Tableflow provides a powerful solution by enabling the seamless integration of streaming and batch data, using Apache Iceberg tables. With Tableflow, organizations can persist and structure streaming data into Iceberg tables stored in cloud object storage, ensuring efficient, scalable, and cost-effective analytics. This allows users to perform complex queries on streaming data without requiring extensive ETL processes.
Now generally available, Tableflow is designed to unify real-time and historical analytics, enabling data teams to leverage the best of both worlds. Companies can retain high-fidelity event streams while also benefiting from the efficiency of structured analytical queries, making it easier to derive insights from their Confluent Cloud data.
Tableflow supports integration with various query engines and data catalogs, such as Trino, Apache Spark™️, DuckDB and Amazon SageMaker Lakehouse. This allows users to seamlessly connect their Apache Iceberg™️ tables with SQL-based engines and machine learning platforms, ensuring broad compatibility for different analytical needs. A great example of this is how Amazon SageMaker Lakehouse leverages Tableflow to enable machine learning and advanced analytics on streaming data. For more details, see this blog.
Trino is an open-source, distributed, SQL query engine designed for interactive and large-scale data analysis. It enables users to query data from multiple sources, including object storage, relational databases, and data lakes, using a single SQL interface. Trino supports ANSI SQL and is optimized for performance, making it a preferred choice for querying Iceberg tables stored in Confluent Tableflow.
With Trino, users can execute fast SQL queries over data stored in cloud environments, removing the need for time-consuming data migrations. Its ability to handle federated queries allows for seamless integration with Confluent Tableflow, making it easier to extract meaningful insights from real-time and historical data.
We have documentation on integrating Tableflow with Trino, which showcases how to query Apache Iceberg tables using SQL. While the docs demonstrate how to connect Trino with Tableflow, they primarily focus on SQL-based queries via the command line interface.
This blog takes the next step by providing a more hands-on, visual approach using Jupyter Notebooks. In conjunction with Python’s Pandas library, it shows you how to query Iceberg tables directly from Jupyter, enabling you to explore and visualize your data interactively. This approach simplifies the workflow for data scientists, engineers, and analysts who prefer a Python-native environment for their analytics needs.
We are about to bring data analytics to life with an intuitive, low-code approach!
To complete these steps, you’ll need access to Confluent Cloud. If you don’t have the required permissions, ask your Confluent Cloud administrator for assistance. You will need the following details: an API key and secret, your Confluent Cloud organization ID, environment ID, cluster ID, topic names, and region. Once you have these, you’ll have everything needed to set up Trino and run your SQL queries.
If you’re new to Confluent Cloud, the first step is to sign into your account. If you don’t have an account yet, you can sign up for a free trial here. New users receive $400 in credits to use within the first 30 days.
Once logged in, follow these steps:
Create a Kafka cluster: Set up a Kafka cluster within your Confluent Cloud account.
Create a Kafka topic: Create a topic to which you’ll send and receive messages.
Set up a sample producer: To start sending messages to your topics, set up two sample Kafka producers using the Datagen Source Connector for Confluent Cloud. For this demo, configure the sample producers as follows:
Stock trades: quickstart = STOCK_TRADES
Users: quickstart = USERS
For detailed instructions on these steps, refer to the Confluent Cloud Quick Start guide.
To demonstrate how Tableflow works, we will enable it on our two Kafka topics, both powered by the Datagen connector. The first topic will represent stock trades, and the second will represent users. Their corresponding topic names and Avro schemas are provided below:
Below are the two Datagen connectors that were set up in this demo:
The corresponding Kafka topics and sample messages are shown below:
Now that we have our two topics created (tableflow-stock-trades and tableflow-users), we can enable Tableflow on them. To do this:
Navigate to your Topics section in Confluent Cloud.
On the right-hand side of each topic, you will see an option to Enable Tableflow.
Let’s walk through enabling Tableflow together on one of the topics. Follow these steps:
Click on Enable Tableflow for the topic tableflow-stock-trades.
Select the option Use Confluent storage.
That’s it, as simple as 1-2-3! The status will show as Pending at first, and it may take up to 15 minutes to change.
Once the Tableflow status is set to Syncing for the corresponding topics, you are good to go!
Every time Tableflow updates a table, it creates a snapshot, a versioned state of the table that allows time-travel queries. By default, when enabling Tableflow, snapshot expiration is set to Infinite, and failure mode is set to Suspend. With infinite expiration, all snapshots are retained indefinitely, allowing users to query historical data at any point. Suspend failure mode ensures that if a record fails to be materialized, Tableflow pauses processing rather than skipping the record, preventing potential data inconsistencies. To change this, select the topic where Tableflow is enabled and click on the Configuration tab, then click on Edit settings under the Tableflow box.
For this demo, let’s change the failure strategy to Skip. In the Tableflow edit settings section, set Mode to Skip and click on Save changes.
Since Trino will be running inside Docker, make sure Docker is installed on your machine. You can get it from the official Docker installation guide. For more details on Docker and Docker Compose, refer to the official documentation.
Trino requires a catalog to define how it connects to various data sources. In this case, we will use a REST catalog, which allows Trino to communicate with Tableflow using standard REST API calls. Introduced in Apache Iceberg 0.14.0, the REST catalog provides a scalable alternative to Hive metastore catalogs, simplifying metadata management in cloud-native environments. This enables real-time access to Iceberg tables in Tableflow for both streaming and analytical workloads. For more details, refer to the Iceberg GitHub repository and the official documentation.
To configure the REST catalog for Trino, create a catalog folder in the same directory where the Docker Compose file is located. Inside this folder, create a file named demo-tableflow.properties with the following content:
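The file listing is not reproduced here, so what follows is only a sketch of what it could contain, written as a small Python helper so the placeholders stay visible. The property names are Trino Iceberg connector settings as we understand them; treat the exact set as an assumption and check it against the Trino and Tableflow documentation. The helper names render_catalog_properties and write_catalog_file are hypothetical.

```python
from pathlib import Path

# Assumed Trino Iceberg REST catalog settings; verify against the Trino docs.
PROPERTIES_TEMPLATE = """\
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri={uri}
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.credential={credential}
iceberg.rest-catalog.vended-credentials-enabled=true
fs.native-s3.enabled=true
s3.region={region}
"""

# URI pattern with the placeholders left unfilled.
PLACEHOLDER_URI = (
    "https://tableflow.<cc_region>.aws.confluent.cloud"
    "/iceberg/catalog/organizations/<org_id>/environments/<env_id>"
)


def render_catalog_properties(
    uri=PLACEHOLDER_URI,
    credential="<api-key>:<api-secret>",
    region="<cc_region>",
):
    """Return the properties text; defaults keep the <...> placeholders."""
    return PROPERTIES_TEMPLATE.format(uri=uri, credential=credential, region=region)


def write_catalog_file(text, folder="catalog", name="demo-tableflow.properties"):
    """Write the text to catalog/demo-tableflow.properties, creating the folder."""
    path = Path(folder) / name
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")
    return path


# Show the template with placeholders still in place.
print(render_catalog_properties())
```

Calling write_catalog_file(render_catalog_properties(...)) with real values writes the file next to your Docker Compose file.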
If multiple catalogs are needed, for example when topics from another Kafka cluster in a different Confluent Cloud environment are being materialized as Iceberg, additional .properties files can be created within the catalog folder. Each catalog should have its own dedicated configuration file, allowing Trino to connect to different Iceberg catalogs as required.
The next step is to fill out the placeholders <...> with the appropriate values:
For <api-key>:<api-secret>, go to Confluent Cloud, click Tableflow in the left-side menu, and then click Create/View API keys under the API access box.
Click on + Add API Key.
The API key can be created for your current user account, or alternatively, you can select a service account with the appropriate permissions required. For this example, let’s use the account logged into Confluent Cloud, so select My account and click Next.
Select Tableflow and click Next.
Enter a Name and Description for the Tableflow user account credentials, then click Create API key.
Take note of the Key and Secret and replace the placeholders <api-key>:<api-secret> accordingly.
Next, we need to configure the URI for the Tableflow REST Catalog: https://tableflow.<cc_region>.aws.confluent.cloud/iceberg/catalog/organizations/<org_id>/environments/<env_id>
Let’s go one by one:
<cc_region> is where your Kafka cluster is deployed. Click on Cluster Settings under your Kafka cluster page, where the region is shown in the Cloud details box.
Also, take note of the Cluster ID (lkc-xxxxx), in the Identification box, as that defines the schema.
<org_id>
is the Organization ID of your Confluent Cloud Account. For that, click on the hamburger menu (top-right corner) and select Organization settings.
The Organization ID will be shown in your Confluent Cloud account, as below (UUID formatted).
Finally, <env_id> is the Confluent Cloud Environment ID where your Kafka cluster is provisioned. On the left-side menu, select Environments and click on the environment where your Kafka cluster is located. The Environment ID will be displayed on the right side, in the format env-xxxxx.
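With the three values in hand, the URI can be assembled mechanically. A minimal sketch, assuming the AWS URI pattern shown above; tableflow_catalog_uri is a hypothetical helper name, and the values in the example call are made up.

```python
def tableflow_catalog_uri(cc_region, org_id, env_id):
    """Fill the Tableflow REST catalog URI pattern with the three identifiers."""
    return (
        f"https://tableflow.{cc_region}.aws.confluent.cloud"
        f"/iceberg/catalog/organizations/{org_id}/environments/{env_id}"
    )


# Hypothetical values, for illustration only.
print(
    tableflow_catalog_uri(
        cc_region="us-east-1",
        org_id="00000000-0000-0000-0000-000000000000",
        env_id="env-xxxxx",
    )
)
```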
Once Docker is installed and set up, you can run Trino using a simple Docker command. There is no need for Docker Compose unless you prefer it.
Since this demo only requires Trino, you can run it directly using the docker run command:
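The exact command is not reproduced here. As a sketch of an equivalent invocation, the snippet below assembles the arguments in Python so that the catalog folder is resolved to an absolute path before being mounted. The image name trinodb/trino, the container name trino, and the mount target /etc/trino/catalog are assumptions based on the official Trino image; trino_docker_run_args is a hypothetical helper.

```python
import shlex
from pathlib import Path


def trino_docker_run_args(catalog_dir="catalog", name="trino", port=8080,
                          image="trinodb/trino"):
    """Build the argv for a detached Trino container with the catalog mounted."""
    # Bind mounts are safest with absolute host paths.
    mount = f"{Path(catalog_dir).resolve()}:/etc/trino/catalog"
    return [
        "docker", "run", "-d",
        "--name", name,          # assumed name, matches `docker stop trino`
        "-p", f"{port}:8080",    # Trino UI and API port
        "-v", mount,
        image,
    ]


# Print the command so it can be copied into a terminal.
print(shlex.join(trino_docker_run_args()))
```

To launch it from Python instead of a terminal, pass the list to subprocess.run(args, check=True).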
If you prefer to use Docker Compose, create a docker-compose.yml file with the following content:
Then, after filling in the placeholders in demo-tableflow.properties, start the container with:
docker compose up -d
When run for the first time, the output should look similar to the following:
Once the process completes, your Trino container should be up and running. To verify that, open your browser and go to http://localhost:8080/ui (username is “admin”, no password is required). This will take you to the Trino UI, where you can check the status of your queries and manage the Trino environment.
To continue with the demo, you will need the trino, pandas and matplotlib Python libraries. If you don’t have them installed, you can easily install them using pip or another package manager of your choice, such as conda.
Run the following command to install all three libraries (if you prefer to keep dependencies isolated, you can create a virtual environment before installing them):
python3 -m pip install trino pandas matplotlib
To continue with the demo, we will use a Jupyter Notebook. You can use any tool of your choice to create the notebook, but for this demo, we will use the Jupyter Extension for Visual Studio Code. You can install it from the Visual Studio Code Marketplace.
Now, let’s connect to our Trino cluster. The name of the catalog is the same as the properties file created under the catalog folder. In this case, we have defined it as demo-tableflow.
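The notebook cell is not reproduced here. A minimal sketch, assuming the trino Python client installed earlier and the unauthenticated local container described above; connect_to_trino and query_to_dataframe are hypothetical helper names.

```python
# Connection settings for the local Trino container; only the catalog is set,
# so tables are later addressed as <schema>.<table_name>.
TRINO_SETTINGS = {
    "host": "localhost",
    "port": 8080,
    "user": "admin",
    "catalog": "demo-tableflow",
}


def connect_to_trino(settings=TRINO_SETTINGS):
    """Open a DB-API connection using the trino client package."""
    from trino.dbapi import connect  # imported here so the settings stay reusable

    return connect(**settings)


def query_to_dataframe(conn, sql):
    """Run a query and return the rows as a pandas DataFrame."""
    import pandas as pd

    cur = conn.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    columns = [col[0] for col in cur.description]  # first field is the name
    return pd.DataFrame(rows, columns=columns)
```

In a notebook, conn = connect_to_trino() followed by query_to_dataframe(conn, "SHOW SCHEMAS") is a quick way to confirm that the catalog is reachable.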
This will create a connection to your Trino cluster running on localhost and allow you to query data from the demo-tableflow catalog.
We can now issue SQL queries to visualize the data. Let’s take a look at the tableflow-users table. Since we specified the catalog name when connecting to Trino, the table name follows the format <schema>.<table_name>, where the schema is the Confluent Cloud cluster ID (e.g., lkc-vryp0n), and the table name is the Kafka topic name (e.g., tableflow-users). If no catalog is specified when connecting to Trino, the table name format would be <catalog>.<schema>.<table_name>.
It’s important to mention that the Datagen source connector generates a continuous stream of records for each user. To simplify the results, we will query only the latest record for each user (topic tableflow-users).
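One way to express "latest record per user" is a window function that ranks each user's records by timestamp. The sketch below only builds the SQL text. It assumes the table has a userid column (as in the Datagen USERS quickstart) and the $$timestamp metadata column; latest_per_user_sql is a hypothetical helper.

```python
def latest_per_user_sql(cluster_id, topic="tableflow-users"):
    """Build a Trino query returning the newest row for each userid."""
    # Hyphenated schema and table names must be double-quoted in Trino.
    table = f'"{cluster_id}"."{topic}"'
    return f"""
SELECT *
FROM (
    SELECT t.*,
           ROW_NUMBER() OVER (
               PARTITION BY userid
               ORDER BY "$$timestamp" DESC
           ) AS rn
    FROM {table} AS t
) AS ranked
WHERE rn = 1
ORDER BY userid
""".strip()


# lkc-vryp0n is the example cluster ID used in this post.
print(latest_per_user_sql("lkc-vryp0n"))
```

The resulting string can be executed through a Trino cursor and loaded into a pandas DataFrame. The extra rn column is always 1 and can be dropped.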
Tableflow automatically adds metadata to each record, including the Kafka topic name, partition where the message was stored, headers, leader epoch, offset, timestamp, and the raw key and value. In this case, since the values are Avro-encoded, the raw value will contain the actual Avro encoding. Please note that fields prefixed with $$ are cropped out of the table for brevity.
Let’s take a look at the latest stock trade rows (topic tableflow-stock-trades). This time, however, we’ll filter out the columns prefixed with $$ except $$timestamp.
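The column filter can be a one-line list comprehension over the column names. A minimal sketch; visible_columns and select_sql are hypothetical helpers, and the column names in the example are illustrative rather than taken from the actual table.

```python
def visible_columns(columns, keep=("$$timestamp",)):
    """Drop metadata columns (prefixed with $$) except those listed in keep."""
    return [c for c in columns if not c.startswith("$$") or c in keep]


def select_sql(cluster_id, topic, columns, limit=10):
    """Build a query for the most recent rows, restricted to the given columns."""
    cols = ", ".join(f'"{c}"' for c in columns)
    return (
        f'SELECT {cols} FROM "{cluster_id}"."{topic}" '
        f'ORDER BY "$$timestamp" DESC LIMIT {limit}'
    )


# Illustrative column names; inspect your own table for the real list.
example = ["side", "quantity", "symbol", "$$offset", "$$timestamp"]
print(select_sql("lkc-vryp0n", "tableflow-stock-trades", visible_columns(example)))
```

The same visible_columns function also works on an existing DataFrame: df[visible_columns(df.columns)].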
Finally, let’s join and aggregate both tables to calculate the total net sales per user. While this is done via a SQL query, the previous queries have already loaded pandas DataFrames into memory, so you could achieve the same result using pandas directly. Choose the approach that best fits your use case.
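A sketch of what such a join could look like, assuming the Datagen quickstart column names (userid, symbol, side, quantity, price) and that side is either BUY or SELL. Because the users topic holds many records per user, the sketch joins against distinct user IDs so that trades are not counted more than once. net_sales_sql is a hypothetical helper.

```python
def net_sales_sql(cluster_id,
                  trades_topic="tableflow-stock-trades",
                  users_topic="tableflow-users"):
    """Build a query for net sales per user and symbol (sales +, purchases -)."""
    trades = f'"{cluster_id}"."{trades_topic}"'
    users = f'"{cluster_id}"."{users_topic}"'
    return f"""
SELECT u.userid,
       t.symbol,
       SUM(CASE WHEN t.side = 'SELL' THEN t.quantity * t.price
                ELSE -1 * t.quantity * t.price END) AS net_sales
FROM {trades} AS t
JOIN (SELECT DISTINCT userid FROM {users}) AS u
  ON u.userid = t.userid
GROUP BY u.userid, t.symbol
ORDER BY u.userid, t.symbol
""".strip()


print(net_sales_sql("lkc-vryp0n"))
```

Grouping by symbol as well as user keeps enough detail for a per-symbol breakdown. Summing net_sales per userid afterwards gives the per-user total.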
Lastly, let’s do something cool! Let’s create a stacked bar chart to visualize net sales per user, with each stock symbol represented as a different color in the legend. The x-axis will show user IDs, while the y-axis will represent net sales values, formatted in dollars with thousand separators for readability (purchases are negative values, sales are positive values). This way, we can quickly spot trading trends, see which users are most active, and understand their portfolio compositions at a glance.
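A minimal sketch of such a chart, assuming a DataFrame with userid, symbol, and net_sales columns (hypothetical names for the aggregated result). format_dollars and plot_net_sales are illustrative helpers, not the original notebook code.

```python
def format_dollars(value, _pos=None):
    """Format a number as dollars with thousands separators, e.g. -$1,250."""
    sign = "-" if value < 0 else ""
    return f"{sign}${abs(value):,.0f}"


def plot_net_sales(df):
    """Draw a stacked bar chart of net sales per user, one color per symbol."""
    import matplotlib.pyplot as plt
    from matplotlib.ticker import FuncFormatter

    # One row per user, one column per stock symbol.
    pivot = df.pivot_table(index="userid", columns="symbol",
                           values="net_sales", aggfunc="sum", fill_value=0)
    ax = pivot.plot(kind="bar", stacked=True, figsize=(10, 6))
    ax.yaxis.set_major_formatter(FuncFormatter(format_dollars))
    ax.set_xlabel("User ID")
    ax.set_ylabel("Net sales")
    ax.set_title("Net sales per user by stock symbol")
    ax.legend(title="Symbol")
    plt.tight_layout()
    return ax
```

In a notebook, calling plot_net_sales(df) in a cell renders the chart inline.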
The sky’s the limit! Users can run queries as they wish, and Tableflow will continuously update the Iceberg tables. The results can be displayed as tables or visualized as charts (like the Matplotlib chart we just created)—offering endless possibilities for data exploration and analysis.
Once you’re done with the demo, you can stop and remove the Trino container to free up resources on your machine.
If you used docker run:
docker stop trino
If you used Docker Compose:
docker compose down
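docker compose down stops and removes the container along with its network. docker stop only stops the container; unless it was started with --rm, also run docker rm trino to remove it.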
In this blog, we integrated Confluent Tableflow with Trino and used a Jupyter Notebook to query and visualize data from Apache Iceberg tables. We set up Kafka topics, enabled Tableflow, configured a REST catalog, and connected to Trino. This setup allowed us to work with real-time data and aggregate insights like total net sales per user. Tableflow also supports schema evolution, so changes to your data structure need not break downstream queries. Together, these tools let you analyze both real-time and historical data using familiar SQL and Python.
Ready to take your data analysis to the next level? Explore Confluent Tableflow, Trino, and Apache Iceberg today to unlock the power of real-time and historical data at scale. Get started now!
Apache®, Apache Kafka®, Kafka®, Apache Iceberg™️, Iceberg™️, and Apache Spark™️ are either registered trademarks or trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.