1.2.3 … Avro! - Part 1: Apache Avro Basics

Diptiman Raichaudhuri
7 min read · Jul 22, 2024


Disclosure: All opinions expressed in this article are my own, and represent no one but myself and not those of my current or any previous employers.

In this series of articles, let’s learn the basics of writing an Apache Avro schema and encoding records with the Python Avro library. Once we get a hang of how this is done, we’ll take it to the next level by wiring up Avro schemas to define the type of a Kafka event, using schema-aware Python Kafka producers to emit events to Confluent Cloud Kafka topics, and storing durable copies of such schemas in Confluent Cloud Schema Registry.

This is Part 1 of the series, and in this article we’ll focus on the basics of the Avro specification and the Python library used to encode and decode Avro records.

In Part 2, we’ll see how to configure Confluent Schema Registry on Confluent Cloud and connect a Python Kafka producer to Confluent Cloud, with the schema stored in Confluent Schema Registry.

In Part 3, we’ll focus on how a schema-aware Kafka event is transformed with Flink SQL on Confluent Cloud.

Let’s start by understanding what Avro is!

From Avro’s website, we understand that Apache Avro is a data serialization system with rich data structures (arrays, maps, enums etc.) and the ability to serialize data living inside applications/programs to a compact binary format. Essentially, by serialization we loosely mean converting the in-memory representation of a structure/object to a binary file, which can easily be transferred over a network.

Apart from serialization-deserialization capabilities, Avro also has language bindings for Java, Python, C++ and more.

Most importantly, Avro serialization-deserialization relies on schemas. An Avro schema is defined in JSON.

Avro Datum (we’ll come across this term many times later) is the raw binary form of the data, serialized and ready to be transported over networks. Avro decouples this binary data from its schema. Binary-encoded Avro records therefore carry no field-type information; instead, whenever the binary data is read, the schema has to be consulted, and any missing or mismatching field or field type raises an error.

So, essentially, we first create an Avro schema, which defines the record fields and their types. A data record conforming to this schema is then serialized to an Avro datum (the raw, binary format) to be transferred over the network.

With Avro, the serialized data records and the schema are decoupled and stored separately. Since a record cannot be read without its schema, there are clever implementations for embedding the schema (despite a JSON file’s size) within the record and storing it (more on that later!).

All this might sound surprisingly similar to other serialization systems such as Protobuf or Thrift, but Avro is fundamentally different.

With Avro, since the schema is always present, considerably less type information needs to be encoded with the data, resulting in a smaller serialized size. An even bigger difference is that code generation is not mandatory (because a schema always accompanies the data).

As far as the Avro type system is concerned, it supports 8 primitive types (null, boolean, int, long, float, double, bytes and string) and 6 complex types (records, enums, arrays, maps, unions and fixed).
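To get a quick feel for a few of the complex types, here is a small, illustrative schema parsed with the same Python library used later in this article. The record and field names are made up purely for demonstration; this is not part of the PoS example we build below.

import avro.schema

# A made-up schema, only to illustrate the union, enum and array complex types
demo_schema_json = """
{
  "type": "record",
  "name": "DemoRecord",
  "namespace": "avro123.demo",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "nickname", "type": ["null", "string"], "default": null},
    {"name": "status", "type": {"type": "enum", "name": "Status",
                                "symbols": ["NEW", "ACTIVE", "CLOSED"]}},
    {"name": "tags", "type": {"type": "array", "items": "string"}}
  ]
}
"""

demo_schema = avro.schema.parse(demo_schema_json)
print(demo_schema)

The union ["null", "string"] is how Avro expresses an optional (nullable) field, while the enum and array entries show how complex types are declared inline.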

I encourage you to read the Avro specification, if needed, here. The current version is 1.11.1

Armed with this basic knowledge of Avro types and Avro schemas, let’s write code and see how Avro serialization works in practice.

Let’s run the following steps:

  1. Step-1: Create a Point-of-Sale (POS) event Avro schema.
  2. Step-2: Serialize a data record to a binary file (Avro record).
  3. Step-3: Use the Avro schema to deserialize the record and read it.

Step-1:

I’ll use PyCharm as my IDE and will create a venv project to run this example. All code is kept in the GitHub repo.

I created a project 123avro with the following pip install:

pip install avro-python3

This installs the avro-python3 library.

I also created a folder called avro within the main project:

PyCharm IDE for the Avro project

Let’s create the Avro schema file as given below for PoS event records.

I save this file as posevent.avsc in the avro folder of my project:

{
  "namespace": "avro123.example",
  "name": "POSEvent",
  "type": "record",
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "workstation_id",
      "type": "int"
    },
    {
      "name": "operator_id",
      "type": "string"
    },
    {
      "name": "item_id",
      "type": "int"
    },
    {
      "name": "item_desc",
      "type": "string"
    },
    {
      "name": "unit_price",
      "type": "float",
      "logicalType": "decimal",
      "precision": "10",
      "scale": 2
    },
    {
      "name": "txn_date",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

The Avro schema has a record type (which is by far the most used complex type!) and is written in JSON. The schema also has nifty tricks like logicalType, which is Avro’s way of creating a derived type. A logicalType is serialized using its underlying Avro primitive type, so that the values are encoded consistently, but language implementations may choose to represent logicalTypes with an appropriate native type.
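For instance, txn_date is a long carrying the timestamp-millis logical type, so the value we serialize is simply the epoch time in milliseconds. As a small illustration (the variable names here are my own, not from the article’s repo), the value used later in Step-2 can be produced from a Python datetime like this:

from datetime import datetime, timezone

# timestamp-millis expects milliseconds since the Unix epoch, stored as a long
txn_dt = datetime(2024, 7, 19, 13, 54, 52, tzinfo=timezone.utc)
txn_date_millis = int(txn_dt.timestamp() * 1000)
print(txn_date_millis)  # 1721397292000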

The Avro schema defined above presents a type system for domain objects interacting over the wire. Any domain object can be type-checked against the schema at runtime.
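As a quick sketch of that runtime check, the avro-python3 library we install in Step-1 exposes a validate helper in avro.io. Treat the exact name and location as an assumption to verify against your installed version; the record values below are only demo data:

import avro.io
import avro.schema

# Assumes the posevent.avsc schema created in Step-1 sits in the avro folder
schema = avro.schema.parse(open("avro/posevent.avsc", "rb").read())

good = {"store_id": 1, "workstation_id": 2, "operator_id": "AB1",
        "item_id": 10, "item_desc": "demo", "unit_price": 1.5,
        "txn_date": 1721397292000}
bad = dict(good, item_id="10")  # item_id is declared as int, so a string should fail

# avro.io.validate returns True/False instead of raising an exception
print(avro.io.validate(schema, good))  # expected: True
print(avro.io.validate(schema, bad))   # expected: False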

Step-2:

Now that we have the posevent.avsc schema file stored in the avro folder, let’s serialize a domain object (in this case, a Point-of-Sale event) to its binary form using the Avro Python package.

Let’s name this file posevent_avro_writer.py:

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter


def main():
    # Parse the Avro schema created in Step-1
    try:
        schemafile = "avro/posevent.avsc"
        schema = avro.schema.parse(open(schemafile, "rb").read())
    except (FileNotFoundError, OSError):
        print(f"Schema file {schemafile} not found")
        return

    # Serialize a single PoS event record to an Avro container file
    try:
        avrobinfile = "posevent.avro"
        writer = DataFileWriter(open(avrobinfile, "wb"), DatumWriter(), schema)
        writer.append({
            "store_id": 1234,
            "workstation_id": 677,
            "operator_id": "AB4567",
            "item_id": 2345,
            "item_desc": "Pelican basmati rice, 500 gms",
            "unit_price": 24.50,
            "txn_date": 1721397292000
        })
        writer.close()
    except OSError:
        print(f"Avro binary file {avrobinfile} could not be written")


if __name__ == '__main__':
    main()

Let’s analyze the code. The schema variable stores the parsed schema file, which I created earlier.

The DataFileWriter() uses the DatumWriter() to create a binary posevent.avro file. Then a single PoS event is appended to the Avro file and the file is saved.

Run the program, and it should create a posevent.avro file in the project directory.
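A DataFileWriter produces an Avro container file, with the schema embedded alongside the data. When events head towards Kafka (as we’ll do in Part 2), what typically travels on the wire is just the Avro datum, the raw bytes without the schema. Here is a minimal sketch of that, reusing the same schema and record; the buffer handling is my own illustration and not part of the article’s repo:

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter

schema = avro.schema.parse(open("avro/posevent.avsc", "rb").read())

record = {"store_id": 1234, "workstation_id": 677, "operator_id": "AB4567",
          "item_id": 2345, "item_desc": "Pelican basmati rice, 500 gms",
          "unit_price": 24.50, "txn_date": 1721397292000}

# Encode only the datum: no header, no embedded schema, no sync markers
buffer = io.BytesIO()
DatumWriter(schema).write(record, BinaryEncoder(buffer))
raw_datum = buffer.getvalue()
print(len(raw_datum), "bytes")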

Let’s also test how the schema enforces type-safety while serializing the POSEvent object to the posevent.avro file.

Let’s intentionally change the item_id value from 2345 to "2345", turning the int into a string, and run the program.

You would get the following error:

raise AvroTypeException(self.writer_schema, datum)
avro.io.AvroTypeException: The datum
{'store_id': 1234,
'workstation_id': 677,
'operator_id': 'AB4567',
'item_id': '2345',
'item_desc': 'Pelican basmati rice, 500 gms',
'unit_price': 24.5,
'txn_date': 1721397292000} is not an example of the schema {
"type": "record",
"name": "POSEvent",
"namespace": "avro123.exmaple",
"fields": [
{
"type": "int",
"name": "store_id"
},
{
"type": "int",
"name": "workstation_id"
},
{
"type": "string",
"name": "operator_id"
},
{
"type": "int",
"name": "item_id"
},
{
"type": "string",
"name": "item_desc"
},
{
"logicalType": "decimal",
"precision": "10",
"scale": 2,
"type": "float",
"name": "unit_price"
},
{
"logicalType": "timestamp-millis",
"type": "long",
"name": "txn_date"
}
]
}

Perfect! The Avro schema protects type-safety by raising the right exception, since item_id was supposed to be an int and not a string.

Now that we have a serialized file for the PoS event (posevent.avro), let’s move to Step-3 and read it.

Step-3:

As explained earlier, the posevent.avro binary file has the schema embedded in it, so the schema does not need to be supplied again when reading the file. Here’s the code for posevent_avro_reader.py:

from avro.datafile import DataFileReader
from avro.io import DatumReader


def main():
    try:
        avrobinfile = "posevent.avro"
        # The container file carries its own schema, so DatumReader needs no schema argument
        reader = DataFileReader(open(avrobinfile, "rb"), DatumReader())
        for sample in reader:
            print(sample)
        reader.close()
    except FileNotFoundError:
        print(f"{avrobinfile} not found.")


if __name__ == '__main__':
    main()

And I get the event printed on the console:

{
'store_id': 1234,
'workstation_id': 677,
'operator_id': 'AB4567',
'item_id': 2345,
'item_desc': 'Pelican basmati rice, 500 gms',
'unit_price': 24.5,
'txn_date': 1721397292000
}

Let’s use the Avro viewer from the JetBrains Marketplace, created by Ben Watson. It can be installed from within PyCharm as well: open Settings, click Plugins, search for Avro and install the viewer.

Now, if you open the file posevent.avro from the bottom pane, you would see this:

Avro schema viewer
Avro data viewer

See how the file contains both the schema and the data.
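DataFileReader could read the file without being handed a schema precisely because the container file carries one. For a raw datum like the one sketched after Step-2, the opposite holds: the reader must be given the writer’s schema explicitly. A small illustrative counterpart of that sketch (raw_datum is the byte string produced there):

import io

import avro.schema
from avro.io import BinaryDecoder, DatumReader

schema = avro.schema.parse(open("avro/posevent.avsc", "rb").read())

# raw_datum: the schema-less bytes produced by the earlier BinaryEncoder sketch
decoder = BinaryDecoder(io.BytesIO(raw_datum))
event = DatumReader(schema).read(decoder)
print(event)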

This is handy: serialization and type-safety come together in the serialized file itself. In Part 2, I’ll demonstrate how Confluent Schema Registry stores the schema in a managed store and embeds only a schema-id in the serialized payload, making it even smaller and enabling faster wire transport.
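To get a rough feel for that size difference, here is an illustrative check (my own sketch, not output from the article’s repo, and the exact numbers will vary): the container file carries the full JSON schema plus a header and sync markers, while the raw datum holds only the encoded field values.

import os

# Compare the container file (schema embedded) with the raw datum (schema-less)
print("container file:", os.path.getsize("posevent.avro"), "bytes")
print("raw datum     :", len(raw_datum), "bytes")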

GitHub repo.

Stay tuned, happy coding!
