In the article Should You Put Several Event Types in the Same Kafka Topic?, Martin Kleppmann discusses when to combine several event types in the same topic and introduces new subject name strategies for determining how Confluent Schema Registry should be used when producing events to an Apache Kafka® topic.
As of Confluent Platform 5.5, Schema Registry supports schema references. This blog post presents an alternative way to put several event types in the same topic using schema references, and discusses the advantages and disadvantages of this approach.
Apache Kafka, which is an event streaming platform, can also act as a system of record or a datastore, as seen with ksqlDB. Datastores are composed of constructs and constraints. For example, in a relational database, the constructs are tables and rows, while the constraints include primary key constraints and referential integrity constraints. Kafka does not impose constraints on the structure of data, leaving that role to Confluent Schema Registry. Below are some constructs when using both Kafka and Schema Registry:
The following are some constraints that are maintained when using both Kafka and Schema Registry:
As mentioned, the default subject name strategy, TopicNameStrategy, uses the topic name to determine the subject to be used for schema lookups, which helps to enforce subject-topic constraints. The newer subject-name strategies, RecordNameStrategy and TopicRecordNameStrategy, use the record name (along with the topic name for the latter strategy) to determine the subject to be used for schema lookups. Before these newer subject-name strategies were introduced, there were two options for storing multiple event types in the same topic:
The second option of using an Avro union was preferred but still had the following issues:
By using either RecordNameStrategy or TopicRecordNameStrategy, you retain subject-schema constraints, eliminate the need for an Avro union, and gain the ability to evolve types independently. However, you lose subject-topic constraints, as now there is no constraint on the event types that can be stored in the topic, which means the set of event types in the topic can grow unbounded.
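As a rough illustration of how the three strategies differ, the sketch below derives the subject name each one would use for a record value. The function name `subject_name` and its signature are hypothetical, written only for this example; they are not the Schema Registry client API.

```python
def subject_name(strategy: str, topic: str, record_name: str, is_key: bool = False) -> str:
    """Return the subject that a serializer would use for schema lookups.

    `strategy` is one of the three subject name strategies discussed above.
    `record_name` is the fully qualified name of the record being serialized.
    """
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":
        # Subject depends only on the topic: one subject per topic key/value.
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":
        # Subject depends only on the record type, regardless of topic.
        return record_name
    if strategy == "TopicRecordNameStrategy":
        # Subject depends on both the topic and the record type.
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


for strategy in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    print(strategy, "->", subject_name(strategy, "all-types", "io.confluent.examples.avro.Customer"))
```

Only TopicNameStrategy ties a topic to a single subject, which is why the other two strategies give up the subject-topic constraint.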
Introduced in Confluent Platform 5.5, a schema reference consists of:
When registering a schema to Schema Registry, an optional set of references can be specified, such as this Avro union containing reference names:
[ "io.confluent.examples.avro.Customer", "io.confluent.examples.avro.Product", "io.confluent.examples.avro.Order" ]
When registering this schema to Schema Registry, an array of reference versions is also sent, which might look like the following:
[ { "name": "io.confluent.examples.avro.Customer", "subject": "customer", "version": 1 }, { "name": "io.confluent.examples.avro.Product", "subject": "product", "version": 1 }, { "name": "io.confluent.examples.avro.Order", "subject": "order", "version": 1 } ]
As you can see, the Avro union is no longer unwieldy. It is just a list of event types that will be sent to a topic. The event types can evolve independently, similar to when using RecordNameStrategy and TopicRecordNameStrategy. Plus, you regain subject-topic constraints, which were missing when using the newer subject name strategies.
However, in order to take advantage of these newfound gains, you need to configure your serializers a little differently. This has to do with the fact that when an Avro object is serialized, the schema associated with the object is not the Avro union, but just the event type contained within the union. When the Avro serializer is given the Avro object, it will either try to register the event type as a newer schema version than the union (if auto.register.schemas is true), or try to find the event type in the subject (if auto.register.schemas is false), which will fail. Instead, you want the Avro serializer to use the Avro union for serialization and not the event type. In order to accomplish this, set these two configuration properties on the Avro serializer:
Setting auto.register.schemas to false disables automatic registration of the event type, so that it does not override the union as the latest schema in the subject. Setting use.latest.version to true causes the Avro serializer to look up the latest schema version in the subject (which will be the union) and use that for serialization; otherwise, if set to false, the serializer will look for the event type in the subject and fail to find it.
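The effect of the two properties can be sketched with a toy model of a subject. This is a simplification under stated assumptions, not the serializer's actual code: the subject is modeled as a plain list of schema versions (oldest first), schemas are represented by strings, compatibility checks are ignored, and `choose_writer_schema` and `SchemaNotFound` are hypothetical names.

```python
# Property names come from the text above; these are the recommended values.
serializer_config = {
    "auto.register.schemas": False,
    "use.latest.version": True,
}


class SchemaNotFound(Exception):
    """Raised when the event type is not registered under the subject."""


def choose_writer_schema(subject_versions, event_schema, config):
    """Pick the schema used for serialization in this toy model.

    subject_versions: schemas registered under the subject, oldest first.
    event_schema: the schema attached to the object being serialized.
    """
    auto_register = config.get("auto.register.schemas", True)
    use_latest = config.get("use.latest.version", False)

    if auto_register:
        # The event type is registered as a newer version than the union.
        if event_schema not in subject_versions:
            subject_versions.append(event_schema)
        return event_schema
    if use_latest:
        # The latest version in the subject (the union) is used.
        return subject_versions[-1]
    # Otherwise the event type itself is looked up in the subject.
    if event_schema in subject_versions:
        return event_schema
    raise SchemaNotFound(event_schema)


subject = ["AllTypesUnion"]
print(choose_writer_schema(subject, "Customer", serializer_config))
```

With both properties left at their defaults in this model, the event type would displace the union as the latest version, which is the situation the recommended settings avoid.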
Now that Confluent Platform supports both JSON Schema and Protobuf, both RecordNameStrategy and TopicRecordNameStrategy can be used with these newer schema formats as well. In the case of JSON Schema, the equivalent of the name of the Avro record is the title of the JSON object. In the case of Protobuf, the equivalent is the name of the Protobuf message.
Also like Avro, instead of using the newer subject-name strategies to combine multiple event types in the same topic, you can use unions. The Avro union from the previous section can also be modeled in JSON Schema, where it is expressed with the oneOf keyword:
{ "oneOf": [ { "$ref": "Customer.schema.json" }, { "$ref": "Product.schema.json" }, { "$ref": "Order.schema.json" } ] }
In the above schema, the array of reference versions that would be sent might look like this:
[ { "name": "Customer.schema.json", "subject": "customer", "version": 1 }, { "name": "Product.schema.json", "subject": "product", "version": 1 }, { "name": "Order.schema.json", "subject": "order", "version": 1 } ]
As with Avro, automatic registration of JSON schemas that contain a top-level oneOf won’t work, so you should configure the JSON Schema serializer in the same manner as the Avro serializer, with auto.register.schemas set to false and use.latest.version set to true, as described in the previous section.
In Protobuf, top-level oneofs are not permitted, so you need to wrap the oneof in a message:
syntax = "proto3";

package io.confluent.examples.proto;

import "Customer.proto";
import "Product.proto";
import "Order.proto";

message AllTypes {
  oneof oneof_type {
    Customer customer = 1;
    Product product = 2;
    Order order = 3;
  }
}
Here are the corresponding reference versions that could be sent with the above schema:
[ { "name": "Customer.proto", "subject": "customer", "version": 1 }, { "name": "Product.proto", "subject": "product", "version": 1 }, { "name": "Order.proto", "subject": "order", "version": 1 } ]
One advantage of wrapping the oneof with a message is that automatic registration of the top-level schema will work properly. In the case of Protobuf, all referenced schemas will also be auto registered, recursively.
You can do something similar with Avro by wrapping the union with an Avro record:
{
  "type": "record",
  "namespace": "io.confluent.examples.avro",
  "name": "AllTypes",
  "fields": [
    {
      "name": "oneof_type",
      "type": [
        "io.confluent.examples.avro.Customer",
        "io.confluent.examples.avro.Product",
        "io.confluent.examples.avro.Order"
      ]
    }
  ]
}
This extra level of indirection allows automatic registration of the top-level Avro schema to work properly. However, unlike Protobuf, with Avro, the referenced schemas still need to be registered manually beforehand, as the Avro object does not have the necessary information to allow referenced schemas to be automatically registered.
Wrapping a oneof with a JSON object won’t work with JSON Schema, since a POJO being serialized to JSON doesn’t have the requisite metadata. Instead, optionally annotate the POJO with a @Schema annotation to provide the complete top-level JSON Schema to be used for both automatic registration and serialization. As with Avro, and unlike Protobuf, referenced schemas need to be registered manually beforehand.
Schema references are a means of modularizing a schema and its dependencies. While this article shows how to use them with unions, they can be used more generally to model the following:
As mentioned in the previous section, if you’re using Protobuf, the Protobuf serializer can automatically register the top-level schema and all referenced schemas, recursively, when given a Protobuf object. This is not possible with the Avro and JSON Schema serializers. With those schema formats, you must first manually register the referenced schemas and then the top-level schema. Manual registration can be accomplished with the REST APIs or with the Schema Registry Maven Plugin.
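For the REST route, a registration request with references might be assembled as in the sketch below. It only builds the request and does not send it. The registry address is assumed to be http://127.0.0.1:8081, the subject names and version numbers are illustrative, and `build_register_request` is a hypothetical helper written for this example.

```python
import json
import urllib.request

# Assumed local Schema Registry address; adjust for your environment.
SCHEMA_REGISTRY_URL = "http://127.0.0.1:8081"


def build_register_request(subject, schema, references=None, schema_type="AVRO"):
    """Build (but do not send) a POST to /subjects/{subject}/versions."""
    body = {
        # The schema itself is sent as a JSON-encoded string.
        "schema": json.dumps(schema),
        "schemaType": schema_type,
    }
    if references:
        body["references"] = references
    return urllib.request.Request(
        url=f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Top-level union that refers to two previously registered schemas.
union_schema = [
    "io.confluent.examples.avro.Customer",
    "io.confluent.examples.avro.Product",
]
union_references = [
    {"name": "io.confluent.examples.avro.Customer", "subject": "customer", "version": 1},
    {"name": "io.confluent.examples.avro.Product", "subject": "product", "version": 1},
]
request = build_register_request("all-types-value", union_schema, union_references)
print(request.get_method(), request.full_url)
# To actually register, the referenced subjects must exist first:
# urllib.request.urlopen(request)
```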
As an example of using the Schema Registry Maven Plugin, below are schemas specified for the subjects named all-types-value, customer, and product in a Maven POM.
<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>${confluent.version}</version>
  <configuration>
    <schemaRegistryUrls>
      <param>http://127.0.0.1:8081</param>
    </schemaRegistryUrls>
    <subjects>
      <all-types-value>src/main/avro/AllTypes.avsc</all-types-value>
      <customer>src/main/avro/Customer.avsc</customer>
      <product>src/main/avro/Product.avsc</product>
    </subjects>
    <schemaTypes>
      <all-types-value>AVRO</all-types-value>
      <customer>AVRO</customer>
      <product>AVRO</product>
    </schemaTypes>
    <references>
      <all-types-value>
        <reference>
          <name>io.confluent.examples.avro.Customer</name>
          <subject>customer</subject>
        </reference>
        <reference>
          <name>io.confluent.examples.avro.Product</name>
          <subject>product</subject>
        </reference>
      </all-types-value>
    </references>
  </configuration>
  <goals>
    <goal>register</goal>
  </goals>
</plugin>
Each reference can specify a name, subject, and version. If the version is omitted, as with the example above, and the referenced schema is also being registered at the same time, the referenced schema’s version will be used; otherwise, the latest version of the schema in the subject will be used.
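The version resolution rule just described can be written out as a small function. This is only a restatement of the rule in code, not the plugin's implementation; `resolve_reference_version` and its two dictionary arguments are hypothetical.

```python
def resolve_reference_version(reference, registered_now, latest_in_registry):
    """Resolve the version a reference points to.

    reference: dict with "name", "subject", and optionally "version".
    registered_now: subject -> version for schemas registered in the same run.
    latest_in_registry: subject -> latest version already in Schema Registry.
    """
    if "version" in reference:
        # An explicit version always wins.
        return reference["version"]
    subject = reference["subject"]
    if subject in registered_now:
        # The referenced schema is being registered at the same time.
        return registered_now[subject]
    # Fall back to the latest version in the subject.
    return latest_in_registry[subject]


ref = {"name": "io.confluent.examples.avro.Customer", "subject": "customer"}
print(resolve_reference_version(ref, {"customer": 3}, {"customer": 2}))
```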
Here is the content of AllTypes.avsc, which is a simple union:
[ "io.confluent.examples.avro.Customer", "io.confluent.examples.avro.Product" ]
Here is Customer.avsc, which contains a Customer record:
{
  "type": "record",
  "namespace": "io.confluent.examples.avro",
  "name": "Customer",
  "fields": [
    { "name": "customer_id", "type": "int" },
    { "name": "customer_name", "type": "string" },
    { "name": "customer_email", "type": "string" },
    { "name": "customer_address", "type": "string" }
  ]
}
And here is Product.avsc, which contains a Product record:
{
  "type": "record",
  "namespace": "io.confluent.examples.avro",
  "name": "Product",
  "fields": [
    { "name": "product_id", "type": "int" },
    { "name": "product_name", "type": "string" },
    { "name": "product_price", "type": "double" }
  ]
}
Next, register the schemas above using the following command:
mvn schema-registry:register
The above command will register referenced schemas before registering the schemas that depend on them. The output of the command will contain the ID of each schema that is registered. You can use the schema ID of the top-level schema with the console producer when producing data.
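Registering referenced schemas before their dependents amounts to a topological ordering of the reference graph. The sketch below shows that idea with Python's standard `graphlib` module (Python 3.9 or later). It illustrates the ordering only; how the Maven plugin computes it internally is not shown in this post, and `registration_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each subject maps to the set of subjects it references.
reference_graph = {
    "all-types-value": {"customer", "product"},
    "customer": set(),
    "product": set(),
}


def registration_order(graph):
    """Return subjects ordered so that references come before dependents."""
    return list(TopologicalSorter(graph).static_order())


order = registration_order(reference_graph)
print(order)  # "all-types-value" comes after "customer" and "product"
```

The same ordering applies when registering manually through the REST API: the customer and product subjects need to exist before the union that references them.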
Next, use the console tools to try it out. First, start the Avro console consumer. Note that you should specify the topic name as all-types since the corresponding subject is all-types-value according to TopicNameStrategy.
./bin/kafka-avro-console-consumer --topic all-types --bootstrap-server localhost:9092
In a separate console, start the Avro console producer. Pass the ID of the top-level schema as the value of value.schema.id.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic all-types --property value.schema.id={id} --property auto.register=false --property use.latest.version=true
At the same command line as the producer, input the data below, which represent two different event types. The data should be wrapped with a JSON object that specifies the event type. This is how the Avro console producer expects data for unions to be represented in JSON.
{ "io.confluent.examples.avro.Product": { "product_id": 1, "product_name" : "rice", "product_price" : 100.00 } }
{ "io.confluent.examples.avro.Customer": { "customer_id": 100, "customer_name": "acme", "customer_email": "acme@google.com", "customer_address": "1 Main St" } }
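If you generate such input programmatically, the wrapping can be done with a small helper like the one sketched here. `wrap_for_union` is a hypothetical name; it simply nests the record under its fully qualified type name and emits one JSON document per line.

```python
import json


def wrap_for_union(record_full_name, record):
    """Wrap a record in a JSON object keyed by its fully qualified type name.

    Returns a single-line JSON string suitable as one line of producer input.
    """
    return json.dumps({record_full_name: record})


lines = [
    wrap_for_union(
        "io.confluent.examples.avro.Product",
        {"product_id": 1, "product_name": "rice", "product_price": 100.00},
    ),
    wrap_for_union(
        "io.confluent.examples.avro.Customer",
        {
            "customer_id": 100,
            "customer_name": "acme",
            "customer_email": "acme@google.com",
            "customer_address": "1 Main St",
        },
    ),
]
for line in lines:
    print(line)
```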
The data will appear at the consumer. Congratulations, you’ve successfully sent two different event types to a topic! And unlike the newer subject name strategies, the union will prevent event types other than Product and Customer from being produced to the same topic, since the producer is configured with the default TopicNameStrategy.
Now there are two modular ways to store several event types in the same topic, both of which allow event types to evolve independently. The first, using the newer subject-name strategies, is straightforward but drops subject-topic constraints. The second, using unions (or oneofs) and schema references, maintains subject-topic constraints but adds further structure and drops automatic registration of schemas in the case of a top-level union or oneof.
If you’re interested in querying topics that combine multiple event types with ksqlDB, the second method, using a union (or oneof), is the only option. By maintaining subject-topic constraints, it allows ksqlDB to deal with a bounded set of event types as defined by the union, instead of a potentially unbounded set. Modeling a union (also known as a sum type) as a relational table is a solved problem, and equivalent functionality will most likely land in ksqlDB in the future.