Amazon OpenSearch Ingestion Adds Support for Confluent Cloud as Source

Until recently, customers didn't have an easy way to send data from Confluent's data streaming platform to Amazon OpenSearch Service. They had to write custom code that used AWS Lambda as an intermediary, adapt the HTTP Sink connector, or self-manage an older version of the Elasticsearch connector. Earlier this year, we announced the fully managed OpenSearch Sink connector, which provides a seamless way to sink data from Confluent to Amazon OpenSearch Service. Now we are proud to announce that Amazon OpenSearch Ingestion supports Confluent as a source, giving customers another way to integrate the two platforms. This option is aimed at customers who use Confluent Cloud with PrivateLink connectivity and want to connect to a private OpenSearch domain, or who want to integrate a Confluent Cloud cluster with an OpenSearch Serverless collection.

OpenSearch Ingestion is a fully managed, serverless capability of Amazon OpenSearch Service that ingests real-time log, metric, and trace data into OpenSearch Service domains and OpenSearch Serverless collections. You can use it to filter, enrich, and transform data before delivering it to the target domain or collection.

In this post, we dive deep into how this feature works, its benefits, and how to configure it. We will integrate a Confluent Cloud Enterprise cluster (reachable over PrivateLink) with a private OpenSearch domain. First, let's explore how the integration works and how it lets Amazon Web Services (AWS) customers connect a third-party private resource (Confluent) with a private native service (OpenSearch).

How does OpenSearch Ingestion work?

Networking

OpenSearch Ingestion uses PrivateLink to connect to other private AWS sources. A defining characteristic of PrivateLink is that traffic is unidirectional: connections are always initiated from the consumer VPC. As a result, OpenSearch Ingestion could not connect to private third-party data sources. To address this, OpenSearch Ingestion added support for cross-account elastic network interfaces (xENIs).

In this mode, OpenSearch Ingestion runs in AWS-managed VPCs and uses xENIs in the customer VPC to reach third-party data sources. This is a powerful mechanism because it lets OpenSearch Ingestion connect to any service or platform that a customer's own application running in the customer VPC can reach. To understand the problem this addresses, let's look at how it connects private third-party services, such as Confluent Cloud Enterprise clusters, with private first-party services, such as an OpenSearch domain.

In a private OpenSearch domain, data nodes are deployed in an AWS-managed VPC, and OpenSearch deploys network interfaces in the customer VPC. Any customer application connects with OpenSearch via these network interfaces. When you introduce a third-party private resource like Confluent Cloud Enterprise clusters, these clusters operate in a separate VPC managed by the ISV (in this case, Confluent). Enterprise clusters use PrivateLink for networking connectivity, which means that communication must be initiated from the customer VPC due to its unidirectional nature. The only way to connect these two private resources is by leveraging xENIs deployed in the customer VPC. This is precisely what OpenSearch Ingestion does, as depicted in the following diagram:

As long as the selected customer VPC can reach both the OpenSearch domain and the private Confluent Cloud cluster, OpenSearch Ingestion can connect the two systems.

Authentication and access control

  • (Source) Connecting to Confluent Cloud: OpenSearch Ingestion uses the SASL/PLAIN authentication mechanism to connect to Confluent Cloud, with API keys provisioned in Confluent Cloud as the credentials. These API keys must be associated with a service account that has the permissions (ACLs and RBAC roles) needed to read the source topics, including read access to the pipeline's consumer group.

  • (Destination) Connecting to OpenSearch: To index data into an OpenSearch domain or collection, the OpenSearch Ingestion pipeline IAM role must have the necessary permissions.

    • If the destination is an OpenSearch domain, refer to the AWS documentation to grant the necessary permissions to the role and to map the pipeline role to the OpenSearch all_access backend role.

    • If the destination is an OpenSearch Serverless collection, follow the corresponding AWS documentation.
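Before wiring an API key into a pipeline, it can help to confirm that the key authenticates and can read the topic. The following is a minimal Python sketch, assuming you test with the `confluent-kafka` client (not part of this post's setup); it only builds the client properties that correspond to SASL/PLAIN over TLS with an API key and secret. The bootstrap URL, key, secret, and group ID are placeholders.

```python
def confluent_cloud_consumer_config(bootstrap: str, api_key: str,
                                    api_secret: str, group_id: str) -> dict:
    """Client properties for SASL/PLAIN authentication against Confluent Cloud.

    Confluent Cloud requires TLS, so the protocol is SASL_SSL; the API key and
    secret are passed as the SASL username and password.
    """
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": api_secret,
        "group.id": group_id,
        # Start from the earliest offset so an existing topic returns data right away.
        "auto.offset.reset": "earliest",
    }


if __name__ == "__main__":
    conf = confluent_cloud_consumer_config(
        "pkc-example.us-east-1.aws.confluent.cloud:9092",  # hypothetical bootstrap URL
        "MY_API_KEY",        # placeholder
        "MY_API_SECRET",     # placeholder
        "credential-check",  # hypothetical throwaway consumer group
    )
    print(conf["security.protocol"], conf["sasl.mechanisms"])
```

Passing the returned dict to `confluent_kafka.Consumer`, subscribing to the topic, and calling `poll()` from a host in the pipeline's VPC is one way to check the service account's permissions. A throwaway group ID keeps the test from committing offsets for the pipeline's own consumer group.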

Scaling

OpenSearch Ingestion capacity is measured in OpenSearch Compute Units (OCUs). Each OCU provides approximately 8 GB of memory and 2 vCPUs. When configuring an OpenSearch Ingestion pipeline, users specify the minimum and maximum number of OCUs for the pipeline. OpenSearch Ingestion then scales pipeline capacity automatically within these limits, providing hands-free capacity management. Customers are charged only for the Ingestion OCUs that the pipeline uses.

When Confluent is the source, topic partitions are distributed across the running OCUs, and each partition is assigned to exactly one OCU consumer. Consequently, a pipeline never uses more OCUs than there are source partitions. If a pipeline reads from multiple topics, set the maximum number of OCUs based on the topic with the highest number of partitions.
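The sizing rule above reduces to a small calculation. This Python sketch uses the approximate per-OCU figures stated in this post and hypothetical topic names; it does not model the minimum and maximum OCU values the service itself allows, so check those limits in the AWS documentation.

```python
from typing import Mapping

# Approximate per-OCU resources, as stated in this post.
MEMORY_GB_PER_OCU = 8
VCPUS_PER_OCU = 2


def useful_max_ocus(partitions_by_topic: Mapping[str, int]) -> int:
    """Largest OCU count that can still receive partitions.

    Each partition is consumed by exactly one OCU, so OCUs beyond the
    partition count of the largest topic would have nothing to read.
    """
    if not partitions_by_topic:
        raise ValueError("at least one topic is required")
    return max(partitions_by_topic.values())


def approx_capacity(ocus: int) -> tuple[int, int]:
    """Approximate (memory in GB, vCPUs) for a given number of OCUs."""
    return ocus * MEMORY_GB_PER_OCU, ocus * VCPUS_PER_OCU


if __name__ == "__main__":
    topics = {"orders": 6, "clickstream": 12}  # hypothetical topics
    max_ocus = useful_max_ocus(topics)
    print(max_ocus, approx_capacity(max_ocus))  # 12 (96, 24)
```

Setting the pipeline maximum above this value would not add throughput, because the extra OCUs would have no partitions assigned.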

Configuring OpenSearch Ingestion with Confluent Cloud

High-level data pipeline

The following diagram shows the flow from data sources through Confluent Cloud to Amazon OpenSearch Service.

The flow contains the following steps:

  1. Data sources produce data and send that data to Confluent Cloud.

  2. OpenSearch Ingestion consumes the data from Confluent Cloud.

  3. OpenSearch Ingestion transforms, enriches, and writes the data into OpenSearch Service.

  4. Users search, explore, and analyze the data with OpenSearch Dashboards.

Prerequisites

The walkthrough requires the following:

  • A Confluent Cloud cluster up and running, with data flowing into a topic. If you do not have one, check out this GitHub repo to get started.

Confluent Cloud supports four networking connectivity methods: AWS PrivateLink, Transit Gateway, VPC peering, and public clusters. In this post, we focus on PrivateLink. As we proceed, we also explain how the configuration varies depending on the networking method used by the Confluent Cloud cluster.

  • An OpenSearch Service domain up and running. OpenSearch supports two deployment types: provisioned and serverless. In this post, we focus on the provisioned deployment type.

  • A Confluent Cloud API key and secret with read access to the Confluent topics. Follow the Confluent documentation to create API keys.

  • IAM role with permissions to write data to OpenSearch domains. If the OpenSearch domain uses fine-grained access control for authentication, map the pipeline role to the OpenSearch “all_access” backend role.
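The following Python sketch builds an illustrative identity policy covering the permissions this post lists for the pipeline role: reading the Confluent secret, writing to the domain, managing network interfaces in the VPC, and writing logs. The helper `pipeline_role_policy` and the exact action lists are assumptions for illustration, not an authoritative least-privilege policy; compare them with the AWS documentation for OpenSearch Ingestion before use.

```python
import json


def pipeline_role_policy(region: str, account_id: str, domain_name: str,
                         secret_arn: str) -> dict:
    """Illustrative identity policy for an OpenSearch Ingestion pipeline role."""
    domain_arn = f"arn:aws:es:{region}:{account_id}:domain/{domain_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Read the Confluent API key and secret from Secrets Manager.
                "Sid": "ReadConfluentSecret",
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": [secret_arn],
            },
            {   # Describe the domain and send indexing requests to it.
                "Sid": "WriteToOpenSearch",
                "Effect": "Allow",
                "Action": ["es:DescribeDomain", "es:ESHttp*"],
                "Resource": [domain_arn, f"{domain_arn}/*"],
            },
            {   # Manage the network interfaces placed in the customer VPC.
                "Sid": "ManageVpcInterfaces",
                "Effect": "Allow",
                "Action": [
                    "ec2:CreateNetworkInterface",
                    "ec2:DeleteNetworkInterface",
                    "ec2:DescribeNetworkInterfaces",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                    "ec2:DescribeVpcs",
                ],
                "Resource": "*",
            },
            {   # Publish pipeline logs to CloudWatch Logs (broad resource for brevity).
                "Sid": "WriteLogs",
                "Effect": "Allow",
                "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    policy = pipeline_role_policy(
        "us-east-1", "123456789012", "my-domain",  # hypothetical values
        "arn:aws:secretsmanager:us-east-1:123456789012:secret:confluent-example",
    )
    print(json.dumps(policy, indent=2))
```

The printed JSON can be attached to the role as an inline or managed policy. The role also needs a trust policy that allows OpenSearch Ingestion to assume it; AWS documents the service principal as `osis-pipelines.amazonaws.com`.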

Creating the pipeline

OpenSearch Ingestion reads the Confluent API key and secret from AWS Secrets Manager. Run the following AWS CLI command to store these credentials. The JSON keys must match the keys referenced in the pipeline configuration (username and password):

```
aws secretsmanager create-secret \
    --name "<Confluent_Secret_Name>" \
    --secret-string '{"username":"<your-confluent-api-key>", "password":"<your-confluent-api-secret>"}'
```
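The pipeline resolves each credential by JSON key (for example, `${{aws_secrets:confluent-kafka-secret:username}}` reads the `username` key), so a mismatch such as `USERNAME` versus `username` is easy to miss. The Python sketch below, which assumes key matching is exact, lists the keys a pipeline configuration references under a given secret alias and reports any that the secret string lacks. The helper names are hypothetical.

```python
import json
import re

# Matches references such as ${{aws_secrets:confluent-kafka-secret:username}}
SECRET_REF = re.compile(r"\$\{\{aws_secrets:([^:}]+):([^}]+)\}\}")


def secret_references(pipeline_yaml: str) -> set[tuple[str, str]]:
    """Return the (secret alias, JSON key) pairs referenced in a pipeline config."""
    return set(SECRET_REF.findall(pipeline_yaml))


def missing_keys(pipeline_yaml: str, alias: str, secret_string: str) -> set[str]:
    """Keys referenced under `alias` that the secret's JSON does not contain.

    Matching is exact (case-sensitive), so "USERNAME" does not satisfy "username".
    """
    stored = set(json.loads(secret_string))
    wanted = {key for ref_alias, key in secret_references(pipeline_yaml)
              if ref_alias == alias}
    return wanted - stored


if __name__ == "__main__":
    pipeline = """
    username: "${{aws_secrets:confluent-kafka-secret:username}}"
    password: "${{aws_secrets:confluent-kafka-secret:password}}"
    """
    wrong = '{"USERNAME": "<key>", "password": "<secret>"}'
    print(missing_keys(pipeline, "confluent-kafka-secret", wrong))  # {'username'}
```

Running this against the blueprint text and the JSON passed to `--secret-string` flags missing keys before the pipeline is created.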

Now we can start to create the OpenSearch Ingestion pipeline. Follow the steps below:

  1. In the OpenSearch Ingestion pipeline console, click Create pipeline.

  2. Choose the Confluent blueprint and click Select blueprint.

  3. The blueprint loads a sample pipeline configuration that you can adapt to your Confluent Cloud cluster:

    ```
    version: "2"
    kafka-pipeline:
      source:
        kafka:
          topics:
            - name: "<INPUT_TOPIC_NAME>"
              group_id: "osi-cwc"
          bootstrap_servers:
            - "<CONFLUENT_BOOTSTRAP_URL>"
          authentication:
            sasl:
              plain:
                username: "${{aws_secrets:confluent-kafka-secret:username}}"
                password: "${{aws_secrets:confluent-kafka-secret:password}}"
      sink:
        - opensearch:
            hosts: [ "<AMAZON_OPENSEARCH_ENDPOINT>" ]
            index: "<AMAZON_OPENSEARCH_INDEX_NAME>"
            aws:
              sts_role_arn: "<OPENSEARCH_INGESTION_ROLE>"
              region: "<AWS_REGION>"
    extension:
      aws:
        secrets:
          confluent-kafka-secret:
            secret_id: "<Confluent_Secret_Name>"
            region: "<AWS_REGION>"
            sts_role_arn: "<OPENSEARCH_INGESTION_ROLE>"
    ```

    Fill in the parameters according to your use case:

    • <INPUT_TOPIC_NAME>: Topic name with input data that you want to sink to OpenSearch.

    • <CONFLUENT_BOOTSTRAP_URL>: The Confluent bootstrap URL, available in the Confluent Cloud console. If the Confluent Cloud cluster uses PrivateLink, set up a Route 53 private hosted zone and associate it with the VPC where OpenSearch Ingestion will create its ENIs. This is required for DNS resolution: the zone must contain a DNS record that maps the bootstrap URL to the VPC endpoint. Below is an example of our private hosted zone:

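    • <AMAZON_OPENSEARCH_ENDPOINT>: The OpenSearch domain endpoint, available in the OpenSearch Service console.

    • <AMAZON_OPENSEARCH_INDEX_NAME>: The OpenSearch index that the pipeline writes documents to.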

    • <OPENSEARCH_INGESTION_ROLE>: OpenSearch Ingestion pipeline IAM role. The role needs to have the following permissions:

      • Access to AWS Secrets Manager to retrieve Confluent credentials

      • Write access to OpenSearch domain

      • Create, describe, and delete network interfaces in your VPC

      • Write logs to CloudWatch Logs

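    • <Confluent_Secret_Name>: The name of the Secrets Manager secret you created earlier.

    • <AWS_REGION>: The AWS Region of the OpenSearch domain (in the sink section) and of the secret (in the extension section).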

  4. In the Network section, specify whether the source Confluent cluster has private or public endpoints. For VPC peered, Transit Gateway, and PrivateLink clusters, choose VPC access; for public clusters, choose Public access.

    For VPC peered clusters and Transit Gateway clusters, select a VPC that is connected to the Confluent Cloud network. This VPC's CIDR must not overlap with the Confluent Cloud network CIDRs.

    Because our cluster is an Enterprise (PrivateLink) cluster, there are no CIDR constraints with Confluent. Here we choose a VPC that has access to the VPC endpoints connected to the PrivateLink attachment of the Enterprise cluster. This VPC must also be associated with the Route 53 private hosted zone that contains the record mapped to the VPC endpoint.

    Additionally, for PrivateLink connectivity, configure the VPC's DHCP options set as follows:

    • domain-name: aws.private.confluent.cloud

    • domain-name-servers: AmazonProvidedDNS

  5. Check the Attach to VPC option. When deploying an OpenSearch Ingestion pipeline with private access, as described in the previous step, OpenSearch Ingestion deploys two sets of ENIs. The first set is deployed in the customer VPC (the VPC selected above), and the second set is deployed in an AWS-managed VPC. This option is required when connecting OpenSearch Ingestion to any third-party private source. By selecting this option, you provide the CIDR range that OpenSearch Ingestion should use to deploy the ENIs in the AWS-managed VPC. This CIDR range must not overlap with the customer-managed VPC or with the Confluent Cloud network (for Transit Gateway and VPC peered clusters). Note that this latter restriction does not apply to Confluent Cloud PrivateLink clusters.

    Note that the CIDR range we picked does not overlap with the customer-managed VPC.

  6. Check Publish to CloudWatch logs. This step is important for debugging purposes.

  7. Click Next and then Create pipeline.
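The networking inputs from steps 3 through 5 can be sanity-checked before you create the pipeline. The following Python sketch uses only the standard library to (1) check whether a candidate CIDR for the AWS-managed VPC overlaps other networks, (2) build the DHCP options from step 4 in the shape boto3's `ec2.create_dhcp_options` accepts, and (3) build a Route 53 change batch that maps the bootstrap host name to a VPC endpoint DNS name, in the shape `route53.change_resource_record_sets` accepts. The CNAME record type, the TTL, and all host names and CIDRs are assumptions for illustration. A single bootstrap record may not cover every broker host name, so follow Confluent's PrivateLink DNS guidance for the complete set of records.

```python
import ipaddress


def overlapping(candidate: str, others: dict[str, str]) -> list[str]:
    """Return the names of networks in `others` whose CIDR overlaps `candidate`."""
    cand = ipaddress.ip_network(candidate)
    return [name for name, cidr in others.items()
            if cand.overlaps(ipaddress.ip_network(cidr))]


def dhcp_configurations() -> list[dict]:
    """DHCP options from step 4, shaped like boto3's DhcpConfigurations argument."""
    return [
        {"Key": "domain-name", "Values": ["aws.private.confluent.cloud"]},
        {"Key": "domain-name-servers", "Values": ["AmazonProvidedDNS"]},
    ]


def bootstrap_record_change(bootstrap_url: str, vpce_dns_name: str,
                            ttl: int = 60) -> dict:
    """Route 53 change batch mapping the bootstrap host to a VPC endpoint.

    Assumes a CNAME record. A ':9092'-style port is not part of a DNS name,
    so it is stripped from the bootstrap URL.
    """
    host = bootstrap_url.split(":", 1)[0]
    return {
        "Changes": [{
            "Action": "UPSERT",
            "ResourceRecordSet": {
                "Name": host,
                "Type": "CNAME",
                "TTL": ttl,
                "ResourceRecords": [{"Value": vpce_dns_name}],
            },
        }]
    }


if __name__ == "__main__":
    # Hypothetical values. For Transit Gateway or VPC peered clusters, also add
    # the Confluent Cloud network CIDR to the dictionary.
    print(overlapping("10.100.0.0/24", {"customer-vpc": "10.0.0.0/16"}))  # []
    change = bootstrap_record_change(
        "lkc-example.us-east-1.aws.private.confluent.cloud:9092",
        "vpce-0example.vpce-svc-0example.us-east-1.vpce.amazonaws.com",
    )
    print(change["Changes"][0]["ResourceRecordSet"]["Name"])
```

With boto3, these payloads would be passed to `ec2.create_dhcp_options(DhcpConfigurations=...)`, followed by `ec2.associate_dhcp_options` to attach the options set to the VPC, and to `route53.change_resource_record_sets(HostedZoneId=..., ChangeBatch=...)` for the private hosted zone.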

Pipeline deployment takes about 5 to 10 minutes; after that, data starts flowing into the OpenSearch domain.

Verifying the output

To verify that the pipeline is working properly and that data from the topics is being written to the corresponding OpenSearch indexes, follow these steps:

  1. In the OpenSearch Dashboards navigation pane, click on "Dev Tools" under the "Management" section.

  2. In the Dev Tools console, run the following command to retrieve data from the index created by the pipeline:

    ```
    GET /<AMAZON_OPENSEARCH_INDEX_NAME>/_search
    {
      "query": {
        "match_all": {}
      },
      "size": 300
    }
    ```

    This request searches all documents in the index created by the pipeline and returns up to 300 of them.

You can further explore the Dev Tools console to perform more advanced queries, filter the data, or analyze the indexed documents as per your requirements.
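If you query the index from a script instead of Dev Tools, the response has the standard `hits` structure. This Python sketch extracts the total hit count and the document bodies from a `_search` response. It does not send the request (a domain that uses IAM authentication requires SigV4-signed requests), and the sample response is hand-written for illustration.

```python
from typing import Any


def summarize_hits(response: dict[str, Any]) -> tuple[int, list[dict[str, Any]]]:
    """Return (total hit count, document bodies) from a _search response."""
    hits = response.get("hits", {})
    total = hits.get("total", 0)
    # OpenSearch reports total as {"value": N, "relation": "eq" | "gte"}; with
    # "gte", N is only a lower bound (the count is capped by track_total_hits).
    count = total["value"] if isinstance(total, dict) else int(total)
    return count, [hit["_source"] for hit in hits.get("hits", [])]


if __name__ == "__main__":
    sample = {  # hand-written sample, not real pipeline output
        "hits": {
            "total": {"value": 2, "relation": "eq"},
            "hits": [
                {"_index": "orders", "_id": "1", "_source": {"order_id": 1}},
                {"_index": "orders", "_id": "2", "_source": {"order_id": 2}},
            ],
        }
    }
    count, docs = summarize_hits(sample)
    print(count, docs)  # 2 [{'order_id': 1}, {'order_id': 2}]
```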

Optional OpenSearch Ingestion features

  • Schema Registry support: OpenSearch Ingestion supports topics backed by Schema Registry. To configure this, add the following to the pipeline configuration:

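    • In the ‘kafka’ source configuration (at the same level as ‘bootstrap_servers’), add the Schema Registry URL and credentials. The last segment of each secret reference (for example, schema_registry_api_key) must match a JSON key stored in that secret:

      ```
      schema:
        type: confluent
        registry_url: "<CONFLUENT_SCHEMA_REGISTRY_URL>"
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
      ```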

    • Add the Secrets Manager details for the Confluent Schema Registry credentials. Under ‘extension’ → ‘aws’ → ‘secrets’, next to the existing ‘confluent-kafka-secret’ entry, add:

      ```
      schema-secret:
        secret_id: "<CONFLUENT_SCHEMA_SECRET_MANAGER_NAME>"
        region: "<AWS_REGION>"
        sts_role_arn: "<OPENSEARCH_INGESTION_ROLE>"
      ```

Clean up

To avoid potential future charges, clean up any unused resources from your AWS account and Confluent account. 

Conclusion

In this post, we demonstrated how to use the new Confluent Cloud blueprint in OpenSearch Ingestion, which makes it easy to use private networking in your OpenSearch and Confluent Cloud integration. You learned how to set up a sample OpenSearch Ingestion pipeline, including the optional use of Schema Registry. The new feature is a step forward in letting Confluent Cloud, which offers more than 80 fully managed connectors to other systems, serve as the central point of ingestion for your OpenSearch implementation.

Next steps

Not yet a Confluent customer? Try Confluent Cloud in AWS Marketplace. New sign-ups receive $1,000* in free credits to spend during their first 30 days. Your credits will be immediately visible on your Confluent account after subscribing to Confluent through the AWS Marketplace.

*Confluent is offering a limited-time promotion in which you receive an additional $600 in bonus credits on top of the normal offer of $400 in free credits. Finish signing up for Confluent Cloud, and you will receive a $600 promo code at the email address you used to create your account. During this free trial period, you'll have full access to all features, enabling you to build multiple use cases, connect to your databases, and get technical support whenever needed.

  • Ahmed Zamzam is a staff partner solutions architect at Confluent, specializing in integrations with cloud service providers (CSPs). Leveraging his expertise, he collaborates closely with CSP service teams to develop and refine integrations, empowering customers to leverage their data in real time effectively. Outside of work, Zamzam enjoys traveling, playing tennis, and cycling.

  • Weifan Liang is a Senior Partner Solutions Architect at AWS. He works closely with AWS top strategic data analytics software partners to drive product integration, build optimized architecture, develop long-term strategy, and provide thought leadership. Innovating together with partners, Weifan strives to help customers accelerate business outcomes with cloud powered digital transformation.
