The Confluent Schema Registry plays a pivotal role in ensuring that producers and consumers in a streaming platform are able to communicate effectively. Ensuring the consistent use of schemas and their versions allows producers and consumers to easily interoperate, even when schemas evolve over time.
As streaming has become more ubiquitous, enterprises find that downstream components such as consumers are often tasked with handling data inconsistencies, incompatible changes, and expensive and complicated transformations in order to be able to process the data effectively. This has led to an effort to shift these responsibilities to the source of the data, such as the producer, an activity often referred to as shift-left. In the context of modern data architectures, such as streaming and data mesh, the effort to shift-left has led to an emphasis on the data contract. As a result, Confluent Schema Registry, both in Confluent Platform Enterprise and Confluent Cloud, now supports the use of data contracts.
A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. The upstream component enforces the data contract, while the downstream component can assume that the data it receives conforms to the data contract. Data contracts are important because they provide transparency over dependencies and data usage in a streaming architecture. They help to ensure the consistency, reliability, and quality of the data in event streams, and they provide a single source of truth for understanding the data in motion.
In this article, we'll walk through an example of enhancing a schema to be a full-fledged data contract. Our example will involve the following actions:
Defining an initial schema for the data contract
Enhancing the data contract with business metadata
Adding data quality rules to the data contract
Specifying custom actions for the data contract
Adding migration rules to the data contract for a complex schema evolution
Let's create a simple Avro schema to represent a customer order.
Assuming the above schema is in a file named
order.avsc, upload it to a local Schema Registry as follows:
Let's try producing and consuming with our new schema using the Avro console producer and consumer, which you can find the documentation for here. First, start the consumer.
In a separate terminal, start the producer, and pass the schema ID that was returned during registration as the value of
When the above record in JSON format is sent via the producer, it will be received by the consumer.
As mentioned, in the effort to shift-left the responsibilities for the quality and timeliness of streaming data, data contracts are typically defined by the producers. By examining the data contract, a downstream consumer can determine which person or team is responsible for the data contract and what service level objectives (SLOs) the data contract is attempting to achieve.
In a file named
order_metadata.json, we declare that this contract is owned by the Orders Team, and that one of the service level objectives is to ensure that orders are available to downstream consumers no later than 10 seconds after the order timestamp. We'll make use of these metadata properties later in this article.
Register the metadata directly to Schema Registry without specifying a schema. When omitting the schema while registering metadata, a new schema version will be created with the schema of the previous version.
As shown in the previous section, data contracts can capture business metadata. They can also be used to capture integrity constraints or data quality rules to ensure the quality of the data. In a file named
order_ruleset.json, let's add a rule that the price needs to be a positive number. The rule is specified as a Google Common Expression Language (CEL) expression.
Similarly to the metadata, register the rule set directly to Schema Registry without specifying a schema, as a new schema version will be created with the schema (as well as metadata) of the previous version.
The above rule will cause all messages with a non-positive price to be rejected.
Let's try out our data quality rule. First start the consumer.
In a separate terminal, start the producer.
We should see an error of the form:
To learn more about CEL, see Understanding CEL in Data Contract Rules.
The rule framework for data contracts is completely customizable. One can specify both custom rule executors and actions for a data contract.
Below is the Java interface for rule executors. The "transform" method is used by rules of both
TRANSFORM. In the former case, it should return a Boolean value; in the latter case, it should return the transformed value.
Below is the Java interface for rule actions. An exception is passed to the rule action if the rule executor failed, which includes a condition that returned false. After performing its work, the rule action should throw this exception if it exists.
Assume that on the consumer side we want to check the timeliness of data to see if it conforms to the declared service level objective. If it doesn't, we want to send an alert in the form of an email message. Below is a custom rule action to check the timeliness SLO. Note that the
type of the action is specified as
"EMAIL". Also, the expression
ctx.getParameter(name) will look for the value of a rule parameter with the given name. If not found, it will then look for the value of a metadata property with the given name.
Now that we've implemented a rule action to check the timeliness SLO, let's add it to the rule set. Note that the
"mode" of the rule is
"READ" since the timeliness check happens on the consumer. We use a CEL expression of "true", to ensure that the action is always invoked. Finally, the
"onSuccess" action is set to
The above code is available in a GitHub repository. After cloning the repository, register the schema with the following command:
To run the consumer:
To generate random Order data, run the producer:
You should now see emails being sent for any records that do not meet the timeliness SLO.
The consumers may be satisfied with the data contract so far, but perhaps one day the Orders Team decides that they want to change the schema in a backward-incompatible manner. For example, they decide to change the field named
"status". In Avro, one possibility is to use an alias, but some Avro implementations might not support aliases. So for the sake of this article, we'll consider changing the name of an Avro field to be a backward-incompatible change (as is the case for Protobuf and JSON Schema).
To support breaking changes within a schema version history, we need a way to partition the history into subsets of versions that are compatible with one another. We can achieve this with the notion of a compatibility group. We choose an arbitrary name for a metadata property, such as
"major_version", and then use that property to specify which data contracts belong to the same compatibility group.
First, we configure Schema Registry to only perform compatibility checks for schemas within a compatibility group.
Next, we change the field named
"status", as specified in a file named
Register the schema, and pass a new metadata property with the name
"major_version" and value
"2", so that the schema need not be backward compatible with previous versions.
If desired, we can now pin a client to the latest version of a specific compatibility group using the following client-side properties:
Above we've specified that the client should check for a new latest version after every 30 minutes.
Now that our subject supports breaking changes in the version history as partitioned by compatibility groups, we need a way for the consumer to transform messages for the previous compatibility group to the current one. We can achieve this with migration rules using JSONata. Below the
UPGRADE rule allows new consumers to read old messages, while the
DOWNGRADE rule allows old consumers to read new messages. If you plan to upgrade all consumers, you can omit the
DOWNGRADE rule. In each migration rule, we use the JSONata function called
$sift() to remove a field with one name, and then use a JSON property to add a field with another name.
The new rule set, in a file named
order_ruleset2.json, has both domain rules and migration rules and can be registered separately from the schema, as before.
After updating the metadata and rule set, let's try out our migration rule. We'll produce a record using the original schema version with field
"state" and see it transformed by the consumer to conform to the latest schema version with field
First, start the consumer. Note that we specify
use.latest.version=true to tell the consumer that we want the record to conform to the latest version even if it was produced with an older version. As mentioned, we could alternatively specify
use.latest.with.metadata=major_version=2. Either setting will cause the migration rule to be invoked.
In a separate terminal, start the producer. Note that we should pass the ID of the original schema that has a field named
"state" instead of
When the above record is sent to the producer, the following record is received by the consumer, where the JSONata transform has modified the record to have a field named
"status" instead of
The combination of compatibility groups with migration rules for transforming data across those groups is a powerful combination for evolving schemas in arbitrarily complex ways.
To learn more about JSONata, see Understanding JSONata.
The use of data contracts helps to shift-left the responsibility of ensuring data quality, interoperability, and compliance to the upstream component, which is the source of the data. By doing so, a data contract can provide a formal agreement between an upstream component and a downstream component for the structure and semantics of the data in motion. Today Confluent Schema Registry supports the use of data contracts to make streaming more reliable and powerful than ever before.
To learn more, see the documentation on Data Contracts for Schema Registry. If you’d like to get started with Confluent Cloud or Confluent Platform:
Schema Registry is a central repository with a RESTful interface for developers to define standard schemas and register applications to enable compatibility. Schema Registry is available as a software component of Confluent Platform or as a managed component of Confluent Cloud.