Modern data platforms enable enterprises to extract valuable business insights from data sourced from various origins. Data engineers, data scientists, and other data practitioners use both data streaming and batch processing frameworks to deliver these insights. While batch processes work on historical data, stream processing extracts insights in real time, enabling businesses to react faster to changing events.
Often, these frameworks evolve into an all-purpose data platform with clearly defined practices for storing, querying, governing, and maintaining data applications. A top goal of modern data platforms is to unify streaming and batch data processing to ensure that the quality of the insights remains relevant over time. The combination of ensuring data quality upfront (i.e., “shift-left” to impose data quality rules that reduce bad data/bad events) and unification of streaming and batch data helps establish business trust in data processes (DataOps), which is fundamental to driving business growth with the help of such insights.
Managed Apache Kafka® clusters on Confluent Cloud have become the de facto standard for building sophisticated data streaming applications. While Kafka topics provide fast, durable, and fault-tolerant storage for streaming data, Apache Flink® on Confluent Cloud helps to write complex stream processing jobs using the simple and familiar SQL syntax to provide the compute needed for enriching and transforming streaming data. A Kafka topic on Confluent Cloud is automatically exposed as a Flink dynamic table, so that SQL operations can be performed directly on streaming data by data streaming engineers. Plus, these Flink SQL jobs are completely managed by Confluent Cloud and auto-scale based on load.
Flink AI (FLIP-437) opens up a whole new paradigm for Flink SQL on Confluent Cloud. Machine-learning models are now first-class citizens in Flink, just like tables and functions. It is now possible to invoke generative AI (GenAI) large language model (LLM) endpoints (OpenAI, Azure OpenAI, Amazon Bedrock, Google Vertex AI, etc.), as well as “narrow AI” models (custom ML and deep learning models built using frameworks like PyTorch, TensorFlow, etc.) hosted on AWS, Google Cloud, and Azure directly from Flink SQL statements, for specific tasks like regression, clustering, and classification.
These invocations travel over HTTPS and return results that enrich the quality of streaming data instantaneously. This feature is currently in “early access (EA)” mode and will be made “generally available (GA)” soon.
AI model inference (i.e., generating a prediction by providing text or other inputs to a trained model) using Flink SQL enriches streaming data with context while keeping its quality trustworthy. It also enables integration between streaming data in Confluent Cloud and AI workloads deployed on any cloud (OpenAI, Amazon SageMaker, Amazon Bedrock, Azure OpenAI, Google Cloud Vertex AI, etc.), allowing developers to build smarter applications through real-time ML predictions. For example, if a user leaves a negative product review, which experience is better: the app that simply logs the review and says “Thank you for the feedback,” or the app that recognizes the product and root cause of the complaint (e.g., item out of stock), and dynamically points the user to a solution (e.g., expected product availability date or alternate products) in real time?
Confluent Cloud for Apache Flink® supports AI model inference and enables the use of models as resources in Flink SQL, just like tables and functions. You can use a SQL statement to create a model resource and invoke it for inference in streaming queries.
The analysis of customer reviews for natural language processing (NLP) tasks like sentiment prediction and entity recognition is a common example use case that we will use to demonstrate LLM-based streaming data enrichment.
An example of a customer review rating using Flink AI remote model inference is illustrated below.
The following steps are used to demonstrate Flink model inference on Confluent Cloud:
Create a Kafka topic on Confluent Cloud.
Simulate streaming customer review events by ingesting records from the Kaggle Amazon Fine Food Reviews CSV file into the topic.
(Dataset citation: J. McAuley and J. Leskovec. From Amateurs to Connoisseurs: Modeling the Evolution of User Expertise Through Online Reviews. WWW, 2013.)
Create a Flink SQL remote model on Confluent Cloud.
Invoke the model and store sentiment ratings.
Visualize by consuming from the customer rating dynamic table.
Step 1: Create a Kafka topic on Confluent Cloud
A topic named product_review is created on Confluent Cloud with all default settings. The original review CSV file has more than half a million reviews with 10 columns. For this example, only a few columns are selected: review_id, product_id, user_id, timestamp, and summary.
Records read from the CSV file are ingested into the product_review topic.
Step 2: Python producer to send review messages from the review CSV file
For the product_review topic, an Avro schema (review.avsc) is also created with the following specifications:
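The schema file itself is not reproduced in this post. A plausible review.avsc, assuming the columns listed in Step 1 (the record name, namespace, and field types here are illustrative, and product_id is excluded because it is used as the message key, as described below), would look like:

```json
{
  "namespace": "com.example.reviews",
  "type": "record",
  "name": "ProductReview",
  "fields": [
    {"name": "review_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "summary", "type": "string"}
  ]
}
```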
This schema is associated with the product_review topic within the producer code.
The producer reads the Avro schema file, opens the abridged CSV file, and reads one line at a time. The product_id field is used as the key, and the rest of the fields comprise the value of the Kafka message. After the key is serialized with a string_serializer and the value with an avro_serializer, the message is sent to the Kafka topic on Confluent Cloud.
An excerpt of the Python producer code is shown below:
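Since the original excerpt is not reproduced here, the following is a minimal offline sketch of the producer logic. In the real producer, the `send` callback wraps the confluent-kafka SerializingProducer, with a StringSerializer for the key and an AvroSerializer loaded from review.avsc; the callback injection and names used below are illustrative.

```python
import csv

# Columns kept from the abridged Amazon Fine Food Reviews CSV (see Step 1).
FIELDS = ["review_id", "product_id", "user_id", "timestamp", "summary"]
MAX_RECORDS = 20_000  # keep the experiment small, as described in the post


def to_kafka_record(row):
    """Split one CSV row into a Kafka (key, value) pair:
    product_id becomes the key, the remaining fields the Avro value."""
    key = row["product_id"]
    value = {f: row[f] for f in FIELDS if f != "product_id"}
    return key, value


def produce_reviews(csv_lines, send):
    """Read up to MAX_RECORDS reviews and hand each (key, value) to `send`.
    The real implementation calls SerializingProducer.produce() here;
    `send` is injected so the logic stays testable without a cluster."""
    count = 0
    for row in csv.DictReader(csv_lines):
        send(*to_kafka_record(row))
        count += 1
        if count >= MAX_RECORDS:
            break
    return count
```

Injecting the send step keeps the CSV-to-message mapping independent of the Kafka client, so the same code path can be exercised locally before pointing it at Confluent Cloud.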
To keep the experiment simple, 20,000 records are read from the CSV file, as shown in the code.
Check to see if the topic on Confluent Cloud is getting populated with the review records:
Step 3: Create Flink AI model on Confluent Cloud
Flink AI Model Inference is an early access program feature in Confluent Cloud introduced to gather feedback. For this demonstration, an OpenAI GPT-4o model is called over HTTPS. The Flink model is created as a resource on Confluent Cloud using Flink SQL statements.
The following statement on the Flink SQL workspace on Confluent Cloud creates a Flink AI model on Confluent Cloud that is capable of accepting a STRING as a review, and after invoking OpenAI’s GPT-4o remotely, stores the review rating in another field.
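The exact statement is not shown in this post; a sketch along the lines of the early-access CREATE MODEL syntax (the connection name, input/output column names, and prompt wording below are illustrative assumptions) would be:

```sql
-- Sketch only: connection name, I/O columns, and prompt are illustrative.
CREATE MODEL amazon_reviewrating
INPUT (summary STRING)
OUTPUT (rating STRING)
WITH (
  'provider' = 'openai',
  'task' = 'classification',
  'openai.connection' = 'openai-connection',
  'openai.model_version' = 'gpt-4o',
  'openai.system_prompt' = 'Rate the sentiment of the product review as a whole number between 1 and 5. Respond with the number only.'
);
```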
Currently, during the early access program, Flink models support the following tasks:
Classification
Text generation
Clustering
For this experiment, the classification task is chosen. The GPT-4o prompt ensures that the rating remains numerical and within 1 to 5.
Step 4: Invoke the Flink model and store the review rating on a Flink downstream table
Once the Kafka topic product_review is populated with the streaming review messages, a downstream Flink dynamic table named product_ratings is created. This table stores ratings that correspond to products from the product_review topic.
The following statement on the Flink SQL workspace on Confluent Cloud creates this dynamic table:
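The original DDL is not reproduced here; a minimal sketch, assuming the column names described above, would be:

```sql
-- Sketch: column names assumed from the review fields in Step 2.
CREATE TABLE product_ratings (
  product_id STRING,
  summary STRING,
  rating STRING
);
```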
Once this table is created, a new Flink SQL statement is run to populate the downstream table with product review ratings.
This statement passes each review summary to OpenAI’s GPT-4o remotely through the ML_PREDICT Flink AI Model Inference function and stores the returned review rating in the downstream table.
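A sketch of such a statement (assuming the table and model names used throughout this post) is:

```sql
-- Sketch: assumes the product_review source, the product_ratings sink,
-- and the amazon_reviewrating model described in this post.
INSERT INTO product_ratings
SELECT product_id, summary, rating
FROM product_review,
  LATERAL TABLE(ML_PREDICT('amazon_reviewrating', summary));
```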
The Flink AI model amazon_reviewrating is invoked for each record and populates the review rating based on the prompt provided during the creation of the model.
The general syntax to invoke the remote model is:
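Based on the early-access documentation, the invocation follows this general shape (placeholders in angle brackets):

```sql
SELECT *
FROM `<input_table>`,
  LATERAL TABLE(ML_PREDICT('<model_name>', <input_column>));
```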
Selecting data from the product_ratings table in the Flink SQL workspace now produces the review ratings per product:
With real-time review ratings available on the Flink table, it becomes simple to extract this information, since Flink dynamic tables are backed by Kafka topics on Confluent Cloud.
Step 5: Visualizing product review ratings on a heatmap
To visualize the results, we used Elastic Cloud’s Kibana Lens. Before creating a dashboard, the enriched product_ratings topic must land in Elastic via the Elasticsearch Service Sink Connector for Confluent Cloud. The following credentials are needed to create the connector:
The Elasticsearch endpoint provided in Elastic Cloud of the form: https://12345.us-east4.gcp.elastic-cloud.com:443.
The password for the elastic user.
A Confluent Cloud API key for Kafka. This is what the connector uses to authenticate to Kafka.
Given these fields, the Confluent CLI is used to create a connector. First a connector configuration file must be created:
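The configuration file itself is not shown in the post. A sketch based on the connector’s documented options (the connector name, file contents, and credential placeholders below are illustrative) might look like:

```json
{
  "name": "product-ratings-elastic-sink",
  "connector.class": "ElasticsearchSink",
  "topics": "product_ratings",
  "input.data.format": "AVRO",
  "connection.url": "https://12345.us-east4.gcp.elastic-cloud.com:443",
  "connection.username": "elastic",
  "connection.password": "<elastic-password>",
  "kafka.api.key": "<kafka-api-key>",
  "kafka.api.secret": "<kafka-api-secret>",
  "tasks.max": "1"
}
```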
Now the Confluent CLI can be leveraged to create the connector in Confluent Cloud:
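Assuming the configuration file was saved as elasticsearch-sink.json (an illustrative name), the current Confluent CLI command shape is:

```shell
confluent connect cluster create --config-file elasticsearch-sink.json
```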
Once the connector is provisioned and sends product ratings to Elasticsearch, Kibana Lens can be used by navigating to Analytics > Dashboard in Elastic Cloud and selecting Lens when adding a panel to the dashboard. Here is a simple donut pie chart panel that shows the percentage breakdown of ratings by predicted sentiment score:
For an interesting visualization using the raw review text itself, here is a tag cloud of the most popular phrases in the reviews data set:
On the surface, the two visualizations jibe and suggest that the score distribution makes sense: a minority of reviews rank 1 or 2, and just three of the top 25 keywords are negative (“Just OK,” “Disappointed,” “Below standard”).
As mentioned earlier, Flink AI Model Inference on Confluent Cloud is in early access and new features are being added to enable multiple use cases.
Confluent customers who are interested in signing up for this new feature can reach out to their respective account teams and get started.
This article is the first in a series explaining Flink AI remote model inference features. Future articles in this series will demonstrate how narrow AI models deployed on cloud hyperscalers can be used with Flink AI.
Invoking remote models is an important first step in enabling shift-left principles within the data streaming platform on Confluent Cloud. Flink on Confluent Cloud can easily rank, score, and rate event properties within data streams, resulting in clean data at the source. This is a fundamental principle while building modern data platforms and will greatly increase the value of information extracted from such platforms.