[Virtual Event] Agentic AI Streamposium: Learn to Build Real-Time AI Agents & Apps | Register

Confluent Intelligence の新機能:A2A、多変量解析による異常検知、Cosmos DB 向けベクトル検索、Amazon S3 Vectors など

作成者 :

AI のコモディティ化が進む中で、企業にとってのバリュードライバーは、もはや「どの大規模言語モデル(LLM)を使うか」ではなく、「自社データを活用し、いかにAI によるリアルタイムで信頼性の高い意思決定を実現するか」へと移りつつあります。AI エージェントシステムでは、エージェントが自律的に計画・判断・行動を行います。しかし、こうしたシステムの有用性は、エージェントがどれだけ適切で十分なコンテキストを持てるかに大きく左右されます。コンテキストが古い、分断されている、あるいはポイント間の統合が脆弱で安定して利用できない状態であれば、たとえ高性能なモデルを採用していたとしても、その価値を十分に引き出すことはできません。

Confluent Intelligenceは、この問題を解決するために構築されました。Confluent Intelligence は Confluent Cloud 上で提供されるフルマネージドサービスで、Apache Kafka® と Apache Flink® を統合したデータストリーミングプラットフォームで、リアルタイム、コンテキストリッチ、高信頼性といった特長のある AI システムを構築できます。Confluent Intelligence を使うことで、インフラをつなぎ合わせることなく、業務のイベントをストリーミング処理できるほか、外部データを継続的に取り込み拡充させられます。また、組み込み ML を適用し、Streaming Agents や AI アプリケーションに常に最新のコンテキストを提供することができます。

Confluent Intelligence が機能強化され、Streaming Agents、組み込みの ML 機能、Model Context Protocol(MCP)のサポートといった新機能が追加されました。これらにより、すでに利用しているエージェントの再利用、最新データによる複雑な異常検知、検索拡張生成(RAG)ワークフローへの多くのベクトルストアの取り込み、ネットワークのセキュリティの強化、エージェントが Confluent Cloud のリアルタイムデータへアクセスする方法の標準化が可能になります。

2026 年第 1 四半期:Confluent Intelligence の新機能

Confluent Intelligence に新機能が追加されました。

  • Streaming Agents 向けの Agent2Agent(A2A)連携(オープンプレビュー): Streaming Agents が、LangChain、CrewAI、SAP、Salesforce などの A2A 対応プラットフォーム上の外部エージェントと連携し、タスクをコラボレーションおよびオーケストレーションできるようになりました。これらはすべて、信頼性高くイベントデータを再生可能な Kafka 上で実行されます。

  • 組み込み ML 機能向けの多変量解析異常検知(アーリーアクセス):複数の相関関係のある指標を同時に分析し、ノイズによるスパイクではなく、本当に問題となる異常を検知できます

  • Azure Cosmos DB と Amazon S3 Vectors 向けベクトル検索(一般提供):Flink.Azure Cosmos DB と Amazon S3 Vectors を、MongoDB、Pinecone、Elastic、PostgreSQL、SQL Server、Oracle などすでに利用している同じ外部テーブルや検索ファブリックに統合できます。これにより、Flink で構築するリアルタイム RAG(検索拡張生成)パイプラインをさらに拡張できます。

  • モデル推論、外部テーブル、検索向けの Amazon Web Services(AWS)および Azure のプライベートリンク(一般提供):AI トラフィックをパブリックインターネットに公開することなく、Flink から外部データベース、ベクトルストア、REST API へ VPC 間のプライベート接続を利用して、モデルの呼び出しや、機密データを使ったリアルタイムストリームのデータ拡充を安全に行うことができます。

  • Confluent Cloud 向けオープンソース MCP サーバーに対する Confluent のサポート:.あらゆる MCP クライアントに対して、統制されたリアルタイムデータを提供するオープンソースの MCP サーバーについて、Confluent の正式なサポートが提供されるようになりました。

Confluent Intelligenceのデモ:A2A統合と多変量異常検知の解説

それでは、これらの機能をそれぞれ詳しく見ていきましょう。

A2A 統合:既存のエージェントの再利用と接続

企業が CRM、データウェアハウス、業務システム、独自アプリケーションなどで AI エージェントを導入していくと、「エージェント孤島」が生まれることが多々あります。各エージェントは単体で強力ですが、このような環境では、他のエージェントの検出、相互通信、作業の連携や調整は難しくなります。現在、こうしたやり取りの多くは、ポイント間が HTTP 通信で接続されています。しかしこの方法は、マルチエージェントシステムが拡大するにつれて、観測、監査、拡張が難しいといった課題が生まれます。

