As your Apache Kafka® deployment starts to grow, the benefits of using a schema registry quickly become compelling. Confluent Schema Registry, which is included in the Confluent Platform, enables you to achieve strong decoupling of the systems you integrate via Kafka, in turn allowing your teams to be more agile and create applications that are more robust to change.
Kafka producers and consumers are already decoupled in the sense that they do not communicate with one another directly; instead, information is transferred via Kafka topics. But they are coupled in another sense: consumers need to know what the data they read represents in order to make sense of it, and that is something controlled by the producer! In the same way that Kafka acts as an intermediary for your data, Schema Registry can act as an intermediary for the types of data that you publish to Kafka.
In any non-trivial system, requirements change over time and the schemas needed to satisfy them change accordingly; that is, schemas evolve. Evolution makes this coupling even stronger: not only do consumers need to understand the type of data they are reading today, they also depend on any changes that might be made to that type in the future. You can reduce this coupling by defining, upfront, constraints on how schemas are allowed to evolve, and by managing those constraints with Schema Registry as well. Producers agree to adhere to the published evolution rules, and consumers are written to handle any schema that is consistent with them. The downside, of course, is that this limits flexibility and adds some overhead in the form of extra formality. However, the payoff in reduced coupling is usually much greater.
Confluent Schema Registry is built around the Apache Avro™ serialization format. One of the reasons Avro was selected as the recommended format for use with Confluent Platform is that it has flexible, well-defined rules around schema evolution. Jay Kreps expands on this and other appealing aspects of Avro in his article Why Avro for Kafka Data?
Side Note: Avro is often compared with Protobuf and Thrift, two other commonly used serialization formats. A key way in which Avro differs from these formats is that it requires that the schema used to serialize data be available when parsing it. Protobuf and Thrift do not: the serialized data includes enough meta-information to allow it to be parsed without a schema. But this meta-information is only a partial description of the data. For example, it doesn’t include field names or doc strings, so it doesn’t replace the role of a schema registry.
In this blog post, we’ll walk through a practical example of how to use Avro and Schema Registry from C#/.NET. We’ll assume that you have a Kafka cluster and Schema Registry up and running already (for more information, refer to the Confluent Platform Quick Start). We’ll also assume that you’re set up to build and run C# applications, either targeting the .NET Framework or .NET Core on your platform of choice.
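The .NET Avro serializer and deserializer allow you to work with Avro data in one of two ways: using the GenericRecord class, which lets you work dynamically with data of any schema, or using specific classes generated from your schemas with the avrogen tool.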
We’ll demonstrate how to use both, starting with GenericRecord, then the specific compiler-generated classes. In general, you’ll find working with the specific classes much simpler, and you should prefer this where possible. Use GenericRecord in scenarios where you need to work dynamically with data of any type.
First, let’s define a schema for a simple log message type. We’ll specify it across two files: one for the LogMessage type itself, and the other for the LogLevel enumeration that it depends on:
LogLevel.avsc:
{
    "namespace": "MessageTypes",
    "type": "enum",
    "doc": "Enumerates the set of allowable log levels.",
    "name": "LogLevel",
    "symbols": ["None", "Verbose", "Info", "Warning", "Error"]
}
LogMessage.V1.avsc:
{
    "namespace": "MessageTypes",
    "type": "record",
    "doc": "A simple log message type as used by this blog post.",
    "name": "LogMessage",
    "fields": [
        { "name": "IP", "type": "string" },
        { "name": "Message", "type": "string" },
        { "name": "Severity", "type": MessageTypes.LogLevel }
    ]
}
The Avro format allows you to specify documentation along with your schemas using the doc attribute. You should make good use of this feature. Schemas should be self-contained—someone using your schema should not need to consult any other source of information. Pro tip: you can auto-generate user-readable documentation for your schemas using the Avrodoc tool.
Below is an example of how to produce a LogMessage message to Kafka conforming to the above schema. (Note: The complete source code for all examples is available on GitHub.)
var config = new Dictionary<string, object>
{
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl }
};
using (var producer = new Producer<Null, GenericRecord>(
    config, null, new AvroSerializer<GenericRecord>()))
{
    var logLevelSchema = (EnumSchema)Schema.Parse(
        File.ReadAllText("LogLevel.avsc"));
    // Splice the LogLevel definition into the V1 file in place of its unquoted type name.
    var logMessageSchema = (RecordSchema)Schema
        .Parse(File.ReadAllText("LogMessage.V1.avsc")
            .Replace(
                "MessageTypes.LogLevel",
                File.ReadAllText("LogLevel.avsc")));
    var record = new GenericRecord(logMessageSchema);
    record.Add("IP", "127.0.0.1");
    record.Add("Message", "a test log message");
    record.Add("Severity", new GenericEnum(logLevelSchema, "Error"));
    // Report the delivery result once the broker acknowledges (or rejects) the message.
    producer.ProduceAsync("log-messages", null, record)
        .ContinueWith(dr => Console.WriteLine(dr.Result.Error
            ? $"error producing message: {dr.Result.Error.Reason}"
            : $"produced to: {dr.Result.TopicPartitionOffset}"));
    // Wait up to 30 seconds for outstanding messages to be delivered.
    producer.Flush(TimeSpan.FromSeconds(30));
}
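The text substitution in this example can be checked on its own, without Kafka or Schema Registry. Below is a minimal sketch, assuming .NET 5 or later (top-level statements and System.Text.Json); the schema strings are abbreviated inline copies of LogLevel.avsc and LogMessage.V1.avsc (doc attributes omitted) rather than files read from disk, and IsValidJson and InlineLogLevel are hypothetical helpers. It shows that the V1 file only becomes valid JSON once the enum definition has been spliced in.

```csharp
using System;
using System.Text.Json;

// Abbreviated, hypothetical inline copies of the two .avsc files.
string logLevelAvsc =
    "{ \"namespace\": \"MessageTypes\", \"type\": \"enum\", \"name\": \"LogLevel\", " +
    "\"symbols\": [\"None\", \"Verbose\", \"Info\", \"Warning\", \"Error\"] }";
string logMessageV1Template =
    "{ \"namespace\": \"MessageTypes\", \"type\": \"record\", \"name\": \"LogMessage\", \"fields\": [ " +
    "{ \"name\": \"IP\", \"type\": \"string\" }, " +
    "{ \"name\": \"Message\", \"type\": \"string\" }, " +
    "{ \"name\": \"Severity\", \"type\": MessageTypes.LogLevel } ] }";

// Returns true if the text parses as JSON.
bool IsValidJson(string text)
{
    try
    {
        using var doc = JsonDocument.Parse(text);
        return true;
    }
    catch (JsonException)
    {
        return false;
    }
}

// Replace the unquoted type name with the full enum definition,
// mirroring the string.Replace call in the producer example.
string InlineLogLevel(string template, string enumSchema) =>
    template.Replace("MessageTypes.LogLevel", enumSchema);

string merged = InlineLogLevel(logMessageV1Template, logLevelAvsc);

Console.WriteLine(IsValidJson(logMessageV1Template)); // False: unquoted type name
Console.WriteLine(IsValidJson(merged));               // True: enum now defined inline
```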
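There are a few things worth noting. First, LogMessage.V1.avsc refers to the LogLevel type by its unquoted name, so that file is not valid JSON on its own; the code substitutes the contents of LogLevel.avsc for that name before parsing, producing a single self-contained schema. Second, you don’t need to register the schema yourself: the AvroSerializer registers it with Schema Registry when the first message is produced. Finally, ProduceAsync is asynchronous, so the call to Flush waits (for up to 30 seconds) for outstanding messages to be delivered before the producer is disposed.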
You can verify that the schema was added to Schema Registry as follows:
curl http://localhost:8081/subjects
> ["log-messages-value"]
Notice that the schema was registered under the name log-messages-value, which is the topic name concatenated with -value. This name is referred to as a subject. By default, each topic has potentially two subjects registered in Schema Registry: one for the message value and one for the message key (the topic name appended with -key).
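A minimal sketch of the default naming convention just described, assuming .NET 5 or later for top-level statements. SubjectFor is a hypothetical helper for illustration, not part of the Confluent client API.

```csharp
using System;

// Hypothetical helper: subject = topic name + "-key" or "-value".
string SubjectFor(string topic, bool isKey) => topic + (isKey ? "-key" : "-value");

Console.WriteLine(SubjectFor("log-messages", isKey: false)); // log-messages-value
Console.WriteLine(SubjectFor("log-messages", isKey: true));  // log-messages-key
```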
You can interact with Schema Registry further using its REST API. Something else you might want to do is set the subject compatibility to FULL (by default, it’s BACKWARD):
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "FULL"}' \
    http://localhost:8081/config/log-messages-value
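The same REST call can be made from .NET with HttpClient. The sketch below assumes .NET 5 or later and uses a hypothetical BuildSetCompatibilityRequest helper. It only constructs the request, so it can be inspected without a running Schema Registry; sending it would be a single SendAsync call on an HttpClient.

```csharp
using System;
using System.Net.Http;
using System.Text;

// Build (but do not send) the equivalent of the curl command:
// PUT {registryUrl}/config/{subject} with a JSON body naming the compatibility level.
HttpRequestMessage BuildSetCompatibilityRequest(string registryUrl, string subject, string level)
{
    var request = new HttpRequestMessage(
        HttpMethod.Put,
        $"{registryUrl.TrimEnd('/')}/config/{Uri.EscapeDataString(subject)}");
    request.Content = new StringContent(
        $"{{\"compatibility\": \"{level}\"}}",
        Encoding.UTF8,
        "application/vnd.schemaregistry.v1+json");
    return request;
}

var req = BuildSetCompatibilityRequest("http://localhost:8081", "log-messages-value", "FULL");
Console.WriteLine($"{req.Method} {req.RequestUri}"); // PUT http://localhost:8081/config/log-messages-value
// To apply it: using var client = new HttpClient(); var response = await client.SendAsync(req);
```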
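Full compatibility enforces that any new schema registered under this subject is both forwards and backwards compatible with the previously registered version (FULL_TRANSITIVE checks against all earlier versions instead). Forwards compatibility means that data written with the new schema can be read using the old schema; backwards compatibility means that data written with the old schema can be read using the new schema.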
From the perspective of decoupling producers and consumers, forwards compatibility is very desirable because it ensures consumers will be able to read new versions of data without being updated. Backwards compatibility is important for data destined for archival storage because it ensures that the latest schema can be used to read all historical data.
It’s common to want to evolve a schema by adding another field. Conveniently, the new schema will be both forwards and backwards compatible with the old one, provided you specify a default value for the added fields. For a full set of forwards/backwards compatibility rules, refer to the documentation on data serialization and evolution.
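To make the role of the default concrete, here is a deliberately simplified model of the field-presence part of these rules, assuming .NET 5 or later and reducing a schema to a list of field names, each marked with whether it has a default. It ignores types, aliases, and unions, so treat it as an illustration rather than a substitute for Schema Registry’s own compatibility checks. CanRead is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model: can a reader with schema `reader` decode data written with schema `writer`?
bool CanRead(
    IReadOnlyList<(string Name, bool HasDefault)> reader,
    IReadOnlyList<(string Name, bool HasDefault)> writer)
{
    var writerNames = new HashSet<string>(writer.Select(f => f.Name));
    // Every reader field must be present in the written data or have a default.
    // Writer fields unknown to the reader are simply skipped, so they never fail the check.
    return reader.All(f => writerNames.Contains(f.Name) || f.HasDefault);
}

var v1 = new List<(string Name, bool HasDefault)> { ("IP", false), ("Message", false), ("Severity", false) };
var v2 = new List<(string Name, bool HasDefault)>(v1) { ("Tags", true) };
var v2NoDefault = new List<(string Name, bool HasDefault)>(v1) { ("Tags", false) };

// Backwards: new schema reads old data. Forwards: old schema reads new data.
Console.WriteLine($"V2: backward {CanRead(v2, v1)}, forward {CanRead(v1, v2)}");
Console.WriteLine($"V2 without default: backward {CanRead(v2NoDefault, v1)}, forward {CanRead(v1, v2NoDefault)}");
```

Under this model, adding Tags with a default passes both checks, while adding it without a default is still forwards compatible but no longer backwards compatible, so a subject set to FULL would reject it.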
OK, let’s add a tags field to our original schema, and this time we’ll define the LogLevel enum inline:
LogMessage.V2.avsc:
{
    "namespace": "MessageTypes",
    "type": "record",
    "doc": "A simple log message type as used by this blog post.",
    "name": "LogMessage",
    "fields": [
        { "name": "IP", "type": "string" },
        { "name": "Message", "type": "string" },
        { "name": "Tags", "type": { "type": "map", "values": "string" }, "default": {} },
        {
            "name": "Severity",
            "type": {
                "namespace": "MessageTypes",
                "type": "enum",
                "doc": "Enumerates the set of allowable log levels.",
                "name": "LogLevel",
                "symbols": ["None", "Verbose", "Info", "Warning", "Error"]
            }
        }
    ]
}
Instead of making use of the GenericRecord class, let’s now use the avrogen tool to generate a C# class corresponding to the new schema. avrogen is available as a .NET Core Global Tool (requires .NET Core 2.1 or above). It can be installed as follows:
dotnet tool install -g Confluent.Apache.Avro.AvroGen
Now, you can run the tool in your project directory:
avrogen -s LogMessage.V2.avsc .
And then use the generated class directly with AvroSerializer:
var config = new Dictionary<string, object>
{
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl }
};
using (var producer = new Producer<Null, MessageTypes.LogMessage>(
    config, null, new AvroSerializer<MessageTypes.LogMessage>()))
{
    producer.ProduceAsync(
        "log-messages",
        null,
        new MessageTypes.LogMessage
        {
            IP = "192.168.0.1",
            Message = "a test message 2",
            Severity = MessageTypes.LogLevel.Info,
            Tags = new Dictionary<string, string> { { "location", "CA" } }
        });
    producer.Flush(TimeSpan.FromSeconds(30));
}
Or with AvroDeserializer:
var consumerConfig = new Dictionary<string, object>
{
    // A fresh group.id has no committed offsets, so auto.offset.reset applies
    // and the consumer starts from the beginning of the topic.
    { "group.id", Guid.NewGuid().ToString() },
    { "bootstrap.servers", bootstrapServers },
    { "schema.registry.url", schemaRegistryUrl },
    { "auto.offset.reset", "beginning" }
};
using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(
    consumerConfig, null, new AvroDeserializer<MessageTypes.LogMessage>()))
{
    consumer.OnConsumeError += (_, error)
        => Console.WriteLine($"an error occurred: {error.Error.Reason}");
    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"{msg.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")}: [{msg.Value.Severity}] {msg.Value.Message}");
    consumer.Subscribe("log-messages");
    while (true)
    {
        consumer.Poll(TimeSpan.FromSeconds(1));
    }
}
When used in the context of the consumer, the MessageTypes.LogMessage schema is referred to as the reader schema. Messages read by the consumer may contain data that was serialized using this schema or potentially any other schema (referred to as the writer schema). In the event that the reader and writer schemas are different, Avro schema resolution rules will be used to reconcile the difference. If the schemas are incompatible, the consumed message will be delivered via the OnConsumeError event rather than the OnMessage event.
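The reader/writer reconciliation described above can be sketched for the simple case of record fields. This is a toy model assuming .NET 5 or later, not the Avro library’s implementation: a message is a dictionary from field name to value, the reader schema is a list of field names with optional defaults (null stands in for "no default", a simplification), and Resolve is a hypothetical helper. Fields the writer didn’t provide take the reader’s default, fields the reader doesn’t know about are dropped, and a missing field with no default is an error, roughly the incompatible case described above.

```csharp
using System;
using System.Collections.Generic;

// Project data written with the writer schema onto the reader schema's fields.
Dictionary<string, object> Resolve(
    IDictionary<string, object> writerData,
    IEnumerable<(string Name, object Default)> readerFields)
{
    var result = new Dictionary<string, object>();
    foreach (var (name, defaultValue) in readerFields)
    {
        if (writerData.TryGetValue(name, out var value))
            result[name] = value;         // field supplied by the writer
        else if (defaultValue != null)
            result[name] = defaultValue;  // missing field: fall back to the reader's default
        else
            throw new InvalidOperationException($"no value or default for field '{name}'");
    }
    return result;                        // writer-only fields are not copied
}

// V1 data read with the V2 reader schema: Tags receives its default (an empty map).
var v1Message = new Dictionary<string, object>
{
    ["IP"] = "127.0.0.1", ["Message"] = "a test log message", ["Severity"] = "Error"
};
var v2Reader = new (string Name, object Default)[]
{
    ("IP", null), ("Message", null), ("Tags", new Dictionary<string, string>()), ("Severity", null)
};
var resolved = Resolve(v1Message, v2Reader);
Console.WriteLine(resolved.Count); // 4

// V2 data read with the V1 reader schema: Tags is ignored.
var v2Message = new Dictionary<string, object>(v1Message)
{
    ["Tags"] = new Dictionary<string, string> { ["location"] = "CA" }
};
var v1Reader = new (string Name, object Default)[] { ("IP", null), ("Message", null), ("Severity", null) };
var resolvedByV1 = Resolve(v2Message, v1Reader);
Console.WriteLine(resolvedByV1.ContainsKey("Tags")); // False

// A reader field with no default that the writer never wrote cannot be resolved.
var noDefaultReader = new (string Name, object Default)[] { ("IP", null), ("Tags", null) };
try { Resolve(v1Message, noDefaultReader); }
catch (InvalidOperationException e) { Console.WriteLine(e.Message); } // no value or default for field 'Tags'
```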
With the above examples, we’ve shown how straightforward it is to use Confluent Schema Registry and Avro serialized data with your .NET applications. People just starting out with Kafka often choose to work with a simple text-based serialization format such as JSON, but as your Kafka deployment starts to grow in complexity, there is a lot to be gained from using a data format that provides strong schema evolution rules and by employing a central schema registry to manage them.