
Flink AI: Real-Time ML and GenAI Enrichment of Streaming Data with Flink SQL on Confluent Cloud


Data streaming, meet AI; AI, meet data streaming

Modern data platforms enable enterprises to extract valuable business insights from data drawn from many sources. Data engineers, data scientists, and other data practitioners use both data streaming and batch processing frameworks to deliver these insights. While batch processes work on historical data, stream processing extracts insights in real time, enabling businesses to react faster to changing events.

Over time, these frameworks often evolve into an all-purpose data platform with clearly defined practices for storing, querying, governing, and maintaining data applications. A top goal of modern data platforms is to unify streaming and batch data processing so that the insights they produce remain relevant over time. Ensuring data quality upfront (i.e., “shifting left” by imposing data quality rules early to reduce bad data and bad events), combined with the unification of streaming and batch data, helps establish business trust in data processes (DataOps), which is fundamental to driving business growth with such insights.

Managed Apache Kafka® clusters on Confluent Cloud have become the de facto standard for building sophisticated data streaming applications. While Kafka topics provide fast, durable, and fault-tolerant storage for streaming data, Apache Flink® on Confluent Cloud provides the compute for enriching and transforming that data, letting engineers write complex stream processing jobs in simple, familiar SQL. A Kafka topic on Confluent Cloud is automatically exposed as a Flink dynamic table, so data streaming engineers can run SQL operations directly on streaming data. These Flink SQL jobs are fully managed by Confluent Cloud and autoscale based on load.

Flink AI (FLIP-437) opens up a whole new paradigm for Flink SQL on Confluent Cloud. Machine learning models are now first-class citizens in Flink, just like tables and functions. Directly from Flink SQL statements, it is now possible to invoke generative AI (GenAI) large language model (LLM) endpoints (OpenAI, Azure OpenAI, Amazon Bedrock, Google Vertex AI, etc.) as well as “narrow AI” models hosted on AWS, Google Cloud, and Azure: custom ML and deep learning models, built with frameworks like PyTorch or TensorFlow, for specific tasks like regression, clustering, and classification.

These invocations travel over HTTPS, and their responses enrich the streaming data in real time, as each event is processed. At the time of writing, this feature is in early access (EA) and is expected to become generally available (GA) soon.

AI model inference

AI model inference (i.e., generating a prediction by providing text or other inputs to a trained model) with Flink SQL enables data to be enriched with context while remaining trustworthy in terms of data quality. It also connects AI workloads deployed on any cloud, such as OpenAI, Amazon SageMaker, Amazon Bedrock, Azure OpenAI, and Google Cloud Vertex AI, with streaming data in Confluent Cloud, allowing developers to build smarter applications through real-time ML predictions. For example, if a user leaves a negative product review, which experience is better: an app that simply logs the review and says “Thank you for the feedback,” or an app that recognizes the product and the root cause of the complaint (e.g., item out of stock) and, in real time, points the user to a solution (e.g., an expected availability date or alternative products)?

Confluent Cloud for Apache Flink® supports AI model inference and treats models as resources in Flink SQL, just like tables and functions. You can use a SQL statement to create a model resource and then invoke it for inference in streaming queries.

The analysis of customer reviews for natural language processing (NLP) tasks like sentiment prediction and entity recognition is a common example use case that we will use to demonstrate LLM-based streaming data enrichment.

Example use case: Interpreting product reviews

An example of a customer review rating using Flink AI remote model inference is illustrated below.

The following steps are used to demonstrate Flink model inference on Confluent Cloud: 

  1. Create a Kafka topic on Confluent Cloud.

  2. Simulate streaming customer review events by ingesting records from the Kaggle Amazon Fine Food Reviews CSV file into the topic.

    1. Citation (original paper, arXiv): J. McAuley and J. Leskovec. From Amateurs to Connoisseurs: Modeling the Evolution of User Expertise Through Online Reviews. WWW, 2013.

  3. Create a Flink SQL remote model on Confluent Cloud.

  4. Invoke the model and store sentiment ratings.

  5. Visualize the results by consuming from the product_ratings dynamic table.

Step 1: Create a Kafka topic on Confluent Cloud

A topic named product_review is created on Confluent Cloud with default settings. The original review CSV file contains more than half a million reviews across 10 columns; this example keeps only a few of them, stored as review_id, product_id, user_id, timestamp, and summary.

Records read from the CSV file are then produced to the product_review topic.
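The column selection described above can be sketched with the standard library alone. This is a minimal sketch, assuming the unabridged Kaggle Reviews.csv header (Id, ProductId, UserId, ProfileName, HelpfulnessNumerator, HelpfulnessDenominator, Score, Time, Summary, Text); the helper `select_review_fields` and the sample rows are hypothetical. Looking columns up by header name rather than position keeps the sketch working if an abridged file orders its columns differently. The producer excerpt in Step 2 reads positional index 9, which in the unabridged file is the full Text column, so passing `summary_column="Text"` reproduces that choice.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterator, Optional, TextIO


def select_review_fields(f: TextIO, limit: Optional[int] = None,
                         summary_column: str = "Summary") -> Iterator[Dict[str, object]]:
    """Hypothetical helper: yield only the fields kept for product_review.

    Columns are looked up by header name, so the column order of the file
    does not matter. limit=None reads every row.
    """
    reader = csv.DictReader(f)  # the header row supplies the keys
    for row in islice(reader, limit):
        yield {
            "review_id": int(row["Id"]),
            "product_id": row["ProductId"],
            "user_id": row["UserId"],
            "timestamp": int(row["Time"]),  # Unix epoch seconds
            "summary": row[summary_column],
        }


# Two made-up rows in the Kaggle column layout (not real dataset records).
SAMPLE_CSV = (
    "Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,"
    "HelpfulnessDenominator,Score,Time,Summary,Text\n"
    '1,B000000001,U0001,someone,0,0,5,1303862400,Great snack,"Tasty, would buy again."\n'
    '2,B000000002,U0002,someone else,0,0,1,1303948800,Stale,"Arrived stale."\n'
)

rows = list(select_review_fields(io.StringIO(SAMPLE_CSV), limit=1))
print(rows)
```

Passing `limit=20000` mirrors the cap the producer applies, without a manual counter.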

Step 2: Python producer to send review messages from the review CSV file

An Avro schema (review.avsc) is also created for the product_review topic and associated with it in the producer code. It has the following specification:

{
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "Review",
    "type": "record",
    "fields": [
        {
            "name": "review_id",
            "type": "int"
        },
        {
            "name": "product_id",
            "type": "string"
        },
        {
            "name": "user_id",
            "type": "string"
        },
        {
            "name": "timestamp",
            "type": "int"
        },
        {
            "name": "summary",
            "type": "string"
        }
    ]
}
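Before producing, it can help to confirm that each record matches review.avsc. The sketch below is a deliberately small, stdlib-only check that handles only the two primitive types this schema uses; it is not a full Avro validator, and `schema_problems` is a hypothetical name. One detail it surfaces: an Avro `int` is a signed 32-bit integer, so storing Unix epoch seconds in `timestamp` works only until January 2038. An Avro `long` would avoid that limit, though the article's schema uses `int`.

```python
import json

# review.avsc from above, inlined so the sketch is self-contained.
REVIEW_AVSC = """
{"namespace": "confluent.io.examples.serialization.avro",
 "name": "Review", "type": "record",
 "fields": [
   {"name": "review_id", "type": "int"},
   {"name": "product_id", "type": "string"},
   {"name": "user_id", "type": "string"},
   {"name": "timestamp", "type": "int"},
   {"name": "summary", "type": "string"}]}
"""

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def schema_problems(record: dict, schema: dict) -> list:
    """Return mismatches between a record and a flat Avro record schema.
    Only the primitive types used by review.avsc (int, string) are handled."""
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field {name!r}")
            continue
        value = record[name]
        if avro_type == "int":
            # Avro int is signed 32-bit; bool is a subclass of int, so exclude it
            if (not isinstance(value, int) or isinstance(value, bool)
                    or not INT32_MIN <= value <= INT32_MAX):
                problems.append(f"{name!r} is not a 32-bit int: {value!r}")
        elif avro_type == "string":
            if not isinstance(value, str):
                problems.append(f"{name!r} is not a string: {value!r}")
        else:
            problems.append(f"unsupported type {avro_type!r} for {name!r}")
    return problems


schema = json.loads(REVIEW_AVSC)
ok = {"review_id": 1, "product_id": "B000000001", "user_id": "U0001",
      "timestamp": 1303862400, "summary": "Great snack"}
print(schema_problems(ok, schema))  # []
print(schema_problems({**ok, "timestamp": 2**31}, schema))
```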

The producer reads the Avro schema file, opens the abridged CSV file, and reads one row at a time. The product_id field is used as the message key, and the full review record (including product_id) forms the message value. The key is serialized with a StringSerializer and the value with an AvroSerializer, which registers review.avsc with Schema Registry, and the message is then sent to the Kafka topic on Confluent Cloud.
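Keying by product_id means every review for a given product is routed to the same partition, which preserves per-product ordering. The sketch below illustrates that property with a CRC32-of-key-modulo-partition-count rule. This is an assumption-laden approximation of librdkafka's default `consistent_random` partitioner (which confluent-kafka-python uses); the Java client's default hashes keys with murmur2 instead, so actual partition numbers can differ across clients. The function name and partition count are hypothetical; only the "same key, same partition" behavior is the point.

```python
import random
import zlib
from typing import Optional


def pick_partition(key: Optional[bytes], num_partitions: int) -> int:
    """Hypothetical approximation of a consistent_random-style partitioner:
    keyed messages hash to a fixed partition, unkeyed ones go to a random one."""
    if not key:  # None or empty key: no affinity, pick a random partition
        return random.randrange(num_partitions)
    return zlib.crc32(key) % num_partitions


partitions = 6  # illustrative partition count
p1 = pick_partition(b"B000000001", partitions)
p2 = pick_partition(b"B000000001", partitions)
print(p1 == p2)  # True: same product_id, same partition
```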

An excerpt of the Python producer code is shown below:

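import csv
from dataclasses import dataclass

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import (MessageField, SerializationContext,
                                           StringSerializer)


# Minimal stand-ins for the helpers the excerpt relies on but does not show.
@dataclass
class Review:
    id: int
    product_id: str
    user_id: str
    timestamp: int
    summary: str


def reading_to_dict(review, ctx):
    # Map a Review object onto the field names declared in review.avsc.
    return {
        'review_id': review.id,
        'product_id': review.product_id,
        'user_id': review.user_id,
        'timestamp': review.timestamp,
        'summary': review.summary,
    }


def delivery_report(err, msg):
    # Invoked from poll()/flush() once per message with its delivery result.
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")


def main():
    topic = 'product_review'
    schema = 'review.avsc'

    cc_config = {
        'bootstrap.servers': '<BOOTSTRAP SERVERS ENDPOINT>',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': '<KAFKA API KEY>',
        'sasl.password': '<KAFKA API SECRET>'
    }

    sr_config = {
        'url': '<SR ENDPOINT URL>',
        'basic.auth.user.info': '<SR API KEY>:<SR API SECRET>'
    }

    with open(schema) as f:
        schema_str = f.read()

    schema_registry_client = SchemaRegistryClient(sr_config)

    # Value: Avro record validated against review.avsc; key: plain UTF-8 string.
    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     reading_to_dict)
    string_serializer = StringSerializer('utf_8')

    producer = Producer(cc_config)

    print(f"Producing review records to topic {topic}. ^C to exit.")

    count = 0
    with open('./Reviews.csv', 'r', newline='') as f:
        reader = csv.reader(f, delimiter=',')
        next(reader)  # skip the header row
        for row in reader:
            # In the unabridged Kaggle file, index 7 is Time, 8 is Summary,
            # and 9 is the full review Text; adjust to the abridged file's layout.
            review = Review(id=int(row[0]),
                            product_id=row[1],
                            user_id=row[2],
                            timestamp=int(row[7]),
                            summary=row[9])

            producer.produce(topic=topic,
                             key=string_serializer(review.product_id, SerializationContext(topic, MessageField.KEY)),
                             value=avro_serializer(review, SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            producer.poll(0)  # serve delivery callbacks without blocking
            count += 1
            if count >= 20000:
                break

    # Block until every queued message is delivered (or fails).
    producer.flush()


if __name__ == '__main__':
    main()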

To keep the experiment simple, 20,000 records are read from the CSV file, as shown in the code.

Next, check that the product_review topic on Confluent Cloud is being populated with review records, for example by browsing the topic's messages in the Confluent Cloud Console.

Step 3: Create Flink AI model on Confluent Cloud

Flink AI Model Inference is an early access program feature in Confluent Cloud introduced to gather feedback. For this demonstration, an OpenAI GPT-4o model is called over HTTPS. The Flink model is created as a resource on Confluent Cloud using Flink SQL statements.

The following statements, run in a Flink SQL workspace on Confluent Cloud, store the OpenAI API key as a session secret and create a model that accepts a review as a STRING, calls OpenAI’s GPT-4o remotely, and returns the review rating as a STRING output field.

SET 'sql.secrets.my_api_key' = '<OPENAI_API_KEY>';
CREATE MODEL amazon_reviewrating
INPUT(text STRING)
OUTPUT(rating STRING)
COMMENT 'amazon fine food review rating'
WITH (
  'provider' = 'openai',
  'task' = 'classification',
  'openai.model_version' = 'gpt-4o',
  'openai.endpoint' = 'https://api.openai.com/v1/chat/completions',
  'openai.api_key' = '{{sessionconfig/sql.secrets.my_api_key}}',
  'openai.system_prompt' = 'Analyze the sentiment of the text and return ONLY a numerical rating between 1 to 5.'
);
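Conceptually, each inference call on this model sends the system prompt plus one review's text to the Chat Completions endpoint configured above. The sketch below builds a request body of that general shape with the standard library; the exact payload Flink sends, and any extra parameters it sets, are assumptions here, and `build_chat_request` is a hypothetical name. It only builds the JSON and does not send anything (a real call would also carry the API key in an `Authorization: Bearer` header).

```python
import json

# The system prompt from the CREATE MODEL statement above.
SYSTEM_PROMPT = ("Analyze the sentiment of the text and return ONLY "
                 "a numerical rating between 1 to 5.")


def build_chat_request(text: str, model: str = "gpt-4o") -> str:
    """Return a JSON body in the OpenAI Chat Completions format:
    the model's prompt as the system message, the review as the user message."""
    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": text},
        ],
    }
    return json.dumps(body)


payload = json.loads(build_chat_request("Arrived stale and crushed."))
print(payload["messages"][1]["content"])  # Arrived stale and crushed.
```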

Currently, during the early access program, Flink models support the following tasks:

  1. Classification

  2. Text generation

  3. Clustering

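For this experiment, the classification task is chosen. The system prompt instructs GPT-4o to return only a numerical rating from 1 to 5; because an LLM is not guaranteed to follow its prompt, and the output column is declared as STRING, downstream consumers should still validate the value.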
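A consumer-side guard for that STRING output could look like the sketch below. It is a hypothetical helper, not part of Flink or Confluent Cloud: it accepts a single digit from 1 to 5 (tolerating surrounding whitespace and one trailing period) and maps anything else to `None`, so off-format model replies are surfaced rather than silently miscounted.

```python
from typing import Optional

VALID_RATINGS = {"1", "2", "3", "4", "5"}


def parse_rating(raw: Optional[str]) -> Optional[int]:
    """Hypothetical check: return the rating as an int if the model's STRING
    output is exactly one digit from 1 to 5, otherwise None."""
    if raw is None:
        return None
    cleaned = raw.strip().rstrip(".")  # tolerate whitespace and a trailing period
    return int(cleaned) if cleaned in VALID_RATINGS else None


for raw in ["4", " 5\n", "3.", "Rating: 2", "0", None]:
    print(repr(raw), "->", parse_rating(raw))
```

Rejecting "Rating: 2" rather than extracting the digit is a deliberate choice: it flags replies that ignored the prompt.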

Step 4: Invoke the Flink model and store the review rating on a Flink downstream table

Once the product_review topic is populated with streaming review messages, a downstream Flink dynamic table named product_ratings is created. This table stores a predicted rating for each review, together with its product_id from the product_review topic.

The following statement, run in the Flink SQL workspace on Confluent Cloud, creates this dynamic table:

CREATE TABLE product_ratings(
  product_id STRING,
  rating STRING
);

Once this table is created, a new Flink SQL statement is run to populate the downstream table with product review ratings.

The following statement reads each review from the product_review topic, passes its summary field to the amazon_reviewrating model through the ML_PREDICT function, and writes the product_id together with the predicted rating into product_ratings. Because the source is an unbounded stream, this INSERT keeps running and scores new reviews as they arrive.

INSERT INTO `product_ratings`
SELECT product_id, rating
FROM `product_review`,
  LATERAL TABLE(ML_PREDICT('amazon_reviewrating', summary));

For each record, the amazon_reviewrating model is invoked and returns a rating based on the system prompt supplied when the model was created.

The general syntax to invoke the remote model is:

SELECT * FROM my_table, LATERAL TABLE(ML_PREDICT('<model_name>', column1, column2));
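A rough mental model of this lateral join: for each input row, ML_PREDICT calls the model with the listed columns and appends the model's OUTPUT columns to that row; the INSERT above then projects product_id and rating. The Python sketch below imitates only those row-level semantics with an in-memory list and a stand-in model (`fake_model` and `lateral_ml_predict` are hypothetical); it says nothing about how Flink actually batches, retries, or parallelizes remote calls.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def lateral_ml_predict(rows: Iterable[Row], model: Callable[..., Row],
                       input_columns: List[str]) -> Iterator[Row]:
    """Imitate `FROM t, LATERAL TABLE(ML_PREDICT('m', col1, col2))`:
    call the model once per row and merge its output columns into that row."""
    for row in rows:
        prediction = model(*(row[c] for c in input_columns))
        yield {**row, **prediction}


def fake_model(text: str) -> Row:
    # Stand-in for amazon_reviewrating, which declares OUTPUT(rating STRING).
    return {"rating": "1" if "stale" in text.lower() else "5"}


product_review = [
    {"product_id": "B000000001", "summary": "Great snack"},
    {"product_id": "B000000002", "summary": "Stale"},
]

# Mirrors: INSERT INTO product_ratings SELECT product_id, rating FROM ...
product_ratings = [
    {"product_id": r["product_id"], "rating": r["rating"]}
    for r in lateral_ml_predict(product_review, fake_model, ["summary"])
]
print(product_ratings)
```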

Selecting data from the product_ratings table from the Flink SQL workspace now produces the review ratings per product:

SELECT * FROM `product_ratings`;

Because Flink tables on Confluent Cloud are backed by Kafka topics, the real-time ratings in product_ratings can be consumed downstream like any other Kafka topic.

Step 5: Visualizing product review ratings in Kibana

To visualize the results, we used Elastic Cloud’s Kibana Lens. Before a dashboard can be created, the enriched product_ratings topic must be delivered to Elastic with the Elasticsearch Service Sink Connector for Confluent Cloud. Creating the connector requires the following:

  1. The Elasticsearch endpoint provided in Elastic Cloud of the form: https://12345.us-east4.gcp.elastic-cloud.com:443.

  2. The password for the elastic user.

  3. A Confluent Cloud API key for Kafka. This is what the connector uses to authenticate to Kafka.

Given these fields, the Confluent CLI is used to create a connector. First a connector configuration file must be created:

{
  "config": {
    "topics": "product_ratings",
    "schema.context.name": "default",
    "input.data.format": "AVRO",
    "connector.class": "ElasticsearchSink",
    "name": "ElasticsearchSinkConnector",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<KAFKA API KEY>",
    "kafka.api.secret": "<KAFKA API SECRET>",
    "connection.url": "<ELASTICSEARCH ENDPOINT>",
    "connection.username": "elastic",
    "connection.password": "<ELASTIC USER PASSWORD>",
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.keystore.type": "JKS",
    "elastic.https.ssl.truststore.type": "JKS",
    "elastic.https.ssl.keymanager.algorithm": "SunX509",
    "elastic.https.ssl.trustmanager.algorithm": "PKIX",
    "elastic.https.ssl.endpoint.identification.algorithm": "https",
    "key.ignore": "false",
    "schema.ignore": "false",
    "compact.map.entries": "true",
    "write.method": "INSERT",
    "behavior.on.null.values": "ignore",
    "behavior.on.malformed.documents": "fail",
    "drop.invalid.message": "false",
    "batch.size": "2000",
    "linger.ms": "1000",
    "flush.timeout.ms": "10000",
    "connection.compression": "false",
    "read.timeout.ms": "15000",
    "max.poll.interval.ms": "300000",
    "max.poll.records": "500",
    "tasks.max": "1",
    "data.stream.type": "none"
  }
}
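Hardcoding the Kafka API secret and the Elastic password in this file makes them easy to leak, for example through version control. One option, sketched below under stated assumptions, is to keep the non-secret settings in a template and fill the secret fields from environment variables just before running the CLI. The environment variable names and the helper `render_connector_config` are hypothetical; only the config keys come from the file above, and the template is abbreviated.

```python
import json
from typing import Mapping

# Config keys from the connector file above that hold secrets or endpoints,
# mapped to hypothetical environment variable names.
SECRET_KEYS = {
    "kafka.api.key": "KAFKA_API_KEY",
    "kafka.api.secret": "KAFKA_API_SECRET",
    "connection.url": "ELASTICSEARCH_ENDPOINT",
    "connection.password": "ELASTIC_PASSWORD",
}


def render_connector_config(template: dict, env: Mapping[str, str]) -> dict:
    """Return a copy of the template with secret fields filled from env.
    Raises KeyError naming the first missing variable."""
    config = dict(template["config"])  # copy so the template stays secret-free
    for config_key, env_var in SECRET_KEYS.items():
        if env_var not in env:
            raise KeyError(f"environment variable {env_var} is not set")
        config[config_key] = env[env_var]
    return {"config": config}


template = {"config": {
    "topics": "product_ratings",
    "connector.class": "ElasticsearchSink",
    "name": "ElasticsearchSinkConnector",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "connection.username": "elastic",
    # ...remaining non-secret settings from the file above...
}}

fake_env = {"KAFKA_API_KEY": "k", "KAFKA_API_SECRET": "s",
            "ELASTICSEARCH_ENDPOINT": "https://example.invalid:443",
            "ELASTIC_PASSWORD": "p"}
rendered = render_connector_config(template, fake_env)
print(json.dumps(rendered, indent=2))
```

In practice, `os.environ` would be passed instead of `fake_env`, and the result written with `json.dump` to the file given to `--config-file`.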

Now the Confluent CLI can be leveraged to create the connector in Confluent Cloud:

confluent connect cluster create \
    --environment <ENVIRONMENT ID> \
    --cluster <CLUSTER ID> \
    --config-file <PATH TO CONNECTOR CONFIG FILE>

Once the connector is provisioned and is sending product ratings to Elasticsearch, open Analytics > Dashboard in Elastic Cloud and select Lens when adding a panel to the dashboard. Here is a simple donut chart panel that shows the percentage breakdown of ratings by predicted sentiment score:

For an interesting visualization using the raw review text itself, here is a tag cloud of the most popular phrases in the reviews data set:

On the surface, the two visualizations jibe and suggest that the score distribution makes sense: a minority of reviews rank 1 or 2, and just three of the top 25 keywords are negative (“Just OK,” “Disappointed,” “Below standard”).
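The donut chart's percentage breakdown, and the "minority of reviews rank 1 or 2" reading, are simple aggregates over product_ratings. A sketch of that arithmetic over an in-memory list of rating strings follows; the sample values are made up for illustration and are not results from this experiment. Ratings that are not a single digit from 1 to 5 are counted as "invalid" rather than dropped, so off-format model replies remain visible.

```python
from collections import Counter
from typing import Dict, Iterable, Optional

VALID = ("1", "2", "3", "4", "5")


def rating_breakdown(ratings: Iterable[Optional[str]]) -> Dict[str, float]:
    """Percentage share of each rating '1'..'5', plus 'invalid' for any other output."""
    labels = []
    for r in ratings:
        cleaned = r.strip() if r is not None else ""
        labels.append(cleaned if cleaned in VALID else "invalid")
    if not labels:
        return {}
    counts = Counter(labels)
    return {label: 100.0 * counts[label] / len(labels) for label in (*VALID, "invalid")}


sample = ["5", "5", "4", "5", "1", "3", "5", "2", "4", "oops"]  # illustrative only
shares = rating_breakdown(sample)
low_share = shares["1"] + shares["2"]
print(shares)
print(f"ratings 1-2: {low_share:.0f}%")  # ratings 1-2: 20%
```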

What’s coming

As mentioned earlier, Flink AI Model Inference on Confluent Cloud is in early access and new features are being added to enable multiple use cases. 

Confluent customers who are interested in signing up for this new feature can reach out to their respective account teams and get started.

This article is the first in a series of articles where Flink AI remote model inference features are explained. Future articles in this series will demonstrate how narrow AI models deployed on cloud hyperscalers could be used with Flink AI.

Conclusion

Invoking remote models is an important first step toward applying shift-left principles within the data streaming platform on Confluent Cloud. Flink on Confluent Cloud can rank, score, and rate event properties within data streams as events arrive, so that data is enriched and cleaned close to its source. This is a fundamental principle of modern data platforms and increases the value of the information extracted from them.

  • Diptiman Raichaudhuri is a Staff Developer Advocate at Confluent. He is passionate about developers “getting started” with streaming data platforms and works at the intersection of data and AI.

  • Dave Troiano is a Developer Experience Engineer at Confluent. He enjoys writing code and prose to help foster a smooth developer experience with Apache Kafka and Confluent. He blogs here on Confluent at https://www.confluent.io/blog/author/dave-troiano/.
