1.2.3 …. Avro! - Part 2 - Confluent Cloud Schema Registry 101

Diptiman Raichaudhuri
10 min read · Sep 10, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

This article is Part 2 of my Apache Avro with Confluent Cloud series.

If you haven’t read Part 1, please go through it and run the Python code examples. You will gain an understanding of what an Apache Avro schema is, how it is designed and written, and see example Python code for reading and writing objects with an Apache Avro schema.

In this article, I will focus on how Avro schemas are stored within Confluent Cloud and how Kafka events are validated using a schema.

An Avro schema is associated with the data/message. This is to ensure that any data validation rule or data quality check has immediate access to the schema when the data/message is validated/introspected.

While in my last example I created an Avro datum (the serialized message plus its schema), this is not performant in terms of the data/message packet size. If the schema definition is long, the size of every serialized message also increases. Not good!
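To put a rough number on this overhead, here is a minimal sketch using fastavro (which we install later in this article anyway). The trimmed-down schema and record are illustrative only: fastavro.writer produces an Avro container with the schema embedded, while fastavro.schemaless_writer emits just the encoded datum.

import io

import fastavro

# Illustrative, trimmed-down schema and record -- not the full PoS schema
schema = fastavro.parse_schema({
    "name": "POSEvent",
    "type": "record",
    "fields": [
        {"name": "store_id", "type": "int"},
        {"name": "item_desc", "type": "string"},
        {"name": "unit_price", "type": "float"}
    ]
})
record = {"store_id": 201, "item_desc": "Shaving Gel", "unit_price": 4.25}

with_schema = io.BytesIO()
fastavro.writer(with_schema, schema, [record])           # container: schema + datum

datum_only = io.BytesIO()
fastavro.schemaless_writer(datum_only, schema, record)   # encoded datum only

print("with schema embedded :", len(with_schema.getvalue()), "bytes")
print("datum only           :", len(datum_only.getvalue()), "bytes")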

That’s when a nifty trick comes in handy! Confluent Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. A schema registry imposes Stream Governance across topics with many different schemas and their versions.

Confluent brokers use Schema Registry to intelligently transfer Kafka topic message data and events between producers and consumers.

Let’s understand how Confluent Schema Registry solves the overhead of embedding the Avro schema within every message.

So, if you have read Part 1 of this article, you will recall that the posevent.avro file actually has both the schema and the message together in the same file, as described in the image below:

Normally, for real-time data streaming use cases, the schema will be larger than the message itself. Confluent Schema Registry solves this problem: when the producer serializes the message, the schema is first registered in Schema Registry, which returns a 4-byte schema id. The producer then packs this schema id (much smaller than a full-blown JSON schema!) into the message, along with a magic byte (currently 0) that identifies the framing format.

Now, the message looks like this:

This essentially solves the problem of a large schema size by registering the schema from the producer, while still complying with Avro’s requirement of associating the schema with the message. The producer now sends the Avro message, along with the schema id returned from Schema Registry, to the broker. The broker validates the schema with Confluent Schema Registry and stores the message. But what happens on the subscriber/consumer side? It simply reverses the steps that the publisher/producer serializer performs: the consumer fetches the schema from Confluent Schema Registry and validates the message against it while deserializing.
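As a quick illustration of this framing, here is a minimal sketch (the helper name is mine, not something from the confluent-kafka library) that unpacks the first five bytes of such a serialized message value: one magic byte, then the 4-byte big-endian schema id, followed by the Avro-encoded payload.

import struct

def inspect_confluent_frame(value: bytes):
    # Byte 0: magic byte (currently 0); bytes 1-4: schema id (big-endian int)
    magic_byte, schema_id = struct.unpack(">bI", value[:5])
    avro_payload = value[5:]
    print(f"magic byte={magic_byte}, schema id={schema_id}, payload={len(avro_payload)} bytes")
    return schema_id, avro_payload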

So, Confluent Schema Registry plays the important role of externalizing the schema from the publisher/producer, the subscriber/consumer and the topic/brokers, and it presents a REST API to query and get information about schemas.
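For example, here is a minimal sketch of querying Schema Registry with the SchemaRegistryClient from the confluent-kafka library, which wraps that REST API. The sr_config dictionary is the Schema Registry connection config we create later in this article, and the subject name shown is illustrative, based on the default <topic>-value naming.

from confluent_kafka.schema_registry import SchemaRegistryClient

from cc_config import sr_config   # defined later in this article (cc_config.py)

sr_client = SchemaRegistryClient(sr_config)

# List every subject registered so far
print(sr_client.get_subjects())

# Fetch the latest version registered under a subject
latest = sr_client.get_latest_version("posrecord-value")
print(latest.schema_id, latest.version)
print(latest.schema.schema_str)   # the registered Avro schema JSON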

Here’s a diagram of the overall approach (for Confluent Platform):

Source: https://docs.confluent.io/platform/current/schema-registry/fundamentals/index.html

Now that the design of the schema registry has been discussed, let’s get back to a code sample. I will re-use the schema developed for a PoS (Point of Sale) device from a retail store payment counter (read Part 1 for details).

First, let’s see what happens if no schema is provided and the message JSON is published directly to the broker.

Let’s download the project structure from the GitHub repo.

Add the confluent-kafka Python dependency:

 pip install confluent-kafka

Create a Confluent Cloud account, a Kafka cluster, a topic, a cluster API key and a Schema Registry API key. You can follow along with my earlier end-to-end Confluent Cloud Tutorial for how to set all of this up.

Assuming you have completed all the Confluent Cloud setup steps, let’s create a config file with the Confluent Cloud credentials and save it as cc_config.py:

cc_config = {
    'bootstrap.servers': '<YOUR_BOOTSTRAP_SERVER>:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<CLUSTER_API_KEY>',
    'sasl.password': '<CLUSTER_API_SECRET>'
}

sr_config = {
    'url': '<SCHEMA_REGISTRY_ENDPOINT>',
    'basic.auth.user.info': '<SR_API_KEY>:<SR_API_SECRET>'
}

These two Python dictionaries will help the producer connect to the Confluent Cloud cluster and Confluent Schema Registry, respectively.

Let’s create a producer which will publish simulated PoS events to a Kafka topic in Confluent Cloud, with no schema and no key.

Here’s the code sample broken down (full code in the GitHub repo), saved in a file posevent_cc_producer_no_schema.py:

import datetime
import json
import random
import time

from cc_config import cc_config, sr_config

from confluent_kafka import Producer

We import the required Python modules, the Confluent Cloud config and the Producer.

class POSEvent(object):
    def __init__(self, retail_store_id, workstation_id, sequence_no,
                 business_ts, operator_id, item_id, item_desc,
                 unit_price, discount, tax):
        self.retail_store_id = retail_store_id
        self.workstation_id = workstation_id
        self.sequence_no = sequence_no
        self.business_ts = business_ts
        self.operator_id = operator_id
        self.item_id = item_id
        self.item_desc = item_desc
        self.unit_price = unit_price
        self.discount = discount
        self.tax = tax

def pos_event():
    now = datetime.datetime.now()
    rounded_now = str(now.replace(microsecond=0))
    retail_store_id = random.randint(166, 212)
    tills = ['TILL1', 'TILL2', 'TILL3', 'TILL4', 'TILL5', 'TILL6', 'TILL7']
    workstation_id = random.choice(tills)
    sequence_no = random.randint(2, 9)
    business_ts = int(time.time() * 1000)
    operator_id = random.randint(201, 401)
    item_id = random.randint(22300, 22700)
    item_descs = ['Razor Men', 'Razor Women', 'Epilator', 'Shaving Cream',
                  'Bath Gel', 'Moisturiser', 'Face Scrub', 'After Shave',
                  'Eau De Cologne', 'Milk Cleanser', 'Deodorant', 'Shaving Gel']
    item_desc = random.choice(item_descs)
    unit_price = round(random.uniform(1.50, 7.80), 2)
    discount = round((unit_price * 0.05), 2)
    tax = round((unit_price * 0.02), 2)

    pos_event = POSEvent(
        retail_store_id=retail_store_id,
        workstation_id=workstation_id,
        sequence_no=sequence_no,
        business_ts=business_ts,
        operator_id=operator_id,
        item_id=item_id,
        item_desc=item_desc,
        unit_price=unit_price,
        discount=discount,
        tax=tax
    )

    return pos_event

Invoking the pos_event() method returns a fully simulated POSEvent object.

def posevent_to_dict(posevent):
    return dict(
        retail_store_id=posevent.retail_store_id,
        workstation_id=posevent.workstation_id,
        sequence_no=posevent.sequence_no,
        business_ts=posevent.business_ts,
        operator_id=posevent.operator_id,
        item_id=posevent.item_id,
        item_desc=posevent.item_desc,
        unit_price=posevent.unit_price,
        discount=posevent.discount,
        tax=posevent.tax
    )

The posevent_to_dict() method returns a plain dict of the POSEvent object, which can then be JSON-encoded and transferred over the wire to the Kafka topic.

def delivery_report(err, event):
    if err is not None:
        print(f"Delivery failed on event for {event.value().decode('utf8')}: {err}")
    else:
        print(f"Event {event.value().decode('utf8')} produced to {event.topic()}")

A delivery_report() callback which prints delivery results on the console.

def main():
    topic = "posrecord_noschema"
    producer = Producer(cc_config)

    try:
        while True:
            device_data = posevent_to_dict(pos_event())
            producer.produce(topic=topic,
                             value=json.dumps(device_data),
                             on_delivery=delivery_report)
            time.sleep(2)
            producer.flush()
    except Exception as e:
        print(e)
    finally:
        producer.flush()


if __name__ == '__main__':
    main()

Before the main method is run, we need to create the posrecord_noschema topic in Confluent Cloud, with a single partition and default settings for the rest of the parameters:
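The screenshots here use the Confluent Cloud console, but as a side note, the same topic could also be created programmatically with the Admin API from confluent-kafka. A minimal sketch, assuming the cc_config dictionary from above:

from confluent_kafka.admin import AdminClient, NewTopic

from cc_config import cc_config

admin = AdminClient(cc_config)
futures = admin.create_topics([NewTopic("posrecord_noschema", num_partitions=1)])

for topic, future in futures.items():
    try:
        future.result()   # block until the topic is created
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")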

Of course, no messages show up, since the producer has not run yet:

Let’s run the producer main() method, and you will see an event being published every 2 seconds:

And the Confluent Cloud topic also displays events:

Notice how the key is empty and each message is just a JSON document within the value field.

Also, the Schema tab is empty:

So, no type checking is taking place at the broker, and the consumer cannot check the type of the event against any type system/schema. All in all, an undesirable state of affairs!

Let’s fix this by issuing a key for every message, and a type/schema for the message which will be registered with Confluent Schema Registry, thus allowing tight type checking of messages.

Let’s start with defining an Avro schema for the Point-of-Sale messages.

Here’s v1 of the schema, saved in a file posevent.avsc (the same schema that I used in Part 1 of this series):

{
  "namespace": "avro123.exmaple",
  "name": "POSEvent",
  "type": "record",
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "workstation_id",
      "type": "int"
    },
    {
      "name": "operator_id",
      "type": "string"
    },
    {
      "name": "item_id",
      "type": "int"
    },
    {
      "name": "item_desc",
      "type": "string"
    },
    {
      "name": "unit_price",
      "type": "float",
      "logicalType": "decimal",
      "precision": "10",
      "scale": 2
    },
    {
      "name": "txn_date",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

This posevent.avsc file is saved inside an avro folder.

So, now the POSEvent record is defined with clear types. As described earlier, this schema JSON will be registered with Confluent Schema Registry the first time the producer publishes messages to the Confluent Cloud Kafka topic. Once registered, Confluent Schema Registry will return a schema id, which will be cached by the producer. The broker will validate messages based on the schema id, against the schema stored in Confluent Schema Registry.
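The AvroSerializer used below performs this registration automatically on first use (under the default subject name <topic>-value). Just to make the mechanics visible, here is a minimal sketch, not part of the producer code, that registers the same schema explicitly and prints the returned schema id:

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

from cc_config import sr_config

with open("avro/posevent.avsc") as f:
    schema_str = f.read()

sr_client = SchemaRegistryClient(sr_config)
schema_id = sr_client.register_schema("posrecord-value", Schema(schema_str, "AVRO"))
print(f"Registered schema id: {schema_id}")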

Let’s create a new Python producer and save it as posevent_cc_producer_with_schema.py to distinguish it from the no-schema one. Much of the complexity is handled/abstracted away by the pre-defined serializers from the confluent-kafka library!

Since this producer uses Avro serialization, the fastavro Python Avro serializer is needed, so install it with pip:

pip install fastavro

Let’s study the imports:

import datetime
import random
import time

from cc_config import cc_config, sr_config

from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka import Producer

The StringSerializer is imported to serialize the key, which will be the operator_id; AvroSerializer is imported to serialize the message value; and SchemaRegistryClient is imported to register the schema defined in posevent.avsc with Confluent Schema Registry.

class POSEvent(object):
    def __init__(self, store_id, workstation_id, operator_id,
                 item_id, item_desc, unit_price, txn_date):
        self.store_id = store_id
        self.workstation_id = workstation_id
        self.operator_id = operator_id
        self.item_id = item_id
        self.item_desc = item_desc
        self.unit_price = unit_price
        self.txn_date = txn_date

The POSEvent class is changed to strictly adhere to the typed fields as per the schema file.

def pos_event():
    now = datetime.datetime.now()
    rounded_now = str(now.replace(microsecond=0))
    store_id = random.randint(166, 212)
    tills = ['TILL1', 'TILL2', 'TILL3', 'TILL4', 'TILL5', 'TILL6', 'TILL7']
    workstation_id = random.randint(10, 18)
    operator_id = random.choice(tills)
    item_id = random.randint(22300, 22700)
    item_descs = ['Razor Men', 'Razor Women', 'Epilator', 'Shaving Cream',
                  'Bath Gel', 'Moisturiser', 'Face Scrub', 'After Shave',
                  'Eau De Cologne', 'Milk Cleanser', 'Deodorant', 'Shaving Gel']
    item_desc = random.choice(item_descs)
    unit_price = round(random.uniform(1.50, 7.80), 2)
    txn_date = int(time.time() * 1000)

    pos_event = POSEvent(
        store_id=store_id,
        workstation_id=workstation_id,
        operator_id=operator_id,
        item_id=item_id,
        item_desc=item_desc,
        unit_price=unit_price,
        txn_date=txn_date
    )
    return pos_event

The pos_event() method now simulates fields as per the requirement from the Avro schema.

def delivery_report(err, event):
    if err is not None:
        print(f"Delivery failed on event for {event.key().decode('utf8')}: {err}")
    else:
        print(f"PoS Event {event.key().decode('utf8')} produced to {event.topic()}")


def posevent_to_dict(posevent, ctx):
    return dict(
        store_id=posevent.store_id,
        workstation_id=posevent.workstation_id,
        operator_id=posevent.operator_id,
        item_id=posevent.item_id,
        item_desc=posevent.item_desc,
        unit_price=posevent.unit_price,
        txn_date=posevent.txn_date
    )

A delivery_report() method to print delivery results on the console, and the posevent_to_dict() method to convert the POSEvent object into a dict that the Avro serializer can put on the wire.

def main():
    topic = "posrecord"
    schema = "avro/posevent.avsc"

    with open(schema) as f:
        schema_str = f.read()
    producer = Producer(cc_config)
    string_serializer = StringSerializer('utf_8')
    schema_registry_client = SchemaRegistryClient(sr_config)
    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     posevent_to_dict)
    try:
        while True:
            # Build one event per iteration so the key and the Avro value
            # come from the same simulated record
            event = pos_event()
            producer.produce(topic=topic,
                             key=string_serializer(event.operator_id),
                             value=avro_serializer(event, SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            time.sleep(2)
            producer.flush()
    except Exception as e:
        print(e)
    finally:
        producer.flush()


if __name__ == '__main__':
    main()

The main() does a couple of things:

  1. Reads the posevent.avsc schema file
  2. Creates an avro_serializer reference which is integrated with Confluent Schema Registry
  3. Uses the operator_id as the key for the message, serialized with the StringSerializer
  4. Serializes the pos_event message value using the AvroSerializer
  5. Every 2 seconds, publishes an Avro message to the posrecord topic in Confluent Cloud, which complies with the schema defined in posevent.avsc

The full code is in the GitHub repo.

When this Avro producer is run, the operator_id key is printed on the console (the previous example did not have a key, while in this case we have a valid key; have a look at the new delivery_report() method):

Looking at messages from Confluent Cloud Console:

Notice how the key is now populated with the operator_id.

Also, the schema tab shows the registered schema:

Notice that the Schema ID is also displayed, which is a numeric identifier!

This example now has the schema registered in Confluent Schema Registry and a typed set of messages that are easy to validate (in another article, I will explain how Data Quality rules can be imposed on messages!).
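To round off the consumer side described earlier, here is a minimal consumer sketch (not part of the article's repo; the group id and offset settings are my own assumptions). The AvroDeserializer looks up the schema in Confluent Schema Registry using the schema id embedded in each message and decodes the value during deserialization:

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

from cc_config import cc_config, sr_config

# group id and offset reset are illustrative choices for this sketch
consumer_config = dict(cc_config, **{'group.id': 'posrecord-reader',
                                     'auto.offset.reset': 'earliest'})

schema_registry_client = SchemaRegistryClient(sr_config)
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer(consumer_config)
consumer.subscribe(["posrecord"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        pos_event = avro_deserializer(msg.value(),
                                      SerializationContext(msg.topic(), MessageField.VALUE))
        print(pos_event)   # a dict matching posevent.avsc
finally:
    consumer.close()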

This example ends here. In the next article, I will demonstrate how a strongly typed message is transformed using Flink SQL on the Confluent Cloud Data Streaming Platform!

GitHub repo.

Stay tuned and happy coding !
