A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that’s in motion. In a previous post, I showed how Confluent Schema Registry supports data contracts. By combining data contracts and encryption on streaming workloads, you can shift left the responsibility of data consistency, quality, and security to the producer, allowing the consumer to depend on a trustworthy stream of data.
In regulated industries, this kind of shift-left governance approach is essential for consistently protecting sensitive data like personally identifiable information (PII) in healthcare, financial services, and education. In this post, I'll show how data contracts can be used to protect PII like patient data in the following scenarios:
Ensuring the quality of fields that contain personal information, such as email addresses
Using a dead letter queue (DLQ) to capture invalid or missing data
Using Common Expression Language (CEL) functions to perform simple masking of personal information
Using client-side field level encryption (CSFLE) to encrypt personal information
Ready to try these scenarios for yourself?
Let's create a simple Avro schema to represent a patient:
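Here's a minimal sketch of what patient.avsc might look like (the name field, record name, and namespace are illustrative; only the ssn and email fields matter for the rest of this post):

```json
{
  "type": "record",
  "name": "Patient",
  "namespace": "com.example",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "ssn", "type": "string" },
    {
      "name": "email",
      "type": ["null", "string"],
      "default": null,
      "confluent:tags": ["PII"]
    }
  ]
}
```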
Note that we've tagged the email field in the Avro schema as containing PII. This tag will be used with CSFLE later in this post. An inline tag is specified with the property name confluent:tags. Inline tags can also be specified for Protobuf and JSON schemas.

When using Confluent Cloud, the tag definition must first be created in Stream Catalog before the schema is registered. For now, we don't have to tag the ssn field, as we'll be handling that field with a masking rule instead of encryption; the masking rule will be based on the field name rather than on field tags.
Assuming the above schema is in a file named patient.avsc, we'll upload it to a local Schema Registry as follows.
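For example, assuming Schema Registry is running locally at http://localhost:8081 and the target topic is named patient (so the subject is patient-value), a registration command might look like this:

```bash
# Wrap the schema in the JSON envelope expected by the Schema Registry REST API,
# then register it under the patient-value subject (the topic name "patient" is an assumption).
jq -n --rawfile schema patient.avsc '{schema: $schema}' |
  curl -s -X POST http://localhost:8081/subjects/patient-value/versions \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @-
```

The response contains the ID of the newly registered schema, which we'll pass to the console producer below.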
Let's try producing and consuming with our new schema using the Avro console producer and consumer. First, we’ll start the consumer.
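Assuming a local broker at localhost:9092, the consumer can be started roughly as follows:

```bash
./bin/kafka-avro-console-consumer \
  --topic patient \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
```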
In a separate terminal, we’ll start the producer and pass the schema ID that was returned during registration as the value of value.schema.id.
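Assuming the registration above returned schema ID 1, the producer might be started like this:

```bash
./bin/kafka-avro-console-producer \
  --topic patient \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema.id=1
```

We can then enter a record such as the following (the values are illustrative):

```json
{"name": "Jane Doe", "ssn": "123-45-6789", "email": {"string": "jane.doe@example.com"}}
```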
Note that since the type of the email field is a union, we need to wrap the value with a JSON object indicating the type as string. When the above record in JSON format is sent via the producer, it will be received by the consumer.
To ensure that the data produced to an Apache Kafka® topic is valid, we can add data quality rules to validate the ssn and email fields.
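A sketch of what such a rule set might look like in patient_ruleset.json (the rule names are illustrative; message refers to the message being produced):

```json
{
  "ruleSet": {
    "domainRules": [
      {
        "name": "validateSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'^\\d{3}-\\d{2}-\\d{4}$')"
      },
      {
        "name": "validateEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()"
      }
    ]
  }
}
```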
We use CEL expressions to validate the ssn and email fields. In the case of the ssn, we use a regular expression expressed as a raw string (with the r prefix) so that we don't have to add additional escape sequences. In the case of the email, we use a built-in string function to validate the field value. The built-in string functions available in CEL rules are as follows:
isEmail()
isHostname()
isIpv4()
isIpv6()
isUri()
isUriRef()
isUuid()
Assuming the rules are specified in a file named patient_ruleset.json, we can register the rule set directly to Schema Registry, which will create a new schema version with the schema of the previous version and the new rule set.
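For example, against the same local Schema Registry and subject as before:

```bash
curl -s -X POST http://localhost:8081/subjects/patient-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @patient_ruleset.json
```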
Let's try out our data quality rules. First, we’ll start the consumer.
In a separate terminal, we’ll start the producer. This time, we'll enter a record with an invalid email.
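For example, a record like the following will fail the isEmail() check:

```json
{"name": "Jane Doe", "ssn": "123-45-6789", "email": {"string": "not-an-email"}}
```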
We should see an error of the form:
To learn more about CEL, see Understanding CEL in Data Contract Rules.
Instead of simply rejecting messages that fail the data quality rules, we may want to capture them in a DLQ for further analysis and processing. To do so, we’ll set the "onFailure" action to "DLQ".
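A sketch of the updated rule set, assuming each rule's DLQ topic is supplied via the dlq.topic rule parameter:

```json
{
  "ruleSet": {
    "domainRules": [
      {
        "name": "validateSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'^\\d{3}-\\d{2}-\\d{4}$')",
        "params": { "dlq.topic": "bad_patients_ssn" },
        "onFailure": "DLQ"
      },
      {
        "name": "validateEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()",
        "params": { "dlq.topic": "bad_patients_email" },
        "onFailure": "DLQ"
      }
    ]
  }
}
```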
Once again, we'll create a new schema version with the above rule set.
Once we’ve created the DLQ topics bad_patients_email and bad_patients_ssn, we’re ready to try out our new data quality rules. First, we’ll start the consumer.
In a separate terminal, we’ll start the producer. The last two properties passed to the console producer are used by the DLQ action.
If we check the DLQ topics, we’ll see the errant records. Currently, the DLQ action is available only with the Java client.
Often, we may want to store only the last four digits of the ssn for healthcare use cases to minimize the risk or impact of a data breach. In this case, we can use a CEL function to perform a field-level transformation.
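A sketch of such a rule, using a field-level CEL (CEL_FIELD) transformation with a guard on the field name, and assuming SSNs follow the canonical XXX-XX-XXXX format:

```json
{
  "ruleSet": {
    "domainRules": [
      {
        "name": "maskSsn",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' ; 'XXX-XX-' + value.substring(7, 11)"
      }
    ]
  }
}
```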
The maskSsn rule is a transformation using a field-level CEL expression, which has a guard to ensure that it applies only to the ssn field. The transformation in the expression concatenates a masking prefix of "XXX-XX-" with the last four digits of the ssn field using a substring function. CEL rules support the following string functions:
charAt(int index)
indexOf(string str)
join(string delimiter)
lastIndexOf(string s)
lowerAscii()
replace(string target, string replacement)
split(string delimiter)
substring(int beginIndex, int endIndex)
trim()
upperAscii()
CSFLE allows you to safeguard sensitive data within a message and is available on both Confluent Platform (starting with version 8.0) and Confluent Cloud. CSFLE uses a technique called envelope encryption, in which a key encryption key (KEK) is used to encrypt data encryption keys (DEKs), which are the actual keys used to encrypt fields. The KEK is typically a key from an external key management system (KMS), such as AWS KMS, Azure Key Vault, Google Cloud KMS, or HashiCorp Vault.
Schema Registry exposes a subcomponent called the DEK Registry, which provides APIs for managing KEKs and DEKs. The key from the KMS is registered as a KEK to the DEK Registry, and the DEK Registry will also hold the encrypted DEKs used for CSFLE. DEKs are scoped by subject.
In Confluent Cloud, for a given KEK, the DEK Registry can be optionally given permission to make direct calls to the KMS in order to encrypt and decrypt DEKs. The main advantage of granting this permission is to enable other Confluent Cloud offerings, such as Apache Flink® and Kafka Connect, to process data that was previously encrypted by CSFLE.
If permission is granted, the DEK Registry will generate the DEK, encrypt it, and make the decrypted DEK available to clients that have the proper authorization. If permission is not granted, the client will typically generate the DEK, encrypt it, and store it to the DEK Registry.
For the purposes of this post, we'll be using local keys as KEKs instead of a KMS, which is a way to start testing before setting up the desired KMS and KEK. Local keys should not be used in production.
A producer and consumer can use encryption rules to protect sensitive data end to end.
Below we add a rule named encryptEmail to perform client-side encryption on the email field.
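A sketch of the rule, which uses the ENCRYPT rule type and targets fields tagged as PII (the KEK name, KMS key ID, and KMS type values below are placeholders; we'll use a local KEK shortly):

```json
{
  "ruleSet": {
    "domainRules": [
      {
        "name": "encryptEmail",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": ["PII"],
        "params": {
          "encrypt.kek.name": "patient-kek",
          "encrypt.kms.key.id": "mykey",
          "encrypt.kms.type": "local-kms"
        },
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}
```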
In the rule above, the KEK name is mandatory, while the KMS key ID and KMS type are optional if the KEK has been pre-registered with the DEK Registry. If the KEK has not been pre-registered, the client can automatically register the KEK with the given name, KMS key ID, and KMS type before registering any DEKs.
To try out CSFLE, first we need to specify a local secret that will be used to generate a local KEK to encrypt and decrypt the DEKs.
One way to obtain a local secret is to use the following command:
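For example (one possible approach), a random base64-encoded secret can be generated with openssl:

```bash
openssl rand -base64 16
```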
The output of this command should be used to set the environment variable named LOCAL_SECRET for both the producer and consumer. Note: If we change the LOCAL_SECRET, we should publish a new encryption rule with a different KEK name.
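We can then export the secret and start the consumer as before (the placeholder below stands in for the generated value):

```bash
export LOCAL_SECRET=<output of the command above>

./bin/kafka-avro-console-consumer \
  --topic patient \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081
```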
In a separate terminal, we’ll start the producer and pass the schema ID that was returned during registration as the value of value.schema.id.
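With LOCAL_SECRET exported in this terminal as well, and assuming the latest registration returned schema ID 4, the producer might look like this:

```bash
export LOCAL_SECRET=<same value as the consumer>

./bin/kafka-avro-console-producer \
  --topic patient \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema.id=4
```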
The consumer will be able to see the email in its original form since it’s using the same KEK as the producer. To verify that encryption is taking place, we can unset the LOCAL_SECRET environment variable and rerun the consumer.
Without the local KEK, the record received by the consumer should look something like the following:
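Illustratively (the actual ciphertext will vary, as will the other field values), the email field now appears as an opaque base64-encoded string:

```json
{"name": "Jane Doe", "ssn": "123-45-6789", "email": {"string": "<base64-encoded ciphertext>"}}
```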
With the addition of simple masking functions and CSFLE, data contracts can be used to handle sensitive data in a manner necessary to meet the proper compliance rules and regulations. The use of data contracts helps to shift left the responsibility of ensuring data quality, interoperability, and compliance to the producer, which is the source of the data, allowing the consumer to depend on a secure and reliable stream of data.
To learn more, see the documentation on Data Contracts for Schema Registry. If you’d like to get started with Confluent Cloud or Confluent Platform:
Sign up for Confluent Cloud and get a 30-day trial with $400 of free credit.
Download Confluent Platform and get a 30-day free trial on unlimited brokers, or use it always free on a single broker.
Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.