Streaming Agents 向け A2A 連携により、A2A オープンプロトコルが Flink に直接組み込まれます。これにより、イベント駆動型のエージェントは、Kafka 上のリプレイ可能なイベントストリームを介して、他の A2A 対応プラットフォーム(例:LangChain、SAP、Salesforce)のエージェントと接続し、作業をオーケストレーションし、協調してタスクを実行できるようになります。既存のエージェントやフレームワークへの投資はそのまま活かしながら、AI エージェントをイベント連動型にすることができ、これまでのバッチスナップショットによる古いデータに頼るのではなく、常に最新のビジネス状態に基づいたリアルタイムの判断が可能になります。 詳細については、こちらのドキュメントを参照してください。

Streaming Agents can collaborate with and orchestrate any A2A-capable agent.

ストリーミング A2A のユースケース:イベント駆動型ワークフローのオーケストレーション

Confluent、ServiceNow、Salesforce、そして独自の復旧エージェント全体にわたって、テレコムのインシデント管理のユースケースについて考えてみましょう

  1. Streaming Agent は、優先度の高いアラート(例:特定地域での通話切断の急増)を検知するために、Kafka のテレメトリトピックを監視します。

  2. ネットワーク異常が検知されると、Streaming Agent は A2A を使って以下の処理を実行します。

    1. ServiceNow のインシデントトリアージエージェントを起動します。最近のログ、設定変更、影響を受ける 4G/5G 基地局の情報を追加してチケットを拡充します。

    2. Salesforce エージェントを呼び出し、影響を受けた法人および個人契約者に関する最新のアカウント情報や SLA(サービスレベル契約)といったコンテキストを取得します。 

  3. 各外部エージェントが作業を完了すると、結果は Kafka に書き戻され、復旧エージェントにストリーミングされます。復旧エージェントは、トラフィックの迂回ルート設定、各対象顧客への通知送信、人による対応へのエスカレーションを判断します。

このリアルタイムのイベント駆動型ワークフローは、Confluent Intelligence と A2A を通じて接続されているため、複数エージェント間のやり取り全体を不変のログとして再生でき、すべての意思決定を検査し、新しいロジックを本番環境に影響を与えずに安全に実行して改善できます。

多変量解析異常検知:簡易なチェックでは見逃すことが多い異常を検知

単一の指標がしきい値を超える状況だけでは、実環境のシステムが故障する原因になることはほとんどありません。例えば、通常のクレジットカード取引も、新しいデバイスの利用、通常とは異なる場所、複数のクレジットカード口座にわたる異常なスピードでの利用と組み合わさることで初めて不審な取引として判断されます。従来の単一指標の異常検知では、こうした相関関係は見逃されることが多く、ノイズや外れ値によって基準値が歪められ、チームが誤検知に振り回されることもあります。

組み込み ML 機能向けの多変量解析異常検知は、複数の指標を 1 つのベクトルとして扱い、確かな統計手法を用いて、指標を組み合わせた振る舞いが異常かどうかを検知します。これにより、さまざまなデータの組み合わせにまたがる異常を特定することができ、単一のシグナルだけでは対応できない複雑なシナリオにも対応できます。その結果、ノイズによるアラートを最小化し、顧客やビジネスにとって本当に重要な問題を迅速に検知できます。

この機能は Flink SQL の新しい ML_DETECT_ANOMALIES_ROBUST 関数を使用しています。アーリーアクセスには、こちらから登録してください。詳細については、こちらのドキュメントを参照してください。

Multivariate Anomaly Detection reduces noise and improves accuracy.

多変量解析異常検知のユースケース:生産設備の予知保全

ポンプ設備の予防保全について考えてみましょう。故障は単一の計測値だけでは説明がつかない場合が多く、温度、圧力、振動を組み合わせて監視することで、トラブルの兆候を正確に検出できます。

Confluent Intelligence の多変量解析異常検知を使うと、以下が可能になります。

  1. 異なる工場のセンサーデータを Kafka トピックにストリーミングします。

  2. ROW(温度、圧力、振動)のようなベクトルに対して Flink SQL の ML_DETECT_ANOMALIES_ROBUST を使い、時間経過における正常な振る舞いをモデル化します。

  3. 各指標単体が正常な範囲内でも、指標の組み合わせが異常な場合、イベントにフラグを立てます。

  4. これらの異常なイベントをイベント駆動型の Streaming Agent に提供し、メンテナンス履歴でデータ拡張したうえで、資産管理システム内の作業指示書を自動的に作成または優先付けすることができます。

