
Leveraging Confluent Cloud Schema Registry with AWS Lambda Event Source Mapping


Introduction

In our previous blog post, we introduced two ways that Confluent Cloud can integrate with AWS Lambda. One option is Lambda’s Event Source Mapping (ESM) for Apache Kafka®: Lambda creates a consumer group, polls records off the provided topic, and each consumed record becomes the event data that triggers, and is processed by, the Lambda function. Earlier this year, we published a blog post showing how the Provisioned Mode for Kafka ESM can be used to build high-throughput event streaming applications with Confluent and Lambda. Since the release of that capability, customers have used this integration for workloads spanning call center analytics, airline ticketing and personalized notification systems, and image processing analytics. Due to customer interest in leveraging the simplicity of this integration for their event-driven Kafka applications, the Lambda team has released another enhancement to ESM: support for Avro, Protobuf, and Confluent Schema Registry.

What’s the Point of Schema Registry Anyway?

Schema Registry provides a centralized repository for managing and validating schemas for topic message data as well as for serialization and deserialization of that data over the network. Producers to and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. Thus, Schema Registry is a key component of data governance, helping to ensure data quality, adherence to standards, visibility into data lineage, audit capabilities, collaboration across teams, efficient application development protocols, and system performance.

A schema, on the other hand, defines the structure of message data. It acts as a blueprint for data, describing the structure of data records, the data types of individual fields, the relationships between fields, and any constraints or rules that apply to the data. In practice, it facilitates data sharing and interoperability between different systems and organizations by ensuring that the data is consistent and accurate and that it can be efficiently processed and analyzed no matter where it’s sent. You can learn more about the benefits of using Schema Registry from the Confluent documentation.

Why Schema Registry Support With ESM?

To enable customers who want to take advantage of the above-mentioned benefits, Lambda released support for Schema Registry within ESM, further simplifying the integration between Confluent Cloud and Lambda and presenting an opportunity for cost savings.

Less Custom Code

Previously, customers using Confluent’s Schema Registry with Lambda’s ESM were required to write custom code to deserialize the data received from Confluent. Below is a code snippet example of what customers would’ve written to do this.

import base64

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Instantiate the Schema Registry client (sr_config holds the registry URL and credentials).
schema_registry_client = SchemaRegistryClient(sr_config)

# Instantiate the Avro deserializer. Note how the schema (schema_str) also had to be
# manually uploaded/provided to the AWS Lambda function.
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Manually indicate the topic from which the record is being consumed.
topic = "testing"

# Unpack the AWS Lambda event.
partition = list(event['records'].keys())[0]
message = event['records'][partition]

# Records are base64 encoded and need to be decoded before being deserialized.
decoded_value = base64.b64decode(message[0]['value'])

# Deserialize the value.
value = avro_deserializer(decoded_value, SerializationContext(topic, MessageField.VALUE))

Note that the above code is just to deserialize the value of a Kafka record. If customers wanted to use or reference the Kafka record key, the same would need to be repeated.

Now, with Lambda’s ESM support for Schema Registry, the above is all handled by Lambda. A user can simply provide the HTTPS URL of their Confluent Cloud Schema Registry, and ESM will handle all the deserialization and pass the output to Lambda.
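To illustrate how little remains in the function, here is a minimal sketch (the handler and variable names are ours, for illustration only) of what a Python function can look like once the ESM handles deserialization. Assuming the trigger’s event record format is set to JSON, as shown later in this post, each record value arrives as base64-encoded JSON, so no Avro deserializer, schema download, or Schema Registry client is needed in the function:

import base64
import json

def lambda_handler(event, context):
    # The ESM has already validated and deserialized each record against
    # Confluent Schema Registry; each value is base64-encoded JSON.
    for partition, records in event['records'].items():
        for record in records:
            value = json.loads(base64.b64decode(record['value']))
            print(value)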

Use ESM Filtering

Another added benefit is the ability to leverage ESM’s built-in filtering while using Confluent Cloud’s Schema Registry. Filtering helps customers reduce costs by letting them define when their Lambda functions are triggered. To illustrate this, consider the incoming record below:

{
    "time": "2021-11-09 13:32:04",
    "fleet_id": "fleet-452",
    "vehicle_id": "a42bb15c-43eb-11ec-81d3-0242ac130003",
    "lat": 47.616226213162406,
    "lon": -122.33989110734133,
    "speed": 43,
    "odometer": 43519,
    "tire_pressure": [41, 40, 31, 41],
    "weather_temp": 76,
    "weather_pressure": 1013,
    "weather_humidity": 66,
    "weather_wind_speed": 8,
    "weather_wind_dir": "ne"
}

A customer could set a rule so that the Lambda function triggers only if speed is greater than 35 or weather_pressure is below 1,000. Rules like these can drastically reduce the number of times that Lambda runs unnecessarily, which in turn reduces costs.
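As an illustrative sketch, that rule could be expressed as Lambda event-filtering criteria applied to the deserialized record value. Multiple filter patterns are evaluated as a logical OR, so a record matching either condition invokes the function (the field names follow the record shown above):

{
    "Filters": [
        { "Pattern": "{ \"value\": { \"speed\": [ { \"numeric\": [ \">\", 35 ] } ] } }" },
        { "Pattern": "{ \"value\": { \"weather_pressure\": [ { \"numeric\": [ \"<\", 1000 ] } ] } }" }
    ]
}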

Data Integrity

Although JSON is widely used as the format for sending messages over Kafka topics, Avro and Protobuf offer significant advantages over JSON, primarily in terms of performance, schema enforcement, and data size. Both Avro and Protobuf require a predefined schema that defines the structure of the data. In this way, data integrity and consistency are ensured, as data that does not conform to the schema will be rejected. JSON, on the other hand, is schema-less, which can lead to data quality issues.

Consider again the vehicle telemetry record shown above. With Avro or Protobuf, that record can only be produced against a registered schema, so a message with a missing field or a mismatched type is rejected rather than silently landing on the topic.
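As an illustration, an Avro schema for that record could look like the following sketch (the field names mirror the record above; the exact types are assumptions):

{
    "type": "record",
    "name": "VehicleTelemetry",
    "fields": [
        { "name": "time", "type": "string" },
        { "name": "fleet_id", "type": "string" },
        { "name": "vehicle_id", "type": "string" },
        { "name": "lat", "type": "double" },
        { "name": "lon", "type": "double" },
        { "name": "speed", "type": "int" },
        { "name": "odometer", "type": "int" },
        { "name": "tire_pressure", "type": { "type": "array", "items": "int" } },
        { "name": "weather_temp", "type": "int" },
        { "name": "weather_pressure", "type": "int" },
        { "name": "weather_humidity", "type": "int" },
        { "name": "weather_wind_speed", "type": "int" },
        { "name": "weather_wind_dir", "type": "string" }
    ]
}

Any producer attempting to write a record that does not conform to this structure fails at serialization time, which is how Avro and Protobuf enforce the data integrity guarantees described above.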

Get Started: An Example

Let’s walk through an example of using Schema Registry for a stream processing use case. For this blog post, we’ll use the Confluent Data Generator source connector to generate records in the Avro format. This will provide schema-bound data with which we can trigger Lambda.

Create the Kafka Topic and API Keys

  1. Create Kafka API keys. These will allow the connector to authenticate to the Confluent Kafka cluster and produce messages into a topic. Navigate to your Kafka cluster within Confluent Cloud and click “API Keys” on the left navigation bar.

  2. Create a Kafka topic. Next, you’ll create a topic where data from the data generation connector can land. Let’s call it stock_trades. Create it with the defaults and skip the data contract. When the data generation connector starts publishing data into the topic, it will also automatically set up the Avro schema. (A Confluent CLI alternative for these two steps is sketched below.)
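If you prefer working from a terminal, the same two steps can be done with the Confluent CLI, roughly as follows (the cluster ID is a placeholder):

# Create a Kafka API key scoped to your cluster (replace lkc-xxxxx with your cluster ID).
confluent api-key create --resource lkc-xxxxx

# Create the topic that the data generator will write to.
confluent kafka topic create stock_trades --cluster lkc-xxxxx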

Deploy the Connector

  1. Navigate to the Connectors page. From the Kafka cluster view, find the sample data generator connector from the Confluent console by clicking “Connectors” on the navigation panel.

  2. Search for the connector. In the search box, type “Sample Data.” Click the “Sample Data” tile in the results section, as shown below.

  3. Begin configuring the data generation. A dialog box will appear that will allow you to select the specific type of data that you want to generate. Click on “Additional configuration.” It will open a page to select the topic. Select the stock_trades topic you previously created. Click on “Continue.”

  4. Insert your Kafka API keys. You’ll see a page with an option to select the API key. Enter the API key and secret that you created for this demo.

  5. Set output record type and schema template. On the next page, you’ll see the configuration to select the record format and the schema for the messages. Select “Avro” and the “Stock Trades” schema.

  6. Finalize the data generator. Click on “Continue” and provide a name, such as “StockTradeGenerator,” for the connector on the last page.


  7. Confirm incoming data and the schema. The connector should go into a running state in a few minutes. You can check for messages on the stock_trades topic to validate. You now have a topic of Avro messages with a schema attached. You can see the attached schema by clicking on the “Data Contracts” tab for the topic.

Create Schema Registry Keys

In order to process the messages on the topic, Lambda will need the public endpoint of your Confluent Schema Registry and API keys to access it.

  1. Navigate to Schema Registry. Navigate to your Confluent Kafka cluster. On the left navigation panel, click “Schema Registry.”


  2. Note the public endpoint. Click on the “Overview” tab, which will provide the public and private endpoints for Schema Registry. You’ll use the public endpoint for the demo. Keep this handy for later use.

  3. Create Schema Registry API keys. From the same “Overview” tab as in the previous step, click “API keys” and create a new set of keys. Note that these are distinct from Kafka API keys: Kafka API keys authenticate with a Kafka cluster, while Schema Registry API keys authenticate with Schema Registry.

Deploying the Lambda Function

With the topic established and Avro messages actively flowing into Kafka, the next step is to deploy the Lambda function that will be triggered by each incoming record.

  1. Save API keys in AWS Secrets Manager. Before deployment, ensure that the Schema Registry API key/secret and the cluster API key/secret are securely stored in Secrets Manager. These will then be leveraged by Lambda to authenticate with the Confluent Kafka cluster and Schema Registry. Additionally, Lambda requires Provisioned Mode to be enabled in order to use Schema Registry and Avro support. For detailed instructions on setting up Secrets Manager, refer to the documentation. You can also refer to the AWS launch blog post for Schema Registry support to have a more detailed walk-through of all the steps.


  2. Create or select the Lambda function. The simplest route is to create a new Lambda function. Alternatively, you may test out the trigger using an existing Lambda function. If you’re using Python, we recommend including the code block below that parses and prints the deserialized record to Amazon CloudWatch.

    import base64  # needed to decode the base64-encoded record key and value

    for partition_key in event['records']:
        # Each key in event['records'] is a "topic-partition" string; its value
        # is the list of Kafka messages consumed from that partition.
        print("Current partition: " + partition_key)
        for i, message in enumerate(event['records'][partition_key]):
            print("********************")
            print("Now printing details of record number: " + str(i))
            print("Topic: " + str(message['topic']))
            print("Partition: " + str(message['partition']))
            print("Offset: " + str(message['offset']))
            print("Timestamp: " + str(message['timestamp']))
            print("TimestampType: " + str(message['timestampType']))
            # Each Kafka message has a key and a value that are base64 encoded.
            if message.get('key') is not None:
                decoded_key = base64.b64decode(message['key']).decode('utf-8', errors='ignore')
                print("Base64 Decoded Key: " + decoded_key)
            if message.get('value') is not None:
                decoded_value = base64.b64decode(message['value']).decode('utf-8', errors='ignore')
                print("Base64 Decoded Value: " + decoded_value)

  3. Set up the Lambda trigger. In order to process the messages from the Kafka topic, you’ll use the self-managed Kafka event trigger to invoke the Lambda function. The specific settings for Schema Registry and Avro payload are as follows:

    1. Schema Registry URI: This refers to the public endpoint for Schema Registry.

    2. Authentication secret URI: This refers to the secret stored in Secrets Manager referencing the Schema Registry API key/secret.

    3. Event record format: Select “JSON” for the event record format. This will automatically convert the Avro records into JSON format and deliver them to the Lambda function.

    4. Schema validation attribute: Set it to “Value,” since in this example the Avro-encoded payload is the record value.

  4. Confirming record deserialization: Once the trigger is enabled, it will start invoking the Lambda function, and you should start seeing the deserialized messages logged in CloudWatch Logs. A sketch of the event shape delivered to the function follows this list.
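For reference, the event your function receives is shaped roughly as follows (the values below are placeholders). Each record’s value field carries the base64-encoded JSON that the ESM produced after deserializing the Avro payload:

{
    "eventSource": "SelfManagedKafka",
    "bootstrapServers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "records": {
        "stock_trades-0": [
            {
                "topic": "stock_trades",
                "partition": 0,
                "offset": 125,
                "timestamp": 1699999999999,
                "timestampType": "CREATE_TIME",
                "key": "<base64-encoded key>",
                "value": "<base64-encoded JSON produced by the ESM>",
                "headers": []
            }
        ]
    }
}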

Considerations

With all the excitement of these new benefits, there are a few points to keep in mind:

  1. Performance and cost considerations: Schema validation and deserialization can add processing time before your function invocation. However, this overhead is typically minimal when compared to the benefits. ESM caching minimizes Schema Registry API calls. Filtering allows you to reduce costs, depending on how effectively your filtering rules eliminate irrelevant events. This feature simplifies the operational overhead of managing Schema Registry integration code so that teams can focus on business logic rather than infrastructure concerns.


  2. Error handling: If schema registries become temporarily unavailable, then cached schemas allow event processing to continue until the registry is available again. Authentication failures generate error messages with automatic retry logic. Schema evolution happens seamlessly as Lambda automatically detects and fetches new versions.


    If events fail validation or deserialization, they’re routed to your configured failure destinations. For Amazon SQS and Amazon SNS destinations, the services send metadata about the failure. For Amazon S3 destinations, both metadata and the original serialized payload are included for detailed analysis.


    You can use standard Lambda monitoring, with additional CloudWatch metrics providing visibility into schema validation success rates, registry API usage, and filtering effectiveness.


  3. Client Side Field Level Encryption (CSFLE) not supported: This launch from AWS Lambda does not support CSFLE, which allows you to safeguard sensitive data, such as personally identifiable information (PII), by enabling field level encryption at both the producer and consumer levels.

Conclusion

Confluent and AWS Lambda continue to improve the experience for developers looking to leverage serverless compute and data streaming with Kafka to build their event-driven architectures. AWS Lambda’s support for Avro and Protobuf formats and Confluent Schema Registry for Kafka event processing (in Provisioned Mode) enables schema validation, event filtering, and integration with Confluent and self-managed Kafka clusters.

For more information about Lambda’s Kafka integration capabilities, go to the learning guide or Lambda ESM documentation. To learn about Lambda pricing, such as Provisioned Mode costs, visit the Lambda pricing page.

Get Started

With this setup, integrating Confluent Cloud Schema Registry with AWS Lambda ESM simplifies event-driven architectures by removing custom deserialization code, enabling schema validation, and providing cost-saving filtering capabilities. Developers can now focus on business logic while ensuring data consistency and integrity across Avro and Protobuf formats.

Ready to try Confluent Cloud on AWS Marketplace? New sign-ups receive $1,000 in free credits for their first 30 days! Subscribe through AWS Marketplace, and your credits will be instantly applied to your Confluent account.


Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.

  • Tarun Rai Madan is a Principal Product Manager at Amazon Web Services (AWS). He specializes in serverless technologies and leads product strategy to help customers achieve accelerated business outcomes with event-driven applications, using services like AWS Lambda, AWS Step Functions, Apache Kafka, and Amazon SQS/SNS. Prior to AWS, he was an engineering leader in the semiconductor industry, and led development of high-performance processors for wireless, automotive, and data center applications.

  • Mithun is a Senior Product Manager at Confluent. He focuses on the integration and optimization of native cloud data services into the Confluent ecosystem. He has a deep background in serverless stream processing technologies such as AWS Lambda and in building event-driven architectures using Apache Kafka. He has over twenty years of experience in the industry with a focus on application integration and data streaming technologies.
