Build Predictive Machine Learning with Flink | Workshop on Dec 18 | Register Now
There are plenty of materials available out there about Schema Registry. From Confluent alone, if you head to Confluent Developer and search “Schema Registry” you will discover an ever-growing repository of over 100 results including courses, articles, tutorials, blog posts, and more, providing comprehensive resources for enthusiasts and professionals alike.
If you are new to Schema Registry or don't know the difference between schema, schema type, subject, compatibility type, schema ID, and subject version, I would recommend starting with this free course: Schema Registry 101 by Danica Fine.
This article will show the bits and bytes of what happens behind the scenes in Apache Kafka® producer and consumer clients when communicating with the Schema Registry and serializing/deserializing messages.
We will briefly talk about setting data quality rules on the schema itself. They are crucial for maintaining high data integrity, quickly resolving data quality issues, and simplifying schema evolution. They are a part of the data contract concept in Confluent's Stream Governance solution on Confluent Cloud.
Finally, we will go through a quick demo in Python on how to use Confluent Schema Registry without Apache Kafka 😱. But don’t worry, the purpose is just to clarify that a given producer or consumer client independently communicates with at least two separate and different systems:
Apache Kafka (TCP: Kafka Protocol).
Schema Registry (HTTP: Schema Registry REST API).
The code examples shown in this article are all in Python, but Confluent has developed and maintains client libraries for Kafka in the following languages: Java, C/C++, .NET, and Go.
Why are the examples all in Python? Because that is the language I am most comfortable with and, in my opinion, it is easier to understand for readers not used to Java.
Schema Registry is a component that lives outside and independently of the Kafka cluster. It is used to manage and store the schemas for the data exchanged between Kafka producers and consumers. Just to emphasize this point, Schema Registry does not store nor process messages exchanged between the producers and Kafka, it doesn't even serialize or deserialize messages. Instead, it only handles the metadata (subjects, versions, schemas, schema IDs, among other metadata).
What is a schema then? It is a defined data framework that allows producers and consumers to understand the type and structure of the data being exchanged. It is a blueprint that describes how the data should be formatted and what type of information it should contain. Essentially, it is the data contract that binds together the loosely coupled producers and consumers.
Schema Registry plays a crucial role in ensuring the quality, compatibility, and consistency of data formats within a Kafka ecosystem, especially in scenarios where different applications or services that do not communicate directly with each other may be producing and consuming data.
The diagram below shows the four main Schema Registry clients:
Schema manager: Although a schema can be registered/managed by the producer clients themselves, it is good practice to have that done as part of a CI/CD pipeline, such as by using the Schema Registry Maven plugin. Using this method, the producer and consumer clients would have read-only access to the Schema Registry and hence “abide” by the data contract (schema) as defined, which will help ensure data quality and consistency.
Kafka cluster: Kafka brokers communicate with Schema Registry for validation through a process known as broker-side schema validation (enabled on a per-topic basis on Confluent Cloud and on dedicated clusters). This process allows the broker to verify that data produced to a Kafka topic is using a valid schema ID in Schema Registry that is registered according to the subject naming strategy. No other validation is performed, like schema structure, serialized data, etc., only the schema and subject.
Producer client: A producer client needs to communicate with both Schema Registry and Kafka clusters. When serializing the message, the producer will get the schema ID from the Schema Registry, serialize the message as per schema, and then produce the binary data to the Kafka cluster. The producer can also get a copy of the entire schema and its ID without the need to have that stored locally (more on that later).
Consumer client: A consumer client also communicates with both Schema Registry and Kafka clusters. However, it will first get the binary data from the Kafka cluster (serialized message), extract the schema ID, and based on it, get its corresponding schema from Schema Registry and only then deserialize the message.
We previously mentioned serialization and deserialization, but what are they exactly?
Simply put, serialization is the process of writing the state of an object into a byte stream/array, and deserialization is the same, but the other way around. For example:
If we encode the string “my car
” using the encoder UTF-16 little endian we will get the following byte array (represented in hexadecimal): \x6d\x00\x79\x00\x20\x00\x63\x00\x61\x00\x72\x00
The deserialization would have that converted back to the original string
As you have already noticed, that only works if the serializer and deserializer use the exact same encoding mechanism
By trying to decode that using UTF-16 big endian instead, we get the string 洀礀\u2000挀愀爀
Dude, where’s “my car
”?
Apache Kafka has several native serializers, to name a few of them:
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.BytesSerializer
You could even build your own serializer (don’t forget to build the corresponding deserializer!).
When using Confluent’s library, you have the following additional serializers and deserializers:
io.confluent.kafka.serializers.KafkaAvroSerializer
io.confluent.kafka.serializers.KafkaAvroDeserializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
When using the Confluent serializers, each serialized message will be prefixed with five bytes. Here is how data is mapped to low-level bytes:
There might be other serializer mechanisms that can have a different low-level bytes mapping. That is to say, serializing with one mechanism and deserializing with another might throw an exception or yield “Dude, where’s my car?”.
The reason to have the schema IDs added to each message is that Kafka messages do not want to be written with the entire Avro schema, otherwise, it would be a huge waste of space, and as they say, “space is money,” oh wait, isn't time? Well, you get the picture. Instead, Kafka messages are prefixed with the corresponding schema ID. The producers writing the messages and the consumers reading the messages must use the same Schema Registry to get the same mapping between a schema ID and the actual schema. If using different schema registries they should be in sync using tools such as Schema Linking.
You might be wondering: Hey, but in Confluent’s Schema Registry don’t we also have the schema version? Where does that fit in? Well, the subject version is an important piece of information so one can keep track of the evolution and history of the schema and know the latest version/ID of a given subject. So, when a consumer is deserializing a given message all that is needed is just the schema ID.
Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers. However, we will go through an example using the Avro framework. Schema Registry works in exactly the same way for any schema type.
For example, let’s say that the following message needs to be Avro serialized:
The Avro schema, in that case, was registered as shown below and the Schema Registry has, for example, set the schema ID 100114
to it:
The serialized message will look like this (in binary):
Wow! So if we had string serialized the original message we would have 134 bytes, but the Avro serialized one has only 43 bytes? Was my message compressed? Technically no, it was just compacted. To get it back to what it was, as you realized, we will need to know the schema. Without it, we could kind of figure out the values, but not their corresponding keys/structure.
Oh! By the way, what does a Lancaster Bomber have to do with Avro? I will let you find that out. You will get a bonus point if you answer that one.
Coming back to the serialized message, let’s analyze the first five bytes of it:
The first \x00
(magic byte): 0, unfortunately not as magic as my favorite basketball legends Earvin “Magic” Johnson Jr. and Maria Paula Gonçalves da Silva.
\x00\x01\x87\x12
(schema ID): That specific one equates to 100114
. See below how to get the schema ID using Python:
The remaining bytes (\xa4
onward) are the actual payload but Avro serialized.
A producer is a client or application that publishes messages, also known as records, to one or more Kafka topics. These messages are then consumed by one or more consumers that are subscribed to the topic(s).
A producer, when serializing the message using schemas, will communicate with both the Schema Registry cluster (HTTP) and the Kafka cluster (TCP).
That means each producer will have two client instances, one to each cluster. In Python and using Avro it looks like this (see the complete example at confluent-kafka-python/avro_producer.py, the code extract below only shows the most important parts for the purpose of this exercise).
That will create the client instance, however, no communication with Schema Registry will occur just yet.
The next step is to create the serializer object:
The serializer object is used to output the Avro binary encoded data with Confluent Schema Registry framing. The configuration options are listed in the table below:
Property Name | Type | Description |
| bool | If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).
Defaults to |
| bool | Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
Defaults to |
| bool | Whether to use the latest subject version for serialization.
Warning: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to |
| callable | Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to |
Now our producer client is ready to serialize the messages and have them produced to the Kafka cluster:
Now the serializer object (avro_serializer
) will send a POST request to Schema Registry to try to register the schema in case it doesn’t exist (as auto.register.schemas
was set to True
) and get the corresponding schema ID (see example below). That is not a recommended configuration in production environments, ideally schemas should be managed via a CI/CD pipeline and the producer clients set with read-only access to Schema Registry (role DeveloperRead).
The following was the response:
Having the schema ID, all that is left to do is to serialize the message and prefix the magic byte (0
) and schema ID (100114
).
So the message:
Gets serialized as:
The next time that same producer client instance needs to serialize another message it will not submit a new POST request to the Schema Registry as both the schema and schema ID were cached by the client. A new HTTP request would only happen if the producer client instance is restarted, or if it is serializing a message to another topic and/or subject, or if the schema changes.
One important thing to notice is that the serializer object had the configuration use.latest.version
set as False
. If we had instead set it to True
(auto.register.schemas would need to be set to False) then it would submit a GET instead of a POST to fetch the latest schema available for the subject demo_users-value
. By doing so, rather of deriving a schema for the string schema (schema_str
) passed when creating the serializer object (avro_serializer
), it would use the latest version of the schema in the subject, for example:
Yielding the response:
The schema and schema ID would also be equally cached by the producer client instance.
A consumer is a component responsible for subscribing to topics and processing the stream of messages published to those topics by producers. Consumers continuously poll Kafka brokers for new messages in the subscribed topics, maintaining an offset to keep track of their progress in message processing. Multiple consumer instances can form a consumer group, enabling parallelization of message consumption.
Just like its sibling producer (that it will never get to meet face to face, unfortunately), a consumer, when deserializing the message using schemas, will communicate with both clusters, but with the Kafka cluster first.
Consumers will also have two client instances, one for each cluster. In Python and using Avro it looks like this (see the complete example on GitHub):
This instance has no difference from the producer instance:
The next step is to create the deserializer object:
The last step is to consume the messages from the Kafka broker and have them deserialized accordingly:
When calling the deserializer object (avro_deserializer
) it will submit a GET
request to Schema Registry to get the schema based on the schema ID (bytes 1 to 4 of the serialized message), in our example 100114
:
Obtaining the response:
So, the message gets deserialized as:
Boom! That was the original message generated by the producer.
So, let me get this right: The consumer didn’t need to have the schema saved locally or hard coded in its application? First of all, let’s join the StHCMfAiP (Stop the Hard Code Movement for Applications in Production!) and second, ideally yes, there is no need to have the schema saved locally with the consumer application as the message can be deserialized just based on the schema ID. However, there might be cases where the consumer wants to use a different version of the schema (if compatible) or just wants to rename a given field of the payload, for example:
Notice the new argument on the deserializer object (schema_str
, optional) where the field random is being renamed to random_new
using aliases
. The deserialized message would look like this:
Just to avoid angry mobs, no StHCMfAiP rules were broken here as that was just an example, never to be used in production 🙃. On a serious note, to ensure adaptability and maintain loose coupling in system development, it is imperative that any alternative schema utilized by a consumer be stored in Schema Registry. Relying on locally saved schemas within individual consumers risks introducing hard-coded dependencies and undermines the flexibility that a centralized Schema Registry provides.
Data quality rules on the schema are used to prove the value of a single field based on a boolean predicate. They can trigger actions on failure or success of the rule, thus providing a mechanism to enforce the semantics of the data and further constrain the values of those fields. This helps in catching and fixing issues at the point of ingestion, minimizing the chances of poor quality data working its way downstream.
These rules can be attached to schemas in Confluent Schema Registry, which then acts as a gatekeeper of schemas as well as data quality and controls. This is a significant advancement in Confluent Schema Registry, extending its functionality beyond the grammar-based nature of schema languages such as Avro and Protobuf.
Moreover, data quality rules can be used to create domain validation and Event-Condition-Action (ECA) rules, transformation rules, and complex schema migration rules. These rules help in validating and constraining the values of fields, changing the value of a specific field or an entire message, and evolving a schema in an incompatible manner by applying transformations when consuming from a topic.
This article, Connect, Process, and Share Trusted Data Faster Than Ever: Kora Engine, Data Quality Rules, and More, written by David Araujo and Bharath Venkat, is an excellent resource for delving into the subject in more depth.
As previously mentioned, Schema Registry and Apache Kafka clusters are two different and separate systems. Although there is no mandate to use Apache Kafka with Schema Registry, it is a highly recommended best practice. It is essential for maintaining data compatibility and fostering interoperability within distributed systems. This not only facilitates real-time data integration but also enhances system resilience, as changes can be efficiently tracked, versioned, and validated, ultimately promoting agility and scalability in Kafka-based architectures.
To demonstrate that Schema Registry is indeed a separate system, we have a demo that is available at this GitHub repository. Basically, the script avro_ser.py
generates random messages, has them Avro serialized, and saves them to local files. No message (other than the Schema Registry metadata) will be stored in Apache Kafka.
You can then have them deserialized using the script avro_deser.py
.
It will require Confluent Schema Registry, but in that repository you can run it with Docker Compose (see file docker-compose.yml).
To execute a simple demo just execute the shell script ./demo.sh
. Below is an example of the output you might get:
The serialized messages will be saved into the folder ./data
:
weather-avro-<EPOCH_time>.bin
: Serialized message
weather-avro-<EPOCH_time>.json
: Original message
To deserialize the messages run the script:
For example:
After that head to Confluent Control Center (http://localhost:9021) and see the messages in an internal topic called _schemas
. You will find two messages, one to each schema created (Avro and Protobuf). If you add more schemas or change the existing ones additional messages will be published to that topic.
Integrating Schema Registry with Apache Kafka offers a powerful advantage to your data streaming ecosystem. This centralized schema management system ensures seamless data evolution, compatibility, and governance across distributed components. By enforcing versioning and facilitating efficient error detection, Schema Registry enhances collaboration, enabling teams to work independently on evolving schemas while maintaining backward and forward compatibility. The result is a robust and agile architecture that optimizes storage, network usage, and data integrity, making Schema Registry an indispensable tool for ensuring the reliability and scalability of your Kafka-based applications.
Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers. It is a key component of Stream Governance, available in the Essentials and Advanced Stream Governance packages in Confluent Cloud, and is integrated with the rest of the Confluent ecosystem.
Schema Registry in Confluent Cloud is a fully managed service with a per-environment, hosted Schema Registry. It allows you to compare schema versions, change subject level compatibility mode of a schema, search for schemas and fields, tag schemas and fields, and manage schemas for an environment. It also provides access control (RBAC) for Confluent Cloud Schema Registry.
This article explained how producer/consumer applications work symbiotically with Schema Registry and Apache Kafka clusters and how they cache information in memory locally to help improve overall efficiency. Where each application will have a different client connection to these systems. Also, not necessarily producer/consumer applications need to have schema saved locally with the application, as a matter of fact, it doesn’t even need to know about it. As long as the payload matches the schema the applications don’t need to care about that as the Confluent client libraries will take care of that for you when serializing and deserializing the messages.
We also briefly touched upon data quality rules and how they can significantly enhance the quality of data streams that power your business, making them a valuable tool in any data-driven organization.
Have a go yourself, play with the Confluent Schema Registry, and don’t hesitate to reach out to us should you want to take your data streaming applications, and business, to the next level. You can get started with Schema Registry with the steps below:
Sign up for Confluent Cloud and use the Confluent Cloud Schema Registry Tutorial to get started.
Download Confluent Platform and use the Confluent Platform Schema Registry Tutorial to get started.
This blog announces the general availability of Confluent Platform 7.8 and its latest key features: Confluent Platform for Apache Flink® (GA), mTLS Identity for RBAC Authorization, and more.
We covered so much at Current 2024, from the 138 breakout sessions, lightning talks, and meetups on the expo floor to what happened on the main stage. If you heard any snippets or saw quotes from the Day 2 keynote, then you already know what I told the room: We are all data streaming engineers now.