
Schema Registry Clients in Action


There is plenty of material available about Schema Registry. From Confluent alone, if you head to Confluent Developer and search for “Schema Registry”, you will discover an ever-growing repository of over 100 results, including courses, articles, tutorials, and blog posts, providing comprehensive resources for enthusiasts and professionals alike.

If you are new to Schema Registry or don't know the difference between schema, schema type, subject, compatibility type, schema ID, and subject version, I would recommend starting with this free course: Schema Registry 101 by Danica Fine.

This article will show the bits and bytes of what happens behind the scenes in Apache Kafka® producer and consumer clients when communicating with the Schema Registry and serializing/deserializing messages.

We will briefly talk about setting data quality rules on the schema itself. They are crucial for maintaining high data integrity, quickly resolving data quality issues, and simplifying schema evolution. They are part of the data contract concept in Confluent's Stream Governance solution on Confluent Cloud.

Finally, we will go through a quick demo in Python showing how to use Confluent Schema Registry without Apache Kafka 😱. But don’t worry, the purpose is just to make clear that a producer or consumer client communicates independently with at least two separate systems:

  1. Apache Kafka (TCP: Kafka Protocol).

  2. Schema Registry (HTTP: Schema Registry REST API).

The code examples shown in this article are all in Python, but Confluent also develops and maintains Kafka client libraries in other languages, including Java, C/C++, .NET, and Go.

Why are the examples all in Python? Because that is the language I am most comfortable with and, in my opinion, it is easier to understand for readers not used to Java.

Schema Registry in a nutshell

Schema Registry is a component that lives outside of, and independently from, the Kafka cluster. It is used to manage and store the schemas for the data exchanged between Kafka producers and consumers. Just to emphasize this point: Schema Registry neither stores nor processes the messages exchanged between producers, Kafka, and consumers; it does not even serialize or deserialize messages. Instead, it only handles metadata (subjects, versions, schemas, and schema IDs, among others).

What is a schema then? It is a defined data framework that allows producers and consumers to understand the type and structure of the data being exchanged. It is a blueprint that describes how the data should be formatted and what type of information it should contain. Essentially, it is the data contract that binds together the loosely coupled producers and consumers.

Schema Registry plays a crucial role in ensuring the quality, compatibility, and consistency of data formats within a Kafka ecosystem, especially in scenarios where different applications or services that do not communicate directly with each other may be producing and consuming data.

Schema Registry clients

The diagram below shows the four main Schema Registry clients:

  • Schema manager: Although a schema can be registered/managed by the producer clients themselves, it is good practice to have that done as part of a CI/CD pipeline, such as by using the Schema Registry Maven plugin. Using this method, the producer and consumer clients would have read-only access to the Schema Registry and hence “abide” by the data contract (schema) as defined, which will help ensure data quality and consistency.

  • Kafka cluster: Kafka brokers communicate with Schema Registry for validation through a process known as broker-side schema validation (enabled on a per-topic basis on Confluent Cloud and on dedicated clusters). This process allows the broker to verify that data produced to a Kafka topic uses a valid schema ID, registered in Schema Registry under the subject given by the subject naming strategy. No other validation is performed (for example, on the schema structure or the serialized data); only the schema ID and subject are checked.

  • Producer client: A producer client needs to communicate with both the Schema Registry and Kafka clusters. When serializing a message, the producer gets the schema ID from Schema Registry, serializes the message according to the schema, and then produces the binary data to the Kafka cluster. The producer can also get a copy of the entire schema and its ID without needing to store the schema locally (more on that later).

  • Consumer client: A consumer client also communicates with both the Schema Registry and Kafka clusters. However, it first fetches the binary data (the serialized message) from the Kafka cluster and extracts the schema ID. It then uses that ID to retrieve the corresponding schema from Schema Registry, and only then deserializes the message.
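To make the broker-side check more concrete, here is a conceptual sketch in plain Python. It is not how Confluent brokers implement validation. The REGISTERED_IDS dictionary is a hypothetical stand-in for Schema Registry, and the sketch assumes the default topic-based subject naming (<topic>-value). It also relies on the Confluent framing (a zero magic byte followed by a 4-byte schema ID) described under Serializers.

```python
# Hypothetical snapshot of Schema Registry metadata: subject -> registered schema IDs
REGISTERED_IDS = {"demo_users-value": {100114}}


def passes_schema_id_validation(topic: str, value: bytes) -> bool:
    """Conceptual check: does the record value carry a schema ID that is
    registered under the subject derived from the topic ("<topic>-value")?"""
    if len(value) < 5 or value[0] != 0:
        return False  # missing or unknown Confluent framing
    schema_id = int.from_bytes(value[1:5], "big")
    subject = f"{topic}-value"
    # Only the schema ID / subject pair is checked; the payload is never decoded
    return schema_id in REGISTERED_IDS.get(subject, set())


print(passes_schema_id_validation("demo_users", b"\x00\x00\x01\x87\x12payload"))  # True
print(passes_schema_id_validation("demo_users", b"\x00\x00\x00\x00\x01payload"))  # False
```

Note that the payload bytes after the header could be anything. That matches the point above that the broker does not inspect the serialized data itself.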

Serializers

We previously mentioned serialization and deserialization, but what are they exactly?

Simply put, serialization is the process of writing the state of an object into a byte stream or array, and deserialization is the reverse process. For example:

  • If we encode the string “my car” using UTF-16 little-endian encoding, we get the following byte array (represented in hexadecimal): \x6d\x00\x79\x00\x20\x00\x63\x00\x61\x00\x72\x00

  • Deserialization converts it back to the original string

  • As you may have noticed, that only works if the serializer and deserializer use exactly the same encoding mechanism

  • If we try to decode it using UTF-16 big endian instead, we get the string 洀礀\u2000挀愀爀

  • Dude, where’s “my car”?
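The experiment above can be reproduced with Python's built-in codecs. utf-16-le and utf-16-be are standard codec names, and neither one writes a byte order mark:

```python
original = "my car"

# Serialize with UTF-16 little endian
serialized = original.encode("utf-16-le")
print(serialized.hex(" "))  # 6d 00 79 00 20 00 63 00 61 00 72 00

# Deserializing with the same encoding gives the original string back
roundtrip = serialized.decode("utf-16-le")
print(roundtrip)  # my car

# Deserializing with a different encoding (big endian) silently yields garbage
garbled = serialized.decode("utf-16-be")
print(repr(garbled))  # '洀礀\u2000挀愀爀'
```

No exception is raised in the last step: every pair of bytes happens to be a valid UTF-16 code unit, so the mismatch only shows up as the wrong text.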

Apache Kafka ships with several native serializers; to name a few:

  • org.apache.kafka.common.serialization.StringSerializer

  • org.apache.kafka.common.serialization.IntegerSerializer

  • org.apache.kafka.common.serialization.DoubleSerializer

  • org.apache.kafka.common.serialization.BytesSerializer

You could even build your own serializer (don’t forget to build the corresponding deserializer!).
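As an illustration, here is a minimal, hypothetical serializer/deserializer pair in plain Python for a made-up (sensor_id, reading) record. The fixed binary layout is an assumption of this sketch. The callables loosely mirror the (obj, ctx) call style of the confluent-kafka-python serializers, but they do not import the library:

```python
import struct

# Hypothetical wire format for this sketch: a 4-byte big-endian unsigned int
# sensor ID followed by an 8-byte big-endian IEEE 754 double reading
_FORMAT = ">Id"


class SensorReadingSerializer:
    """Turns a (sensor_id, reading) tuple into bytes."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        sensor_id, reading = obj
        return struct.pack(_FORMAT, sensor_id, reading)


class SensorReadingDeserializer:
    """Turns bytes produced by SensorReadingSerializer back into a tuple."""

    def __call__(self, value, ctx=None):
        if value is None:
            return None
        return struct.unpack(_FORMAT, value)


serialize = SensorReadingSerializer()
deserialize = SensorReadingDeserializer()
payload = serialize((42, 21.5))
print(len(payload), deserialize(payload))  # 12 (42, 21.5)
```

Both classes share the same _FORMAT string. A deserializer built around any other layout would run into the same problem as the UTF-16 example.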

When using Confluent’s library, you have the following additional serializers and deserializers:

  • io.confluent.kafka.serializers.KafkaAvroSerializer

  • io.confluent.kafka.serializers.KafkaAvroDeserializer

  • io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

  • io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer

  • io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

  • io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer

When using the Confluent serializers, each serialized message will be prefixed with five bytes. Here is how data is mapped to low-level bytes:

  • Byte 0 (Magic Byte): Confluent serialization format version number. Currently always equal to 0 (zero).

  • Bytes 1 to 4 (Schema ID): the 4-byte schema ID (big-endian integer), as assigned by Schema Registry.

  • Bytes 5 onward (Data): the serialized message in the specified schema format. This is binary encoding for Avro or Protocol Buffers, or JSON text for JSON Schema. For Protobuf, a short list of message indexes precedes the encoded payload.
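The layout above can be sketched with Python's standard struct module. This is a minimal illustration, not the Confluent serializer code. frame and unframe are hypothetical helper names, and everything after byte 4 is treated as an opaque payload:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
# ">BI": big-endian, 1-byte magic byte followed by a 4-byte schema ID (5 bytes total)
HEADER = struct.Struct(">BI")


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-serialized payload with the 5-byte header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message is too short to contain the 5-byte header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = frame(100114, b"<avro payload>")
print(framed[:5])       # b'\x00\x00\x01\x87\x12'
print(unframe(framed))  # (100114, b'<avro payload>')
```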

Other serialization mechanisms may use a different low-level byte layout. Serializing with one mechanism and deserializing with another might therefore throw an exception or yield “Dude, where’s my car?”.

The reason for adding the schema ID to each message is that writing the entire Avro schema into every Kafka message would be a huge waste of space. As they say, “space is money,” oh wait, isn't it time? Well, you get the picture. Instead, Kafka messages are prefixed with the corresponding schema ID. The producers writing the messages and the consumers reading them must use the same Schema Registry to get the same mapping between a schema ID and the actual schema. If they use different Schema Registry instances, those must be kept in sync using tools such as Schema Linking.

You might be wondering: Hey, doesn't Confluent’s Schema Registry also have a schema version? Where does that fit in? Well, the subject version is an important piece of information. It lets you track the evolution and history of the schemas registered under a subject, and know the latest version (and ID) of a given subject. It is not needed on the wire, though: to deserialize a given message, all a consumer needs is the schema ID.

Serializing a message using Avro

Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers. We will go through an example using Avro, but Schema Registry works in exactly the same way for any schema type.

For example, let’s say that the following message needs to be Avro serialized:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random": 3122
}

The Avro schema in this case was registered as shown below, and Schema Registry assigned it, for example, the schema ID 100114:

{
  "namespace": "pydatagen",
  "name": "demo",
  "type": "record",
  "fields": [
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "first_name",
      "type": "string"
    },
    {
      "name": "last_name",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    },
    {
      "name": "random",
      "type": "int"
    }
  ]
}

The serialized message will look like this (in binary):

\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40

Wow! So if we had serialized the original message as a JSON string, it would take 134 bytes, but the Avro-serialized one takes only 43 bytes? Was my message compressed? Technically no, it was just compacted: Avro's binary encoding does not write the field names, and it stores integers as variable-length values. To get the message back to what it was, as you may have guessed, we need to know the schema. Without it, we could more or less figure out the values, but not their corresponding keys or structure.

Oh! By the way, what does a Lancaster Bomber have to do with Avro? I will let you find that out. You will get a bonus point if you answer that one.

Coming back to the serialized message, let’s analyze the first five bytes of it:

  • The first \x00 (magic byte): 0, unfortunately not as magic as my favorite basketball legends Earvin “Magic” Johnson Jr. and Maria Paula Gonçalves da Silva.

  • \x00\x01\x87\x12 (schema ID): That specific one equates to 100114. See below how to get the schema ID using Python:

    >>> int.from_bytes(b"\x00\x01\x87\x12", "big")
    100114
    

The remaining bytes (\xa4 onward) are the actual payload, serialized with Avro.
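To see why no field names are needed, the payload can be reproduced by hand. The sketch below is a hypothetical encoder for this one schema. It follows the Avro binary encoding rules that matter here:

  • int and long values are written as zig-zag variable-length integers.

  • Strings are written as their length followed by their UTF-8 bytes.

  • Record fields are written in schema order.

  • timestamp-millis is stored as a plain long.

In practice, you would let a library such as fastavro (which the Python AvroSerializer relies on) do this.

```python
import json


def zigzag_varint(n: int) -> bytes:
    """Avro int/long: zig-zag map to an unsigned value, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag for 64-bit signed values
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length (as a long) followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_demo_record(msg: dict) -> bytes:
    """Write the field values in schema order; no field names are written."""
    return (
        zigzag_varint(msg["timestamp"])
        + encode_string(msg["user_id"])
        + encode_string(msg["first_name"])
        + encode_string(msg["last_name"])
        + encode_string(msg["gender"])
        + zigzag_varint(msg["random"])
    )


message = {
    "timestamp": 1705145844562,
    "user_id": "User_50",
    "first_name": "Carmelita",
    "last_name": "Wood",
    "gender": "Female",
    "random": 3122,
}
payload = encode_demo_record(message)
print(payload)  # b'\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40'
print(len(json.dumps(message)), len(payload) + 5)  # 134 43
```

The output matches the bytes after the 5-byte header shown earlier. Comparing the JSON length (134 bytes) with the framed Avro length (43 bytes) reproduces the numbers above. The savings come from dropping the field names and using variable-length integers, not from compression.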

Producer client in action

A producer is a client or application that publishes messages, also known as records, to one or more Kafka topics. These messages are then consumed by one or more consumers that are subscribed to the topic(s).

A producer, when serializing the message using schemas, will communicate with both the Schema Registry cluster (HTTP) and the Kafka cluster (TCP).

That means each producer has two client instances, one for each cluster. In Python, using Avro, it looks like this (see the complete example in confluent-kafka-python/avro_producer.py; the extracts below only show the parts most relevant to this exercise).

Kafka client instance

from confluent_kafka import Producer
...
producer_conf = {"bootstrap.servers": "localhost:9092"}
producer = Producer(producer_conf)
Note:
There are several producer configuration options; for the full list, please refer to Kafka Producer Configurations and confluent-kafka-python's producer.

Schema Registry client instance

from confluent_kafka.schema_registry import SchemaRegistryClient
...
schema_registry_conf = {
   "url": "http://localhost:8081"
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
Note:
For the list of Schema Registry configuration options please refer to confluent-kafka-python's SchemaRegistryClient.

This creates the client instance; however, no communication with Schema Registry occurs just yet.

Serializer object

The next step is to create the serializer object:

from confluent_kafka.schema_registry.avro import AvroSerializer
...
schema_str = "..."  # as per example above, corresponding to schema ID 100114
avro_serializer = AvroSerializer(
   schema_registry_client,
   schema_str,
   conf={
      "auto.register.schemas": True,
      "normalize.schemas": False,
      "use.latest.version": False
   }
)

The serializer object outputs Avro binary-encoded data with Confluent Schema Registry framing. Its configuration options are listed below:

  • auto.register.schemas (bool): If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.

  • normalize.schemas (bool): Whether to normalize schemas, which transforms schemas to have a consistent format, including ordering properties and references. Defaults to False.

  • use.latest.version (bool): Whether to use the latest subject version for serialization. Warning: there is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • subject.name.strategy (callable): Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.
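The standard strategies differ only in how they combine three inputs: the topic, the message field (key or value), and the fully qualified record name. The sketch below illustrates the three naming schemes (topic name, topic plus record name, record name only) in plain Python. It is not the library's implementation: the function names are hypothetical, and Context is a minimal stand-in for SerializationContext.

```python
from dataclasses import dataclass


@dataclass
class Context:
    """Hypothetical stand-in for SerializationContext: topic and message field."""
    topic: str
    field: str  # "key" or "value"


def topic_name(ctx: Context, record_name: str) -> str:
    return f"{ctx.topic}-{ctx.field}"


def topic_record_name(ctx: Context, record_name: str) -> str:
    return f"{ctx.topic}-{record_name}"


def record_name_only(ctx: Context, record_name: str) -> str:
    return record_name


ctx = Context(topic="demo_users", field="value")
fqn = "pydatagen.demo"  # namespace + name from the schema above
for strategy in (topic_name, topic_record_name, record_name_only):
    print(f"{strategy.__name__}: {strategy(ctx, fqn)}")
# topic_name: demo_users-value
# topic_record_name: demo_users-pydatagen.demo
# record_name_only: pydatagen.demo
```

The first scheme produces demo_users-value, the subject used throughout this example.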

Now our producer client is ready to serialize the messages and have them produced to the Kafka cluster:

from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
...
topic_name = "demo_users"
message = {...}  # a dict (not a JSON string), as per example above; 134 bytes as JSON
string_serializer = StringSerializer("utf_8")  # the argument is the codec, not the key
producer.produce(
   topic=topic_name,
   key=string_serializer("your_key_here"),  # or simply None if you don’t need a key
   value=avro_serializer(
      message,
      SerializationContext(
         topic_name,
         MessageField.VALUE,
      )
   )
)
producer.flush()  # block until all outstanding messages are delivered

When the serializer object (avro_serializer) is called, it sends a POST request to Schema Registry. This registers the schema in case it does not exist yet (as auto.register.schemas was set to True) and returns the corresponding schema ID (see the example below). This is not a recommended configuration for production environments. Ideally, schemas should be managed via a CI/CD pipeline, and producer clients should have read-only access to Schema Registry (DeveloperRead role).

POST http://localhost:8081/subjects/demo_users-value/versions?normalize=False
Body {"schema": "{\"namespace\": \"pydatagen\", \"name\": \"demo\", \"type\": \"record\", \"fields\": [{\"name\": \"timestamp\", \"type\":{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": {\"type\": \"string\"}}, {\"name\": \"first_name\", \"type\": {\"type\": \"string\"}}, {\"name\": \"last_name\", \"type\": {\"type\": \"string\"}}, {\"name\": \"gender\", \"type\": {\"type\": \"string\"}}, {\"name\": \"random\", \"type\": {\"type\": \"int\"}}]}"}

The following was the response:

200 OK
Content {"id": 100114}
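Under the hood, this is a regular HTTP call to the Schema Registry REST API. The sketch below uses only the standard library to build, but not send, an equivalent request. The helper name is hypothetical, and the schema is a shortened example rather than the full one above. The real client also handles authentication, retries, and caching.

```python
import json
import urllib.request

SCHEMA_REGISTRY_URL = "http://localhost:8081"  # same URL as the client config above


def build_register_request(subject: str, schema_str: str) -> urllib.request.Request:
    """Build (but do not send) the HTTP request that registers a schema."""
    body = json.dumps({"schema": schema_str}).encode("utf-8")
    return urllib.request.Request(
        url=f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Shortened, hypothetical schema for brevity
schema_str = json.dumps({"type": "record", "name": "demo", "namespace": "pydatagen",
                         "fields": [{"name": "user_id", "type": "string"}]})
req = build_register_request("demo_users-value", schema_str)
print(req.get_method(), req.full_url)
# POST http://localhost:8081/subjects/demo_users-value/versions

# Against a running Schema Registry, sending it would return the schema ID:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```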

Having the schema ID, all that is left to do is to serialize the message and prefix it with the magic byte (0) and the schema ID (100114).

So the message:

{
   "timestamp": 1705145844562,
   "user_id": "User_50",
   "first_name": "Carmelita",
   "last_name": "Wood",
   "gender": "Female",
   "random": 3122
}

Gets serialized as:

\x00\x00\x01\x87\x12\xa4\x8d\xe6\xa9\xa0c\x0eUser_50\x12Carmelita\x08Wood\x0cFemale\xe40

The next time the same producer client instance needs to serialize another message, it will not send a new POST request to Schema Registry, because both the schema and its ID were cached by the client. A new HTTP request only happens in three cases: the producer client instance is restarted, it serializes a message for another topic and/or subject, or the schema changes.
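This caching behaviour can be pictured as a dictionary keyed by subject and schema. The sketch below is a simplified illustration, not the confluent-kafka-python implementation. CachingSchemaIdLookup and fake_register are hypothetical names, and the registry call is faked so that the number of "HTTP requests" can be counted.

```python
class CachingSchemaIdLookup:
    """Simplified client-side cache: one round trip per (subject, schema) pair,
    after which the schema ID is served from memory."""

    def __init__(self, register_fn):
        # register_fn(subject, schema_str) -> schema ID; in a real client this
        # would be the POST /subjects/{subject}/versions call
        self._register_fn = register_fn
        self._cache = {}

    def get_id(self, subject: str, schema_str: str) -> int:
        key = (subject, schema_str)
        if key not in self._cache:
            self._cache[key] = self._register_fn(subject, schema_str)
        return self._cache[key]


http_calls = []


def fake_register(subject, schema_str):
    """Hypothetical stand-in for Schema Registry that records each 'request'."""
    http_calls.append((subject, schema_str))
    return 100114


lookup = CachingSchemaIdLookup(fake_register)
for _ in range(3):
    lookup.get_id("demo_users-value", "<schema_str>")
print(len(http_calls))  # 1
lookup.get_id("other_topic-value", "<schema_str>")  # new subject: new request
print(len(http_calls))  # 2
```

The cache lives inside the client instance, so restarting the producer empties it. That is why a restart triggers a new request.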

One important thing to notice is that the serializer object had use.latest.version set to False. Had we set it to True instead (with auto.register.schemas set to False), it would send a GET instead of a POST to fetch the latest schema registered under the subject demo_users-value. The serializer would then ignore the schema string (schema_str) passed when creating the serializer object (avro_serializer) and use the latest version of the schema in the subject, for example:

GET http://localhost:8081/subjects/demo_users-value/versions/latest

Yielding the response:

200 OK
Content {
   "subject": "demo_users-value",
   "version": 1,
   "id": 100114,
   "schema": "{\"type\": \"record\", \"name\": \"demo\", \"namespace\": \"pydatagen\", \"fields\" :[{\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": \"string\"}, {\"name\": \"first_name\", \"type\": \"string\"}, {\"name\": \"last_name\", \"type\": \"string\"}, {\"name\": \"gender\", \"type\": \"string\"}, {\"name\": \"random\", \"type\":\"int\"}]}"
}

The schema and schema ID would also be cached by the producer client instance.
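One detail of the GET response above is easy to miss: the "schema" field is itself a JSON document encoded as a string, so a client has to decode it twice. A minimal standard-library sketch, using that response:

```python
import json

# Response body of GET /subjects/demo_users-value/versions/latest, as shown above
response_body = r"""{
   "subject": "demo_users-value",
   "version": 1,
   "id": 100114,
   "schema": "{\"type\": \"record\", \"name\": \"demo\", \"namespace\": \"pydatagen\", \"fields\" :[{\"name\": \"timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"user_id\", \"type\": \"string\"}, {\"name\": \"first_name\", \"type\": \"string\"}, {\"name\": \"last_name\", \"type\": \"string\"}, {\"name\": \"gender\", \"type\": \"string\"}, {\"name\": \"random\", \"type\":\"int\"}]}"
}"""

response = json.loads(response_body)
# The "schema" value is a string containing JSON, so it needs a second json.loads
schema = json.loads(response["schema"])

print(response["id"], response["version"])  # 100114 1
print([field["name"] for field in schema["fields"]])
# ['timestamp', 'user_id', 'first_name', 'last_name', 'gender', 'random']
```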