同じパターンは不正検知にも応用できます。例えば、取引金額、加盟店カテゴリ、デバイスフィンガープリント、位置情報をベクトルとしてモデル化することが可能です。また、e-コマースの場合も、PV数、カート放棄率、チェックアウトの遅延などを組み合わせて監視することで、単なる一時的な変動ではなく、実際のコンバージョンの問題を検知できます。

Cosmos DB および Amazon S3 Vectors 向けベクトル検索:RAG スタックの強化

現在のビジネスでは、データが業務データベース、分析システム、複数のベクトルストアに分散しています。業務イベントは Kafka にストリームされる一方、ベクトルで表現したデータの特徴は Cosmos DB や S3 Vectors に存在することも多く、チームは脆弱なそれらのポイント間接続を駆使して、データ取り込み、埋め込み、ベクトル検索、推論をつなぎ合わせる必要がありました。

Flink ベクトル検索が Cosmos DB と S3 Vectors をサポート

Azure Cosmos DB および Amazon S3 Vectors 上での Flink ベクトル検索のサポートにより、これらのシステムを Confluent の外部テーブルおよび検索ファブリックにおけるネイティブ統合されたベクトルプロバイダーとして扱えるようになります。これにより、MongoDB、Elastic、Couchbase、Pinecone などと同様に、同一の枠組みの中で利用できるようになります。

このリリースにより、次のことが可能になります。

  • Flink SQL から Cosmos DB や S3 Vectors を直接クエリし、LLM に入力する前段で  近傍(kNN)検索を実行して、関連するコンテキストをリアルタイムに取得できます。

  • 複数のマイクロサービスを連動させるのではなく、単一のストリーミングパイプラインを実行し、データ取り込み、変換、埋め込み作成、ベクトル検索、モデル推論を処理できます。

  • Streaming Agents や RAG アプリケーションを関連コンテキストに基づいて動作させ、ハルシネーションを抑え、回答品質を向上します。

  • 外部テーブルと検索を使って、ベクトルをその場でクエリすることで、余計な業務ストアにデータを複製する必要性を回避します。

  • サポートされているすべてのベクトルデータベースで、同じ Flink と Streaming Agents のパターンを使用することで、将来的な拡張にも対応します。

Amazon S3 Vectors を使うと、低コストで耐久性の高いベクトルストレージも利用できます。これは S3 データレイクや Amazon Bedrock などの AWS のサービスと簡単に統合でき、Streaming Agents が同じ Flink ジョブの中で、S3 から取得したセマンティックコンテキストと Bedrock モデルを組み合わせて利用できるようになります。

詳細については、こちらのドキュメントを参照してください。

ベクトル検索のユースケース:最新のコンテキストを使ったリアルタイム顧客サポート

例えば、カスタマーサポートエージェントを構築しているとします。このエージェントは、プロンプト生成時に最も関連性の高いコンテキストを、ナレッジベースの記事や、Cosmos DB や S3 Vectors に保存されている最近の通話記録から取得する必要があります。

Confluent Intelligence を使うと、以下が可能になります。

  • Streaming Agent が Kafka トピックを監視し、サポートチケットやユーザーイベントの受信を検知します。

  • 各チケットに対して、Flink SQL が Cosmos DB または S3 Vectors に対してベクトル検索を実行し、意味的に最も関連性の高いコンテンツを取得します

  • Streaming Agent は、これらのベクトル検索結果をリアルタイムストリーム(例:ロイヤルティステータス、最近の閲覧・カート活動、注文/返金履歴など)と組み合わせ、LLM を呼び出して、高度にパーソナライズされたレコメンデーションを生成します。

これらすべてが同じ Flink のストリーム処理パイプライン上で実行されるため、AI とデータ処理のワークフローを統合できます。また、個別のデータ取り込みジョブや専用の RAG サービス、ベクトルデータベースとデータソース間のアドホックな同期などによる追加コスト、およびこういった複雑な運用も回避できます。

モデル推論、外部テーブル、ベクトル検索向けの AWS と Azure のプライベートリンク:AI のための安全なネットワーク

多くのチームにとって、AI ワークフローを機密性の高い記録のシステムに接続する際の最後の課題は、API やスキーマではなくネットワークです。

