Playlists for every mood and occasion. Media recommendations grouped by the most niche theme from your watch history. Sophisticated ad algorithms that optimize pay-per-click ads for the customer experience.
Whether you call them digital-native, disruptors, or just tech giants, the likes of Spotify, Netflix, and Amazon have long made uncannily personal experiences a key part of their differentiation or business models.
When you build these kinds of experiences, customers feel like your business really knows them and what they need, which makes them more likely to trust your recommendations.
Behind every bit of personalization the user sees on the frontend, there are dozens or even hundreds of decisions being made on the backend. Just a few years ago—even at their most sophisticated—these kinds of marketing, product, or sales decisions might be made with simple A/B testing and causal inference. But now they can be made even faster and smarter with artificial intelligence (AI).
That is, if you have the right foundation to readily serve legacy data to new AI engines and act on the resulting insights.
In this post, I’m going to walk you through a use case I recently built for River Runners, an imaginary sporting goods retailer that needs to personalize its customer experience. You’ll learn how to move fresh data out of Oracle databases and into MongoDB, ready to power an AI application. Even more importantly, you’ll learn how to use real-time data processing to rapidly build real-time AI solutions, even in organizations that aren’t cloud-first or digital-native.
Want to skip to the real-time product personalization demo?
You’ll often find the operational data you need for AI decisioning—text and unstructured data from customer interactions, customer demographics and firmographics, transaction records—stored in relational systems like Oracle.
But how do you get data out of your Oracle database and ready for modern AI systems to consume, analyze, and act on? How do you do all that quickly enough to personalize content, product recommendations, or chat interactions as the customers interact with your site or digital product?
In my River Runners demo, I did just that with Oracle XStream and MongoDB to demonstrate the practical answers to these core questions. The resulting real-time personalization engine provides a blueprint for building smart and personalized experiences with data streaming and stream processing in any organization.
Building AI-powered experiences that can actually tap into real-time data and customer insights and feel intelligent and personalized is harder than it sounds. It only becomes more challenging if the data you need lives in both legacy databases and cloud-native services such as Salesforce, Amazon S3, Snowflake, or MongoDB.
To overcome these AI roadblocks, you need the ability to instantly integrate data across these systems. At the same time, whatever AI solution you build needs all that data distilled down to contextual, trustworthy insights ready for real-time inference and decisioning.
Today’s consumers expect relevance, and our attention constantly moves on to the next shiny thing.
We don’t care how personalization happens behind the scenes; if the digital experience you deliver misses the mark, that’s all that registers. No one wants ads for a product they searched for and bought last month, or static recommendations that never change.
At many companies, personalization is still powered by batch ETL pipelines that move data hours or days after the actual event. So when a customer interacts with a product or when product details change, any recommendations or sales offers the customer receives still reflect last week’s behavior rather than a current view of their needs or preferences.
That’s because databases like Oracle were never designed for real-time ingestion, integration, or processing. But that doesn’t mean you have to replace your tech stack wholesale just to keep up with AI adoption and innovation.
The right architecture can make even a legacy database part of a modern AI decisioning engine, accelerating intelligent personalization and other use cases like real-time gaming moderation or automated social campaigns. But you have to change how you use it. That means skipping the nightly batch jobs and streaming changes as they happen. It means treating data like a live signal, not a historical artifact.
Let’s walk through my River Runners demo to explore how you can start with something as common as a relational database and implement AI use cases that work magic for your customers.
Any AI project should start with a simple goal, assessed for both its short- and long-term business impact and its potential risks or roadblocks.
For a sporting goods store like River Runners, its customers likely want product recommendations based on the items they’ve clicked on or searched for in the past and those items’ similarity to new or existing products. In this scenario, we need semantic search capabilities that allow downstream AI systems to go beyond querying keywords or product metadata to analyze and evaluate the similarity of different products.
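To make that concrete, here’s a quick Python sketch (not part of the demo) of what “similarity by meaning” looks like: two hypothetical product descriptions are embedded with an OpenAI model and compared with cosine similarity. The model name is an assumption; any embedding model behaves the same way.

```python
# Illustrative only: compare two hypothetical product descriptions by meaning,
# not by keywords, using OpenAI embeddings and cosine similarity.
# Requires: pip install openai numpy, and OPENAI_API_KEY in the environment.
import numpy as np
from openai import OpenAI

client = OpenAI()

def embed(text: str) -> np.ndarray:
    # Model name is an assumption, not the demo's configuration.
    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
    return np.array(resp.data[0].embedding)

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

trail_shoe = embed("Lightweight trail-running shoe with aggressive grip for muddy terrain")
road_shoe = embed("Cushioned road-running shoe for long-distance pavement training")

# Related products score close to 1.0 even when they share few exact keywords.
print(cosine_similarity(trail_shoe, road_shoe))
```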
Building real-time pipelines with Kafka and Flink, in an architecture that looks a lot like retrieval-augmented generation (RAG), is key to enabling semantic search.
With RAG architectures, typically you would feed a large language model (LLM) with relevant context to generate an accurate, useful, and conversational response. But in this case, the LLM isn’t here to answer questions. Instead, we’re using generative AI to build a pipeline that automatically enriches existing product data with natural language descriptions and then generates embeddings stored in MongoDB.
Turning operational data into real-time, personalized recommendations requires you to treat data as events streaming throughout your organization rather than static, batch data sets that sit and wait for delayed processing and cleanup.
Here are the three stages of a real-time decisioning engine built with Kafka, Flink, and LLMs:
Ingest data changes from your database to Kafka via a Kafka connector.
Enrich and embed streaming workloads with Flink (e.g., filter, aggregate, join) and LLMs.
Sink enriched embeddings into vector databases to power downstream use cases like real-time web personalization, product recommendations, custom sales offers, and more.
As soon as new product data is added to the Oracle database, that information can be consumed as incremental differences via change data capture. For the River Runners demo, you can see the available script for adding sample data in the data-ingestion/project directory.
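If you want to trigger a change by hand instead of running that script, a few lines of Python against the source database are enough for the connector to pick something up. This is only a sketch: the connection details and the RUNNING_PRODUCTS column names are assumptions, so treat the script in data-ingestion/project as the source of truth.

```python
# Minimal sketch: insert a sample product row so there's a change for CDC to capture.
# Connection details and column names are hypothetical; see data-ingestion/project
# in the demo repo for the real script.
import oracledb  # pip install oracledb

conn = oracledb.connect(user="sample", password="<password>", dsn="localhost:1521/ORCLPDB1")
with conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO running_products (product_id, name, category, price)
        VALUES (:1, :2, :3, :4)
        """,
        [1001, "Trailblazer 3 Trail Shoe", "footwear", 129.99],
    )
conn.commit()
conn.close()
```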
Once new data is added, a pre-built connector such as the Oracle XStream CDC Source Connector captures new or updated product data in real time and writes it to a Kafka topic.
Taking advantage of Oracle’s XStream Out capability to deliver high-throughput, low-latency data streams from an otherwise batch-based system, this connector makes it simple to record every product change or new customer behavior. And Kafka can serve as your AI system’s central integration hub, allowing you to connect different parts of the system without worrying about their interoperability.
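Because the connector lands every change in an ordinary Kafka topic, any downstream service can tap into the stream with a plain consumer. Here’s a minimal Python illustration (not the demo’s own code); the broker address, security settings, and the exact JSON layout of the change events are assumptions that depend on how the connector is configured.

```python
# Minimal sketch: read CDC change events from the topic the connector writes to.
# Broker address, credentials, and payload layout are assumptions.
import json
from confluent_kafka import Consumer  # pip install confluent-kafka

consumer = Consumer({
    "bootstrap.servers": "<bootstrap-servers>",
    "group.id": "river-runners-cdc-reader",
    "auto.offset.reset": "earliest",
    # Confluent Cloud also needs SASL_SSL credentials here.
})
consumer.subscribe(["PROD.SAMPLE.RUNNING_PRODUCTS"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        change = json.loads(msg.value())
        # Each record represents an insert, update, or delete on the source table.
        print(change)
finally:
    consumer.close()
```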
With Flink’s stateful, in-flight processing capabilities, you can process data at the source and deliver enriched operational data directly to the LLM for additional, natural language processing and enrichment as well as embedding in preparation for your vector database.
In the River Runners demo, I’ve used an HTTP sink connector to read data from the PROD.SAMPLE.RUNNING_PRODUCTS topic and forward incoming product records to an enrichment service, which scrapes additional product details from the company’s ecommerce listing. I then use Flink to filter, aggregate, or join the data as needed and embed it in-flight using OpenAI models.
Here’s how that works:
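The demo runs this step with Flink on Confluent Cloud, but the idea translates to a short PyFlink sketch: define the enriched topic as a table, register an embedding UDF, and write the filtered, embedded records to a sink. The topic names, broker address, embedding model, and print sink below are assumptions for the sake of a self-contained example.

```python
# Illustrative PyFlink sketch of filtering and embedding records in-flight.
# Requires the Flink Kafka SQL connector jar on the classpath; names and model are assumptions.
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

@udf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()))
def embed(description: str):
    # One OpenAI call per record keeps the sketch simple; batch requests in production.
    from openai import OpenAI
    resp = OpenAI().embeddings.create(model="text-embedding-3-small", input=description)
    return resp.data[0].embedding

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("embed", embed)

# Source: enriched product records arriving on a Kafka topic.
t_env.execute_sql("""
    CREATE TABLE enriched_products (
        product_id STRING,
        description STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'enriched.running.products',
        'properties.bootstrap.servers' = '<bootstrap-servers>',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Sink: printed to stdout here; the demo writes embeddings toward MongoDB instead.
t_env.execute_sql("""
    CREATE TABLE product_embeddings (
        product_id STRING,
        embedding ARRAY<FLOAT>
    ) WITH ('connector' = 'print')
""")

# Filter out empty descriptions and embed each remaining product in-flight.
t_env.execute_sql("""
    INSERT INTO product_embeddings
    SELECT product_id, embed(description)
    FROM enriched_products
    WHERE description IS NOT NULL
""").wait()
```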
To build modern AI systems you can trust, you need to trust the data they use to make decisions. The final steps of the River Runners demo show exactly why Kafka and Flink are so well suited to the job:
When users interact with River Runners products on the site, every click is streamed to Kafka.
User interactions with specific products can then trigger semantic search queries based on the relevant embedding.
Using Flink, the vectors representing the customer’s most recent product interactions are averaged (the vector math is sketched just after this list).
The averaged vector is used to search the vector store so that the AI application can identify the most relevant products based on semantic search results.
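In the demo, Flink does this averaging as part of the stream; the numpy snippet below shows just the vector math, with made-up click embeddings, so you can see what the query vector actually is.

```python
# Vector math only: average the embeddings of a customer's recent clicks.
# The three-dimensional vectors are made up; real embeddings have hundreds of dimensions.
import numpy as np

recent_click_embeddings = [
    np.array([0.12, -0.40, 0.88]),  # trail shoe
    np.array([0.10, -0.35, 0.90]),  # trail socks
    np.array([0.15, -0.42, 0.85]),  # hydration vest
]

# One averaged vector summarizes the customer's current interests and becomes
# the query vector for semantic search against the vector store.
query_vector = np.mean(recent_click_embeddings, axis=0)
print(query_vector)
```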
By combining Kafka and Flink with LLMs in a RAG-like architecture, you can readily integrate the data you need, enrich it with stateful stream processing and natural language processing, and make it available for real-time decision-making in any personalization use case.
For River Runners specifically, I’ve configured MongoDB with a vector index to enable semantic search based on the enriched and embedded product data. So once generated, embeddings of River Runners product descriptions are stored in MongoDB and ready for querying. As a result, the AI system can quickly identify semantically similar items and power real-time decisions for personalizing marketing, sales, and customer service experiences.
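Here’s a minimal pymongo sketch of that lookup using MongoDB Atlas Vector Search’s $vectorSearch aggregation stage. The connection string, database, collection, index, and field names are assumptions rather than the demo’s exact configuration.

```python
# Minimal sketch: semantic search over stored product embeddings in MongoDB.
# Connection string, database, collection, index, and field names are assumptions.
from pymongo import MongoClient  # pip install pymongo

client = MongoClient("<mongodb-connection-string>")
products = client["river_runners"]["product_embeddings"]

def recommend(query_vector: list[float], limit: int = 5):
    # $vectorSearch is MongoDB Atlas Vector Search's aggregation stage.
    pipeline = [
        {
            "$vectorSearch": {
                "index": "product_vector_index",
                "path": "embedding",
                "queryVector": query_vector,
                "numCandidates": 100,
                "limit": limit,
            }
        },
        {"$project": {"_id": 0, "product_id": 1, "name": 1,
                      "score": {"$meta": "vectorSearchScore"}}},
    ]
    return list(products.aggregate(pipeline))
```

The averaged vector from the previous sketch is exactly the kind of value you’d pass in as query_vector.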
Your recommendation systems and AI projects don’t have to run on delay. Instead of stale data, you can feed them real-time, contextual streams of the most up-to-date view of each customer and product.
With the architecture in my demo, as soon as a customer interacts with a River Runners product, that behavior becomes an event captured in Kafka. The value of that information isn’t lost while batch systems catch up. Instead, it’s put to use immediately, triggering semantic search queries based on the embedding stored in MongoDB for the product of interest.
Then, trends in the customer’s historical behavior and product interactions can be used to search the vector store and recommend the most relevant products. And the product storefront can track future interactions and display continuously updated views of the product catalog tailored to each customer, all without the manual break-fix cycle of batch jobs slowing you down.
Explore the demo on GitHub to see how the system I’ve built works and try other personalization scenarios, such as:
Using real-time user inputs on individual product pages to tailor recommendations (e.g., “Not interested” or “Show me more” buttons)
Building simple user profiles to further refine recommendations
Tracking more complex web behaviors like pages per session, pathways, and traffic sources
Building AI agents for customer support, personalized pricing alerts, or other capabilities
Once you’re streaming changes to operational data, processing them in real time, and searching by meaning, there are tons of use cases to consider and try.
Sign up for a free trial of Confluent Cloud and try setting up the demo yourself. And with the promo code CCBLOG60, you’ll get an additional $60 worth of free Confluent Cloud usage.*
Oracle® and XStream are either registered or unregistered trademarks of Oracle and/or its affiliates. Other names may be trademarks of their respective owners.
Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation.