
From Oracle to MongoDB: How to Modernize Your Tech Stack for Real-Time AI Decisioning


Playlists for every mood and occasion. Media recommendations grouped by the most niche theme from your watch history. Sophisticated ad algorithms that optimize pay-per-click ads for the customer experience.

Whether you call them digital-native, disruptors, or just tech giants, the likes of Spotify, Netflix, and Amazon have long made uncannily personal experiences a key part of their differentiation or business models.

When you build these kinds of experiences, customers feel like your business really knows them and what they need, which makes them more likely to trust your recommendations.

Behind every bit of personalization the user sees on the frontend, there are dozens or even hundreds of decisions being made on the backend. Just a few years ago, even at their most sophisticated, these kinds of marketing, product, or sales decisions might have been made with simple A/B testing and causal inference. Now they can be made even faster and smarter with artificial intelligence (AI).

That is, if you have the right foundation to readily serve legacy data to new AI engines and act on the resulting insights.

In this post, I’m going to walk you through a use case I recently built for River Runners, an imaginary sporting goods retailer that needs to personalize its customer experience. You’ll learn how to move fresh data out of Oracle databases and into MongoDB, ready to power an AI application. Even more importantly, you’ll learn how to use real-time data processing to rapidly build real-time AI solutions, even in organizations that aren’t cloud-first or digital-native.

Want to skip to the real-time product personalization demo?

See GitHub Demo.

Roadblock Ahead: How Legacy Systems Lead to Stale Personalization

You’ll often find the operational data you need for AI decisioning—text and unstructured data from customer interactions, customer demographics and firmographics, transaction records—stored in relational systems like Oracle.

But how do you get data out of your Oracle database and ready for modern AI systems to consume, analyze, and act on? How do you do all that quickly enough to personalize content, product recommendations, or chat interactions as the customers interact with your site or digital product?

In my River Runners demo, I did just that with Oracle XStream and MongoDB to demonstrate the practical answers to these core questions. The resulting real-time personalization engine provides a blueprint for building smart and personalized experiences with data streaming and stream processing in any organization.

Building a streaming data pipeline with Apache Kafka®, connectors, and Apache Flink® that ingests product listing data from Oracle databases, processes it and generates vector embeddings, and sinks the product embeddings to MongoDB

Building AI-powered experiences that can actually tap into real-time data and customer insights and feel intelligent and personalized is harder than it sounds. It only becomes more challenging if the data you need lives in both legacy databases and cloud-native services such as Salesforce, Amazon S3, Snowflake, or MongoDB.

To overcome these AI roadblocks, you need the ability to instantly integrate data across these systems. At the same time, whatever AI solution you build needs all that data distilled down to contextual, trustworthy insights ready for real-time inference and decisioning.

Bridging the Gap Between Traditional and AI-Powered Personalization Systems

Today’s consumers expect relevance, and our attention is constantly moving on to the next shiny thing.

We don’t care how it happens: if the digital experience you deliver misses the mark, that’s all that registers. No one wants ads for a product they searched for and bought last month, or static recommendations that never change.

At many companies, personalization is still powered by batch ETL pipelines that move data hours or days after the actual event. So when a customer interacts with a product or when product details change, any recommendations or sales offers the customer receives still reflect last week’s behavior rather than a current view of their needs or preferences.

That’s because databases like Oracle were never designed for real-time ingestion, integration, or processing. But that doesn’t mean you have to replace your tech stack wholesale just to keep up with AI adoption and innovation.

The right architecture can make even a legacy database part of a modern AI decisioning engine, accelerating intelligent personalization and other use cases like real-time gaming moderation or automated social campaigns. But you have to change how you use it. That means skipping the nightly batch jobs and streaming changes as they happen. It means treating data like a live signal, not a historical artifact.

Let’s walk through my River Runners demo to explore how you can start with something as common as a relational database and implement AI use cases that work magic for your customers.

Real-Time Semantic Search for Personalization

Any AI project should start with a simple goal, assessed for its short- and long-term business impact as well as its potential risks and roadblocks.

Customers of a sporting goods store like River Runners likely want product recommendations based on the items they’ve clicked on or searched for in the past, and on how similar those items are to new or existing products. In this scenario, we need semantic search capabilities that allow downstream AI systems to go beyond querying keywords or product metadata to analyze and evaluate the similarity of different products.

Building real-time pipelines with Kafka and Flink (in an architecture that looks a lot like retrieval-augmented generation, or RAG) is key to enabling semantic search.

With RAG architectures, typically you would feed a large language model (LLM) with relevant context to generate an accurate, useful, and conversational response. But in this case, the LLM isn’t here to answer questions. Instead, we’re using generative AI to build a pipeline that automatically enriches existing product data with natural language descriptions and then generates embeddings stored in MongoDB.

Building the AI System: From Product Data in Oracle to Vector Embeddings in MongoDB

Turning operational data into real-time, personalized recommendations requires you to treat data as events streaming throughout your organization rather than static, batch data sets that sit and wait for delayed processing and cleanup.

Real-Time Data Enrichment and Embedding Pipeline With Oracle, Confluent, and MongoDB

Here are the three stages of a real-time decisioning engine built with Kafka, Flink, and LLMs:

  1. Ingest data changes from your database to Kafka via a Kafka connector.

  2. Enrich and embed streaming workloads with Flink (e.g., filter, aggregate, join) and LLMs.

  3. Sink enriched embeddings into vector databases to power downstream use cases like real-time web personalization, product recommendations, custom sales offers, and more.

Streaming Product Data Changes From Oracle

As soon as new product data is added to the Oracle database, that information can be consumed as incremental differences via change data capture. For the River Runners demo, you can see the available script for adding sample data in the data-ingestion/project directory.

Once new data is added, using a pre-built connector such as the Oracle XStream CDC Source Connector allows you to immediately capture new or updated product data in real time and write it to a Kafka topic.

Taking advantage of Oracle’s XStream Out capability to deliver high-throughput, low-latency data streams from an otherwise batch-based system, this connector makes it simple to record every product change or new customer behavior. And Kafka can serve as your AI system’s central integration hub, allowing you to connect different parts of the system without worrying about their interoperability.
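
If you’re running Flink on Confluent Cloud, the connector’s target topic surfaces as a Flink table, so you can sanity-check the change stream straight from SQL. Here’s a minimal sketch, assuming the topic naming used later in this post (adjust the quoting and catalog references to match your environment):

-- Peek at the first few change events arriving from Oracle
-- (the table name below is assumed from the demo's topic; yours may differ)
SELECT * FROM `PROD.SAMPLE.RUNNING_PRODUCTS` LIMIT 10;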

Real-Time Product Enrichment and Embedding

With Flink’s stateful, in-flight processing capabilities, you can process data at the source and deliver enriched operational data directly to the LLM for additional natural language enrichment, as well as embedding in preparation for your vector database.

In the River Runners demo, I’ve used an HTTP sink connector to read data from the PROD.SAMPLE.RUNNING_PRODUCTS topic. Then, incoming product records are forwarded to an enrichment service, which scrapes the product website and adds additional product details.

The enrichment service scrapes those additional details from the company’s ecommerce listing, while I use Flink to filter, aggregate, or join the data as needed and then embed it in-flight using OpenAI models.
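
The Flink jobs below read from an enriched_running_products table whose DDL isn’t shown here. For orientation, here’s a plausible shape inferred from the columns and casts in Flink Job 2; on Confluent Cloud, the enriched topic would typically surface as a table automatically, so treat this as an assumption rather than the demo’s actual schema:

-- Assumed schema for the enriched product stream (types inferred, not authoritative)
CREATE TABLE enriched_running_products (
   ID INT,
   `NAME` STRING,
   RATINGS DOUBLE,
   ACTUAL_PRICE STRING,
   about_this_item ARRAY<STRING>,
   product_description STRING,
   product_details MAP<STRING, STRING>
);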

Here’s how that works:

Flink Job 1: Using GPT-4 to Generate Natural Descriptions From Product Metadata

CREATE MODEL product_description_model
INPUT (
   message STRING
)
OUTPUT (
   response STRING
)
WITH (
   'provider' = 'openai',
   'task' = 'text_generation',
   'openai.connection' = 'openai-connection',
   'openai.model_version' = 'gpt-4',
   'openai.system_prompt' = 'You are a helpful AI assistant that specializes in writing product descriptions…'
);
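
Once the model is registered, a quick way to smoke-test it (assuming the openai-connection above is already configured) is to call it on a single hand-written row before wiring it into the full pipeline:

-- Illustrative smoke test: generate one description from a hard-coded product string
SELECT p.response
FROM (VALUES ('Name: Trailblazer Running Shoe Rating: 4.5 Price: $89.99')) AS t(msg),
LATERAL TABLE (ml_predict('product_description_model', msg)) AS p;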

Flink Job 2: Applying a Product Description Model to Generate Natural Language Product Summaries

INSERT INTO product_as_documents
SELECT
   CAST(CAST(ID AS STRING) AS BYTES) AS `key`,
   ID,
   product_summary.response
FROM enriched_running_products
CROSS JOIN LATERAL TABLE (
   ml_predict(
       'product_description_model',
       CONCAT_WS(
           ' ',
           'Name: ', `NAME`,
           'Rating: ', CAST(`RATINGS` AS STRING),
           'Price: ', `ACTUAL_PRICE`,
           'About: ', REGEXP_REPLACE(CAST(about_this_item AS STRING), '\\[|\\]', ''),
           'Product Description: ', product_description,
            'Product Details: ', REGEXP_REPLACE(CAST(product_details AS STRING), '\\{|\\}', '')
       )
   )
) AS product_summary;

Flink Job 3: Generating Vector Embeddings From Natural Language Product Descriptions

CREATE MODEL vector_encoding
INPUT (
   input STRING
)
OUTPUT (
   vector ARRAY<FLOAT>
)
WITH (
   'task' = 'embedding',
   'provider' = 'openai',
   'openai.connection' = 'openai-embedding-connection'
);
INSERT INTO product_embeddings
SELECT
   CAST(CAST(id AS STRING) AS BYTES) AS `key`,
   embedding,
   id,
   product_summary
FROM product_as_documents,
LATERAL TABLE (
   ml_predict(
       'vector_encoding',
       product_summary
   )
) AS T(embedding);
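
For reference, the product_embeddings table this job writes to (and that a MongoDB sink connector can then drain into the collection behind your vector index) needs columns matching the positional INSERT above. Here’s a sketch of the assumed shape, with types inferred from the job rather than taken from the demo’s actual DDL:

-- Assumed sink table shape; column order matches the positional INSERT above
CREATE TABLE product_embeddings (
   `key` BYTES,
   embedding ARRAY<FLOAT>,
   id INT,
   product_summary STRING
);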

Enabling Real-Time Information Retrieval, Analysis, and Decisioning With AI

To build modern AI systems you can trust, you need to trust the data they’re using to make decisions. The query-time flow of the River Runners demo shows exactly why Kafka and Flink are a natural fit for this:

  • When users interact with River Runners products on the site, every click is streamed to Kafka.

  • User interactions with specific products can then trigger semantic search queries based on the relevant embedding.

  • Using Flink, the vectors representing the latest set of historical customer interactions are averaged.

  • The averaged vector is used to search the vector store so that the AI application can identify the most relevant products based on semantic search results.

By combining Kafka and Flink with LLMs as part of a RAG-like architecture, you can ensure that you’re readily integrating the data you need, enriching it with stateful stream processing and natural language processing, and then making it available for real-time decision-making in any personalization use case.

For River Runners specifically, I’ve configured MongoDB with a vector index to enable semantic search based on the enriched and embedded product data. So once generated, embeddings of River Runners product descriptions are stored in MongoDB and ready for querying. As a result, the AI system can quickly identify semantically similar items and power real-time decisions for personalizing marketing, sales, and customer service experiences.

Why Your Foundation for AI Systems Matters

Your recommendation systems and AI projects don’t have to run on delay. Instead of stale data, you can feed them real-time, contextual streams of the most up-to-date view of each customer and product.

With the architecture in my demo, as soon as customers interact with a River Runners product, that behavior becomes an event captured in Kafka. The value of that information isn’t lost to time, waiting for batch systems to catch up. Instead, it’s put to use immediately, triggering semantic search queries based on the embedding stored in MongoDB for the product of interest.

Then, trends in the customer’s historical behavior and product interactions can be used to search the vector store to identify and recommend the most relevant products. And the product storefront can track future interactions and display a continuously, incrementally updated view of the product catalog tailored to each customer—all without the manual break-fix cycle of batch jobs slowing you down.

Explore the demo on GitHub to see how the system I’ve built works and try other personalization scenarios, such as:

  • Using real-time user inputs on individual product pages to tailor recommendations (e.g., “Not interested” or “Show me more” buttons)

  • Building simple user profiles to further refine recommendations

  • Tracking more complex web behaviors like pages per session, pathways, and traffic sources

  • Building AI agents for customer support, personalized pricing alerts, or other capabilities

Once you’re streaming changes to operational data, processing them in real time, and searching by meaning, there are tons of use cases to consider and try.

Sign up for a free trial of Confluent Cloud and try setting up the demo yourself. And with the promo code CCBLOG60, you’ll get an additional $60 worth of free Confluent Cloud usage.*


Oracle® and XStream are either registered or unregistered trademarks of Oracle and/or its affiliates. Other names may be trademarks of their respective owners.

Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation.

  • Sean is a Senior Director, Product Management - AI Strategy at Confluent where he works on AI strategy and thought leadership. Sean's been an academic, startup founder, and Googler. He has published works covering a wide range of topics from AI to quantum computing. Sean also hosts the popular engineering podcasts Software Engineering Daily and Software Huddle.