モデル推論、外部テーブル、ベクトル検索に AWS と Azure のプライベートリンクを使うことで、Flink から外部モデルやシステムへ VPC 間のプライベート接続が可能になります。これにより Confluent Intelligence(Streaming Agents を含む) は、LLM を呼び出したり、リアルタイムストリームを外部データで強化したりできます。具体的には、外部データベース、ベクトルストア、REST エンドポイントにある機密性の高い企業の固有データを、パブリックインターネットを経由することなく利用できます。多くの企業は、これにより本番環境への導入が進めやすくなり、セキュリティやコンプライアンスへの適合に役立ちます。

プライベートリンクを利用すると、以下が可能になります。

  • AI トラフィックをプライベートで処理してコンプライアンス要件に適合:プライベートリンクを使用することで、CRM、ERP(企業資源計画)、ベクトルストア、REST API への参照が、パブリックインターネットではなく プライベートネットワーク経由で行われます。

  • Kafka ストリームと記録システムのデータを安全に結合:Streaming Agents に、顧客情報、注文、ポリシーなどの最新かつ完全なコンテキストを提供しながら、厳格なセキュリティ要件も満たすことができます。

  • ネットワーク構成の複雑さを削減:アドホックなプロキシやトンネルの代わりに、Confluent が提供する標準化されたクラウドネイティブなプライベート接続を利用できます(AnthropicのMCPを採用する企業は、オープンソースのConfluent MCPサーバーを利用して、AIエージェントがConfluent Cloud(Kafka、Flink、コネクタ、Tableflow、Schema Registryなど)に接続するための、シンプルで標準化された方法が必要なのです。これまではほぼ独力で対応する必要がありました。MCPサーバーを自らデプロイ・運用し、各MCPクライアント(Claude Desktop、Goose、Gemini CLIなど)に接続し、ベンダーによる正式なサポートなしに問題を処理してきたのです。

詳細については、こちらのドキュメントを参照してください。

Confluent Cloud でのオープンソース MCP サーバーのサポート:あらゆる MCP クライアントに対応した本番レベルのサポート

Anthropic の MCP(Model Context Protocol)を採用するチームは、AI エージェントが Confluent Cloud(Kafka、Flink、コネクタ、Tableflow、Schema Registry など)へアクセスするための、シンプルで標準化された方法を求めており、オープンソースの Confluent MCP サーバーを利用しています。しかしこれまでは、多くのチームがMCP サーバーのデプロイと運用や各 MCP クライアント(例:Claude Desktop、Goose、Gemini CLI)の接続設定を自ら行い、正式なベンダーサポートなしで問題に対応する必要がありました。

現在、Confluent は Confluent Cloud でオープンソース MCP サーバーを正式にサポートするようになりました。これにより、MCP ベースのエージェントと Confluent 上のリアルタイムでガバナンスされたデータの接続部分を、本番環境でも安心して使える公式サポート付きの仕組みとして利用できるようになりました。

このサポートにより、以下が可能になります。

  • MCP エージェントが Confluent Cloud の最新データと操作に直接アクセスします。Kafka トピックの管理だけでなく、コネクタ、Flink、Tableflow に対応し、請求、メトリクス API などもサポートします。こにより、バックプレッシャーなどの問題のデバッグにも利用できます。

  • 自然言語で Confluent を管理します。コードを書くことなく、MCP クライアントからトピックの設定、Flink SQL の実行、データインフラストラクチャの操作が可能になります。

  • GitHub やアカウントチームを通じて問題を報告でき、Confluent のエンジニアが定義された SLA に基づいて解決に取り組みます。

Confluent Intelligence でリアルタイムかつコンテキスト豊富な AI を構築する

AI の可能性を最大限に活かすためには、ビジネスの最新状態をリアルタイムで把握し、理解し、行動できる AI エージェントやアプリケーションが必要です。Confluent Intelligence は、ストリーミングとストリーム処理を統合し、コンテキストエンジニアリング、ML、RAG のためのベクトル検索、ストリーミングエージェント、MCPや A2A などのオープンプロトコルを統合し、ストリーミングとストリーム処理を組み合わせることで、フルマネージドでガバナンスの利いたプラットフォーム上でこれを実現します。

Confluent Intelligence を使って、AI の取り組みをリアルタイムの本番システムへと進化させましょう。


Apache®、Apache Kafka®、Apache Flink®、Flink® およびそれぞれのロゴは、米国およびその他の国における Apache Software Foundation の商標です。これらの商標の使用は、Apache Software Foundation による承認や推奨を意味するものではありません。その他のすべての商標は、それぞれの所有者に帰属します。

  • このブログは複数のConfluent社員による共同作業で作成されました。

このブログ記事は気に入りましたか?今すぐ共有