
How to Protect PII in Apache Kafka® With Schema Registry and Data Contracts

Written by Robert Yokota

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that’s in motion. In a previous post, I showed how Confluent Schema Registry supports data contracts. By combining data contracts and encryption on streaming workloads, you can shift left the responsibility of data consistency, quality, and security to the producer, allowing the consumer to depend on a trustworthy stream of data.

In regulated industries, this kind of shift-left governance approach is essential for consistently protecting sensitive data like personally identifiable information (PII) in healthcare, financial services, and education. In this post, I'll show how data contracts can be used to protect PII like patient data in the following scenarios:

  • Ensuring the quality of fields that contain personal information, such as email addresses

  • Using a dead letter queue (DLQ) to capture invalid or missing data

  • Using Common Expression Language (CEL) functions to perform simple masking of personal information

  • Using client-side field level encryption (CSFLE) to encrypt personal information


Defining a Patient Schema With Schema Registry

Let's create a simple Avro schema to represent a patient:

{
  "name": "Patient",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    { 
      "name": "firstName",
      "type": "string"
    },
    { 
      "name": "lastName",
      "type": "string"
    },
    { 
      "name": "ssn",
      "type": "string"
    },
    { 
      "name": "email",
      "type": [ "null", "string" ],
      "default": null,
      "confluent:tags": [ "PII" ]
    }
  ]
}  

Note that we've tagged the email field in the Avro schema as containing PII. This tag will be used with CSFLE later in this post. An inline tag is specified with the property name confluent:tags. Inline tags can also be specified for Protobuf and JSON schemas.
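
For example, an equivalent inline tag in a JSON Schema might look like the following sketch (the surrounding schema is only illustrative; the point is the confluent:tags keyword on the property):

{
  "type": "object",
  "title": "Patient",
  "properties": {
    "email": {
      "type": "string",
      "confluent:tags": [ "PII" ]
    }
  }
}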

When using Confluent Cloud, the tag definition must first be created in Stream Catalog before the schema is registered. Note that we don't need to tag the ssn field, since we'll handle that field with a masking rule rather than encryption, and the masking rule will match on the field name rather than on field tags.
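
On Confluent Cloud, creating the tag definition is a single call to the Stream Catalog REST API. The following is a hedged sketch; the endpoint path, payload shape, and the placeholders for the Schema Registry endpoint and API key should be verified against the Stream Catalog API reference:

curl -u <sr-api-key>:<sr-api-secret> \
  https://<schema-registry-endpoint>/catalog/v1/types/tagdefs \
  --json '[{ "name": "PII", "description": "Personally identifiable information" }]'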

Assuming the above schema is in a file named patient.avsc, we’ll upload it to a local Schema Registry as follows:

jq -n --rawfile schema patient.avsc '{schema: $schema}' | 
  curl http://localhost:8081/subjects/patients-value/versions --json @-

Let's try producing and consuming with our new schema using the Avro console producer and consumer. First, we’ll start the consumer.

./bin/kafka-avro-console-consumer \
  --topic patients \
  --bootstrap-server localhost:9092

In a separate terminal, we’ll start the producer and pass the schema ID that was returned during registration as the value of value.schema.id.

./bin/kafka-avro-console-producer \
  --topic patients \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"id": 1, "firstName": "Bob", "lastName": "Smith", "ssn": "123-45-6789", "email": {"string": "bob@acme.com" } }

Note that since the type of the email field is a union, we need to wrap the value with a JSON object indicating the type as string. When the above record in JSON format is sent via the producer, it will be received by the consumer.
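
For comparison, a record with no email address passes an explicit null for the union without any wrapping (the values in this line are purely illustrative):

{"id": 2, "firstName": "Alice", "lastName": "Jones", "ssn": "987-65-4321", "email": null}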

Adding Data Quality Rules to the Data Contract

To ensure that the data produced to an Apache Kafka® topic is valid, we can add data quality rules to validate the ssn and email fields.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')" 
      },
      {
        "name": "checkEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()" 
      }
    ]
  }
}

We use CEL expressions to validate the ssn and email fields. In the case of the ssn, we use a regular expression expressed as a raw string (with the r prefix) so that we don't have to add additional escape sequences. In the case of the email, we use a built-in string function to validate the field value. The built-in string functions available in CEL rules are as follows:

  • isEmail()

  • isHostname()

  • isIpv4()

  • isIpv6()

  • isUri()

  • isUriRef()

  • isUuid()

Assuming the rules are specified in a file named patient_ruleset.json, we can register the rule set directly to Schema Registry, which will create a new schema version with the schema of the previous version and the new rule set.

curl http://localhost:8081/subjects/patients-value/versions \
  --json @patient_ruleset.json

Let's try out our data quality rules. First, we’ll start the consumer.

./bin/kafka-avro-console-consumer \
  --topic patients \
  --bootstrap-server localhost:9092

In a separate terminal, we’ll start the producer. This time, we'll enter a record with an invalid email.

./bin/kafka-avro-console-producer \
  --topic patients \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"id": 1, "firstName": "Bob", "lastName": "Smith", "ssn": "123-45-6789", "email": { "string": "bob" } }

We should see an error of the form:

Expr failed: 'message.email.isEmail()'

To learn more about CEL, see Understanding CEL in Data Contract Rules.

Using a DLQ to Validate Topics With PII

Instead of simply rejecting messages that fail the data quality rules, we may want to capture them in a DLQ for further analysis and processing. To do so, we’ll set the "onFailure" action to "DLQ".

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')",
        "params": {
          "dlq.topic": "bad_patients_ssn"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "checkEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()",
        "params": {
          "dlq.topic": "bad_patients_email"
        },
        "onFailure": "DLQ"
      }
    ]
  }
}

Once again, we'll create a new schema version with the above rule set.

curl http://localhost:8081/subjects/patients-value/versions \
  --json @patient_ruleset2.json
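
We also need the DLQ topics to exist. One way to create them locally is with the kafka-topics tool (the partition and replication settings are just a minimal choice for a single-broker setup):

./bin/kafka-topics --create --topic bad_patients_email \
  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

./bin/kafka-topics --create --topic bad_patients_ssn \
  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1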

Once we’ve created the DLQ topics bad_patients_email and bad_patients_ssn, we’re ready to try out our new data quality rules. First, we’ll start the consumer.

./bin/kafka-avro-console-consumer \
  --topic patients \
  --bootstrap-server localhost:9092

In a separate terminal, we’ll start the producer. The last two properties passed to the console producer are used by the DLQ action. 

./bin/kafka-avro-console-producer \
  --topic patients \
  --broker-list localhost:9092 \
  --property value.schema.id=<id> \
  --property bootstrap.servers=localhost:9092 \
  --property dlq.auto.flush=true

{"id": 1, "firstName": "Bob", "lastName": "Smith", "ssn": "123-45-6789", "email": { "string": "bob" } }
{"id": 1, "firstName": "Bob", "lastName": "Smith", "ssn": "123456789", "email": { "string": "bob@acme.com" } }

If we check the DLQ topics, we’ll see the errant records. Currently, the DLQ action is available only with the Java client.
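
One way to take a quick look at a DLQ topic is with the plain console consumer (shown as a minimal sketch; the exact format of the captured records depends on the client and serializer):

./bin/kafka-console-consumer \
  --topic bad_patients_email \
  --bootstrap-server localhost:9092 \
  --from-beginning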

Using CEL Functions for Simple Masking of PII

Often, we may want to store only the last four digits of the ssn for healthcare use cases to minimize the risk or impact of a data breach. In this case, we can use a CEL function to perform a field-level transformation.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')",
        "params": {
          "dlq.topic": "bad_patients_ssn"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "checkEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()",
        "params": {
          "dlq.topic": "bad_patients_email"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "maskSsn",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' ; 'XXX-XX-' + value.substring(7,11)"
      }
    ]
  }
}

The maskSsn rule is a transformation using a field-level CEL expression, which has a guard to ensure that it applies only to the ssn field. The transformation in the expression concatenates a masking prefix of "XXX-XX-" with the last four digits of the ssn field using a substring function. CEL rules support the following string functions:

  • charAt(int index)

  • indexOf(string str)

  • join(string delimiter)

  • lastIndexOf(string s)

  • lowerAscii()

  • replace(string target, string replacement)

  • split(string delimiter)

  • substring(int beginIndex, int endIndex)

  • trim()

  • upperAscii()
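
As before, we can register the updated rule set as a new schema version. Here's a sketch, assuming the rule set above is saved in a file named patient_ruleset3.json (a hypothetical name that follows the earlier pattern):

curl http://localhost:8081/subjects/patients-value/versions \
  --json @patient_ruleset3.json

If we then produce the same valid record as before using the new schema ID, the consumer should receive the ssn in masked form, e.g., "ssn":"XXX-XX-6789".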

Using CSFLE

CSFLE allows you to safeguard sensitive data within a message and is available on both Confluent Platform (starting with version 8.0) and Confluent Cloud. CSFLE uses a technique called envelope encryption, in which a key encryption key (KEK) is used to encrypt data encryption keys (DEKs), which are the actual keys used to encrypt fields. The KEK is typically a key from an external key management system (KMS), such as AWS KMS, Azure Key Vault, Google Cloud KMS, or HashiCorp Vault.

Schema Registry exposes a subcomponent called the DEK Registry, which provides APIs for managing KEKs and DEKs. The key from the KMS is registered as a KEK to the DEK Registry, and the DEK Registry will also hold the encrypted DEKs used for CSFLE. DEKs are scoped by subject.

In Confluent Cloud, for a given KEK, the DEK Registry can be optionally given permission to make direct calls to the KMS in order to encrypt and decrypt DEKs. The main advantage of granting this permission is to enable other Confluent Cloud offerings, such as Apache Flink® and Kafka Connect, to process data that was previously encrypted by CSFLE.

If permission is granted, the DEK Registry will generate the DEK, encrypt it, and make the decrypted DEK available to clients that have the proper authorization. If permission is not granted, the client will typically generate the DEK, encrypt it, and store it to the DEK Registry.

For the purposes of this post, we'll be using local keys as KEKs instead of a KMS, which is a way to start testing before setting up the desired KMS and KEK. Local keys should not be used in production.

A producer and consumer can use encryption rules to protect sensitive data end to end.

Below we add a rule named encryptEmail to perform client-side encryption on the email field.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')",
        "params": {
          "dlq.topic": "bad_patients_ssn"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "checkEmail",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.email.isEmail()",
        "params": {
          "dlq.topic": "bad_patients_email"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "maskSsn",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' ; 'XXX-XX-' + value.substring(7,11)"
      },
      {
        "name": "encryptEmail",
        "kind": "TRANSFORM",
        "type": "ENCRYPT",
        "mode": "WRITEREAD",
        "tags": [ "PII" ],
        "params": {
           "encrypt.kek.name": "local-kek1",
           "encrypt.kms.key.id": "mykey",
           "encrypt.kms.type": "local-kms"
        }, 
        "onFailure": "ERROR,NONE"
      }
    ]
  }
}

In the rule above, the KEK name is mandatory, while the KMS key ID and KMS type are optional if the KEK has already been registered with the DEK Registry. If the KEK has not been pre-registered, the client can automatically register it under the given name with the specified KMS key ID and KMS type before registering any DEKs.
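
For reference, pre-registering a KEK is a single call to the DEK Registry. The following is a hedged sketch for the local KEK used in this post; the endpoint path and field names are based on the DEK Registry API and should be double-checked against the API reference:

curl http://localhost:8081/dek-registry/v1/keks \
  --json '{ "name": "local-kek1", "kmsType": "local-kms", "kmsKeyId": "mykey" }'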

To try out CSFLE, first we need to specify a local secret that will be used to generate a local KEK to encrypt and decrypt the DEKs.

One way to obtain a local secret is to use the following command:

openssl rand -base64 16

The output of this command should be used to set the environment variable named LOCAL_SECRET for both the producer and consumer. Note: If we change the LOCAL_SECRET, we should publish a new encryption rule with a different KEK name.

export LOCAL_SECRET=<output of "openssl rand -base64 16">

./bin/kafka-avro-console-consumer \
  --topic patients \
  --bootstrap-server localhost:9092

In a separate terminal, we’ll start the producer and pass the ID of the latest schema version (the one that includes the encryption rule) as the value of value.schema.id.

export LOCAL_SECRET=<output of "openssl rand -base64 16">

./bin/kafka-avro-console-producer \
  --topic patients \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"id": 1, "firstName": "Bob", "lastName": "Smith", "ssn": "123-45-6789", "email": { "string": "bob@acme.com" } }

The consumer will be able to see the email in its original form since it’s using the same KEK as the producer. To verify that encryption is taking place, we can unset the LOCAL_SECRET environment variable and rerun the consumer.

unset LOCAL_SECRET

./bin/kafka-avro-console-consumer \
  --topic patients \
  --bootstrap-server localhost:9092

Without the local KEK, the record received by the consumer should look something like the following:

{"id":1,"firstName":"Bob","lastName":"Smith","ssn":"XXX-XX-6789","email":{"string":"9fij4jo8XtON8qEtZ6WMwzQ9Z6Os6mMuocIaYggZA6ipYyuKtUuX4g=="}}

Learn More About Protecting Sensitive Data With Confluent

With the addition of simple masking functions and CSFLE, data contracts can be used to handle sensitive data in a way that meets the relevant compliance rules and regulations. Data contracts shift the responsibility for data quality, interoperability, and compliance left to the producer, the source of the data, allowing the consumer to depend on a secure and reliable stream of data.

To learn more, see the documentation on Data Contracts for Schema Registry.


Apache®, Apache Kafka®, Kafka®, Apache Flink®, and Flink® are registered trademarks of the Apache Software Foundation. No endorsement by the Apache Software Foundation is implied by the use of these marks.

  • Robert Yokota is a software engineer at Confluent, currently working in the area of data governance. He previously worked at Microsoft.
