Introduction to Apache Avro


Bhaskar S 12/13/2013


Overview

Apache Avro is a Data Serialization framework with the following features:

- Rich data structures (records, enums, arrays, maps, unions, and fixed types)
- A compact and fast binary data format
- A container file format for storing persistent data
- Support for Remote Procedure Calls (RPC)
- Simple integration with dynamic languages, since code generation is optional

There are two approaches to serializing and deserializing data using Apache Avro:

- With code generation: compile the schema with the Avro tools to generate language-specific classes, and then serialize and deserialize using those generated classes
- Without code generation: parse the schema at runtime and serialize and deserialize using the generic API (GenericRecord)

We will use the second approach (without code generation) to demonstrate serialization and deserialization using the Java language.

Apache Avro and its Dependencies

Download Apache Avro from the following site:

http://avro.apache.org/

Apache Avro has a dependency on the following additional open-source Java framework:

- Jackson (the JSON processor that Avro uses to parse schema definitions and to support its JSON encoding)

Hands-on with Apache Avro

We will demonstrate the ability to both serialize and deserialize using Avro with a simple Contact schema.

The following is the schema definition for a language-independent Contact object:

Listing.1
{
    "type": "record",
    "namespace": "com.polarsparc.avro",
    "name": "Contact",
    "fields": [
        {"name": "First", "type": "string"},
        {"name": "Last", "type": "string"},
        {"name": "Email", "type": "string"},
        {"name": "Work", "type": ["string", "null"]},
        {"name": "Home", "type": ["string", "null"]},
        {"name": "Mobile", "type": ["string", "null"]}
    ]
}

Let us understand the Avro schema shown in Listing.1 above:

- type: the value record indicates a complex type that encapsulates a set of named fields
- namespace: qualifies the name of the schema (com.polarsparc.avro here), much like a Java package
- name: the name of the record schema (Contact here)
- fields: the array of named fields in the record, each with a name and a type
- The fields First, Last, and Email are of the primitive type string and hence required
- The fields Work, Home, and Mobile are each a union of string and null, which makes them optional
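A schema does not have to live in a file; it can also be parsed from an in-memory JSON string and introspected through the Avro Schema API. The following is a minimal sketch (the class name SchemaInspect is ours, for illustration):

package com.polarsparc.avro;

import org.apache.avro.Schema;

public class SchemaInspect {
    public static void main(String[] args) {
        // The same schema as Listing.1, embedded as a JSON string
        String json = "{"
            + "\"type\": \"record\","
            + "\"namespace\": \"com.polarsparc.avro\","
            + "\"name\": \"Contact\","
            + "\"fields\": ["
            + "    {\"name\": \"First\", \"type\": \"string\"},"
            + "    {\"name\": \"Last\", \"type\": \"string\"},"
            + "    {\"name\": \"Email\", \"type\": \"string\"},"
            + "    {\"name\": \"Work\", \"type\": [\"string\", \"null\"]},"
            + "    {\"name\": \"Home\", \"type\": [\"string\", \"null\"]},"
            + "    {\"name\": \"Mobile\", \"type\": [\"string\", \"null\"]}"
            + "]}";

        Schema schema = new Schema.Parser().parse(json);

        // Introspect the parsed schema
        System.out.println("Full name: " + schema.getFullName()); // com.polarsparc.avro.Contact
        System.out.println("Type: " + schema.getType());          // RECORD
        for (Schema.Field field : schema.getFields()) {
            System.out.println("Field: " + field.name() + " => " + field.schema());
        }
    }
}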

The following is the Java test class Contact that demonstrates serialization and deserialization (both binary and JSON) of records conforming to the schema defined in Listing.1, without code generation, using the Avro generic API:

Listing.2
package com.polarsparc.avro;

import java.io.File;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;

public class Contact {
    public static void main(String[] args) {
        try {
            // Create a schema

            Schema schema = new Schema.Parser().parse(new File("resources/Contact.avsc"));

            // Create a record from the schema

            GenericRecord srec = new GenericData.Record(schema);
            srec.put("First", new Utf8("John"));
            srec.put("Last", new Utf8("Doe"));
            srec.put("Email", new Utf8("john.doe@space.com"));
            srec.put("Mobile", new Utf8("123-456-7890"));

            // Create a writer to serialize the record

            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

            // Demonstrate Binary serialization

            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
            writer.write(srec, encoder);
            encoder.flush();
            stream.close();

            // Create raw bytes of the Binary serialized record

            byte[] raw1 = stream.toByteArray();

            System.out.println("=> Serialized binary data size: " + raw1.length);

            // Demonstrate JSON serialization

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            JsonEncoder jenc = EncoderFactory.get().jsonEncoder(schema, baos);
            writer.write(srec, jenc);
            jenc.flush();
            baos.close();

            System.out.println("=> Serialized json: " + baos);
            System.out.println("=> Serialized json data size: " + baos.toByteArray().length);

            // Create raw bytes of the JSON serialized record

            byte[] raw2 = baos.toByteArray();

            // Create a reader to deserialize the raw bytes

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

            // Demonstrate Binary deserialization

            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(raw1, null);
            GenericRecord brec = reader.read(null, decoder);

            System.out.printf("=> Binary deserialized record: %s %s - %s\n",
                brec.get("First"), brec.get("Last"), brec.get("Email"));

            // Demonstrate JSON deserialization

            ByteArrayInputStream bais = new ByteArrayInputStream(raw2);
            JsonDecoder jdec = DecoderFactory.get().jsonDecoder(schema, bais);
            GenericRecord jrec = reader.read(null, jdec);

            System.out.printf("=> Json deserialized record: %s %s - %s\n", jrec.get("First"),
                jrec.get("Last"), jrec.get("Mobile"));
        }
        catch (Exception ex) {
            ex.printStackTrace(System.out);
        }
    }
}

Compiling and executing the test code from Listing.2 produces the following output:

Output.1

=> Serialized binary data size: 44
=> Serialized json: {"First":"John","Last":"Doe","Email":"john.doe@space.com","Work":null,"Home":null,"Mobile":{"string":"123-456-7890"}}
=> Serialized json data size: 117
=> Binary deserialized record: John Doe - john.doe@space.com
=> Json deserialized record: John Doe - 123-456-7890
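To see where the 44 bytes of the binary encoding come from: Avro encodes a record simply as its fields in order, with no field names. Each string is written as a zig-zag varint length followed by its UTF-8 bytes, and each union value is prefixed by a varint branch index. That works out to 5 bytes for "John" (1 + 4), 4 for "Doe" (1 + 3), 19 for "john.doe@space.com" (1 + 18), 1 byte each for the null Work and Home fields, and 14 for Mobile (1 for the branch index, 1 for the length, 12 for the digits), for a total of 44. The JSON encoding, by contrast, carries all the field names, hence the 117 bytes.

Also notice how the JSON encoding wraps the non-null union value as {"string":"123-456-7890"}; this is how Avro's JSON encoding indicates which branch of a union holds the value.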

We have successfully demonstrated our first Avro example.
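Although the examples in this article exchange raw bytes in memory, Avro also provides an object container file format in which the schema is stored alongside the data. The following is a minimal sketch of that approach (the class name ContactFile and the file name contacts.avro are ours, for illustration):

package com.polarsparc.avro;

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;

public class ContactFile {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("resources/Contact.avsc"));

        GenericRecord srec = new GenericData.Record(schema);
        srec.put("First", new Utf8("John"));
        srec.put("Last", new Utf8("Doe"));
        srec.put("Email", new Utf8("john.doe@space.com"));
        srec.put("Mobile", new Utf8("123-456-7890"));

        // Write the record to a container file; the schema is embedded in the file
        File file = new File("contacts.avro"); // illustrative file name
        DataFileWriter<GenericRecord> fileWriter =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        fileWriter.create(schema, file);
        fileWriter.append(srec);
        fileWriter.close();

        // Read it back; no schema argument is needed since it is stored in the file
        DataFileReader<GenericRecord> fileReader =
            new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>());
        while (fileReader.hasNext()) {
            System.out.println(fileReader.next());
        }
        fileReader.close();
    }
}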

Next, we will demonstrate serializing and deserializing data for a schema that contains nested records. We will use a simple Customer schema in this example.

The following is the schema definition for a language-independent Customer object:

Listing.3
{
    "type": "record",
    "namespace": "com.polarsparc.avro",
    "name": "Customer",
    "fields": [
        {"name": "First", "type": "string"},
        {"name": "Last", "type": "string"},
        {"name": "Contacts", "type": {
                "type": "record",
                "name": "Contacts",
                "fields": [
                    {"name": "Email", "type": "string"},
                    {"name": "Work", "type": ["string", "null"]},
                    {"name": "Home", "type": ["string", "null"]},
                    {"name": "Mobile", "type": ["string", "null"]}
                ]
            }
        }
    ]
}

Notice the type definition for the field named Contacts; it is an inner record.
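The inner record's schema can be fetched programmatically from the outer schema, which is exactly what the test class below does when constructing the inner Contacts record. The following is a minimal sketch of that navigation (the class name CustomerSchemaPeek is ours, for illustration):

package com.polarsparc.avro;

import java.io.File;

import org.apache.avro.Schema;

public class CustomerSchemaPeek {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("resources/Customer.avsc"));

        // Fetch the schema of the nested Contacts record from the outer Customer schema
        Schema contacts = schema.getField("Contacts").schema();

        System.out.println(contacts.getFullName());  // com.polarsparc.avro.Contacts
        System.out.println(contacts.toString(true)); // pretty-printed JSON of the inner schema
    }
}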

The following is the Java test class Customer that demonstrates serialization and deserialization (both binary and JSON) of records conforming to the schema defined in Listing.3, without code generation, using the Avro generic API:

Listing.4
package com.polarsparc.avro;

import java.io.File;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;

public class Customer {
    public static void main(String[] args) {
        try {
            // Create a schema

            Schema schema = new Schema.Parser().parse(new File("resources/Customer.avsc"));

            // Create a Customer record from the schema

            GenericRecord srec = new GenericData.Record(schema);
            srec.put("First", new Utf8("John"));
            srec.put("Last", new Utf8("Doe"));

            // Create the inner Contacts record from the schema

            GenericRecord ssub = new GenericData.Record(schema.getField("Contacts").schema());
            ssub.put("Email", new Utf8("john.doe@space.com"));
            ssub.put("Mobile", new Utf8("123-456-7890"));

            // Set the reference to Contacts record in the Customer record

            srec.put("Contacts", ssub);

            // Create a writer to serialize the record

            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

            // Demonstrate Binary serialization

            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
            writer.write(srec, encoder);
            encoder.flush();
            stream.close();

            // Create raw bytes of the Binary serialized record

            byte[] raw1 = stream.toByteArray();

            System.out.println("=> Serialized binary data size: " + raw1.length);

            // Demonstrate JSON serialization

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            JsonEncoder jenc = EncoderFactory.get().jsonEncoder(schema, baos);
            writer.write(srec, jenc);
            jenc.flush();
            baos.close();

            System.out.println("=> Serialized json: " + baos);
            System.out.println("=> Serialized json data size: " + baos.toByteArray().length);

            // Create raw bytes of the JSON serialized record

            byte[] raw2 = baos.toByteArray();

            // Create a reader to deserialize the raw bytes

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

            // Demonstrate Binary deserialization

            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(raw1, null);
            GenericRecord brec = reader.read(null, decoder);

            System.out.printf("=> Binary deserialized record: %s %s - %s\n", brec.get("First"),
                brec.get("Last"), ((GenericRecord) brec.get("Contacts")).get("Email"));

            // Demonstrate JSON deserialization

            ByteArrayInputStream bais = new ByteArrayInputStream(raw2);
            JsonDecoder jdec = DecoderFactory.get().jsonDecoder(schema, bais);
            GenericRecord jrec = reader.read(null, jdec);

            System.out.printf("=> Json deserialized record: %s %s - %s\n", jrec.get("First"),
                jrec.get("Last"), ((GenericRecord) jrec.get("Contacts")).get("Mobile"));
        }
        catch (Exception ex) {
            ex.printStackTrace(System.out);
        }
    }
}

Compiling and executing the test code from Listing.4 produces the following output:

Output.2

=> Serialized binary data size: 44
=> Serialized json: {"First":"John","Last":"Doe","Contacts":{"Email":"john.doe@space.com","Work":null,"Home":null,"Mobile":{"string":"123-456-7890"}}}
=> Serialized json data size: 130
=> Binary deserialized record: John Doe - john.doe@space.com
=> Json deserialized record: John Doe - 123-456-7890

We have successfully demonstrated our second Avro example with a nested inner record. Note that the binary size is again 44 bytes: the nested record adds no overhead in Avro's binary encoding, since a record is encoded simply as its fields in order.

Finally, we will demonstrate schema evolution, where the serializer may use version 1 of a schema while the deserializer uses an updated version 2 of that schema. We will use the Customer schema from the earlier example.

Avro supports schema evolution by allowing separate schemas for the serializer and the deserializer; at read time, Avro resolves data written with the writer's schema against the reader's schema.
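The heart of this mechanism is the two-argument form of GenericDatumReader, which Listing.6 below uses. In essence (the variable names writerSchema and readerSchema are ours):

// writerSchema: the schema the data was serialized with (version 1)
// readerSchema: the schema the consumer expects (version 2)
DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);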

For our example, the serializer will use the schema from Listing.3.

The following is the updated schema definition for the Customer object:

Listing.5
{
    "type": "record",
    "namespace": "com.polarsparc.avro",
    "name": "Customer",
    "fields": [
        {"name": "First", "type": "string"},
        {"name": "Last", "type": "string"},
        {"name": "Contacts", "type": {
                "type": "record",
                "name": "Contacts",
                "fields": [
                    {"name": "Email", "type": "string"},
                    {"name": "Primary", "type": ["string", "null"], "aliases": ["Mobile"]},
                    {"name": "Secondary", "type": ["string", "null"], "aliases": ["Home"]},
                    {"name": "Zipcode", "type": "string", "default": ""}
                ]
            }
        }
    ]
}

The serializer will use the schema from Listing.3, while the deserializer will use the schema from Listing.5.

A schema can evolve as a result of:

- A new field being added (the new field must specify a default value)
- An existing field being removed
- An existing field being renamed (the new schema must list the old name in the aliases attribute)

Comparing Listing.3 with Listing.5, it is clear that the changes are in the inner record Contacts. The field Zipcode has been added (a new field). Since the serialized data will not have this field, what value would the deserialized record hold? This is why the new field must specify the default attribute, here with the empty string as its value.

The field Mobile has been renamed to Primary. To indicate this change, the new schema specifies the aliases attribute, which lists the name(s) the field was previously known by.

Similarly, the field Home has been renamed to Secondary, and the field Work has been dropped altogether.

The following is the Java test class SchemaChange that demonstrates serializing with the schema in Listing.3 and deserializing with the schema in Listing.5, without code generation, using the Avro generic API:

Listing.6
package com.polarsparc.avro;

import java.io.File;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;

public class SchemaChange {
    public static void main(String[] args) {
        try {
            // Create schema1 for the serializer

            Schema schema1 = new Schema.Parser().parse(new File("resources/Customer.avsc"));

            // Create schema2 for the deserializer

            Schema schema2 = new Schema.Parser().parse(new File("resources/Customer-V2.avsc"));

            // Create a Customer record from the serializer schema1

            GenericRecord srec = new GenericData.Record(schema1);
            srec.put("First", new Utf8("John"));
            srec.put("Last", new Utf8("Doe"));
            GenericRecord ssub = new GenericData.Record(schema1.getField("Contacts").schema());
            ssub.put("Email", new Utf8("john.doe@space.com"));
            ssub.put("Mobile", new Utf8("123-456-7890"));
            srec.put("Contacts", ssub);

            // Create a writer to serialize the record

            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema1);

            // Use JSON serialization using schema1

            ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
            JsonEncoder jenc1 = EncoderFactory.get().jsonEncoder(schema1, baos1);
            writer.write(srec, jenc1);
            jenc1.flush();
            baos1.close();

            System.out.println("=> Serialized json (V1): " + baos1);
            System.out.println("=> Serialized json (V1) data size: " + baos1.toByteArray().length);

            // Create a reader that resolves the writer schema1 against the reader schema2

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema1, schema2);

            // Demonstrate deserialization using schema2

            ByteArrayInputStream bais = new ByteArrayInputStream(baos1.toByteArray());
            JsonDecoder jdec = DecoderFactory.get().jsonDecoder(schema1, bais);
            GenericRecord jrec = reader.read(null, jdec);

            System.out.printf("=> Json (V2) deserialized record: %s %s - %s, Zip: %s\n", jrec.get("First"),
                jrec.get("Last"), ((GenericRecord) jrec.get("Contacts")).get("Primary"),
                ((GenericRecord) jrec.get("Contacts")).get("Zipcode"));
        }
        catch (Exception ex) {
            ex.printStackTrace(System.out);
        }
    }
}

Compiling and executing the test code from Listing.6 produces the following output:

Output.3

=> Serialized json (V1): {"First":"John","Last":"Doe","Contacts":{"Email":"john.doe@space.com","Work":null,"Home":null,"Mobile":{"string":"123-456-7890"}}}
=> Serialized json (V1) data size: 130
=> Json (V2) deserialized record: John Doe - 123-456-7890, Zip:

We have successfully demonstrated our final Avro example on schema evolution.