In modern distributed systems, especially those built around an event-driven architecture using Kafka, data evolution is a fact of life. Services need to change, and so do the data structures they produce and consume. The challenge is enabling this evolution without breaking existing consumers. The combination of Apache Avro for serialization and the Schema Registry for governance provides a robust solution to this problem.
This article explores how to use Avro and a Schema Registry in Java to achieve type-safe, efficient, and backward-compatible data serialization.
The Core Problem: Data Contract Management
Imagine two services, Service A (producer) and Service B (consumer), communicating via Kafka.
- V1: Both use a `User` schema with fields `(id, name)`.
- V2: Service A adds a new field, `email`, and starts producing messages with the new schema.
- Result: Service B (still on V1) crashes when it tries to deserialize a V2 message because it encounters an unknown field.
The Schema Registry solves this by providing a central authority to store, version, and enforce compatibility rules for Avro schemas.
The Technology Stack
- Apache Avro: A data serialization system.
- Rich Data Structures: Defines schemas using JSON.
- Compact Binary Format: Results in very small payloads.
- Schema Evolution: Supports backward and forward compatibility through well-defined rules.
- Confluent Schema Registry: A distributed, RESTful service (often used with Kafka) that manages and stores Avro schemas and their versions. Other implementations exist (e.g., AWS Glue Schema Registry, Hortonworks Schema Registry).
How It Works: The Serialization/Deserialization Process
The magic lies in not sending the entire schema with every message.
Serialization (Producer):
- The producer, before sending a message, checks if its local Avro schema is registered.
- The Schema Registry returns a unique Schema ID for that schema (or registers a new one if compatible).
- The producer serializes the data into Avro's binary format, prefixing the byte array with this Schema ID.
- The producer sends `[Magic Byte][Schema ID][Avro Data]` to Kafka.
Deserialization (Consumer):
- The consumer reads the message from Kafka.
- It extracts the Schema ID from the message prefix.
- It queries the Schema Registry with this ID to fetch the corresponding Avro schema (caching it locally for performance).
- Using the fetched schema, it deserializes the binary data into a POJO.
This process ensures the consumer always uses the correct schema to read the data, even if it was produced with a newer or older compatible version.
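To make the wire format concrete, here is a minimal sketch that decodes the prefix of a raw message value by hand. It assumes Confluent's framing (a zero magic byte followed by a 4-byte big-endian schema ID); in practice the `KafkaAvroDeserializer` does this for you, so this is purely illustrative:

```java
import java.nio.ByteBuffer;

public class WireFormatInspector {
    /** Reads the Confluent framing: [magic byte 0x0][4-byte schema ID][Avro binary]. */
    public static int extractSchemaId(byte[] messageValue) {
        ByteBuffer buffer = ByteBuffer.wrap(messageValue);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt(); // ByteBuffer is big-endian by default, matching the wire format
    }

    public static void main(String[] args) {
        // A fabricated example frame: magic byte 0, schema ID 42, payload omitted.
        byte[] frame = {0, 0, 0, 0, 42};
        System.out.println("Schema ID: " + extractSchemaId(frame)); // prints 42
    }
}
```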
Code Example: Kafka Producer & Consumer with Avro and Schema Registry
We'll use Confluent's Kafka Avro Serializer library, which handles the interaction with the Schema Registry transparently.
Step 1: Define the Avro Schema (src/main/avro/user.avsc)
{
"namespace": "com.example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": ["null", "string"],
"default": null
}
]
}
Note: The `"default": null` makes the `email` field optional. A reader using this schema can fill in the default when deserializing older records that lack the field, which is exactly what keeps the change backward compatible.
Step 2: Generate Java Classes
Use the avro-maven-plugin to generate the User Java class at compile time.
<!-- In your pom.xml -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<configuration>
<stringType>String</stringType>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
</plugin>
Run mvn compile to generate com.example.avro.User.
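The generated class alone isn't enough to compile the producer and consumer below; they also need Confluent's serializer on the classpath. A typical addition to the same pom.xml might look like the following (versions are illustrative, so pin whatever matches your Kafka/Confluent setup; note that the Confluent artifacts live in Confluent's Maven repository, not Maven Central):

```xml
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.1</version> <!-- illustrative version -->
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version> <!-- illustrative version -->
  </dependency>
</dependencies>
```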
Step 3: Kafka Producer Code
import com.example.avro.User; // the class generated in Step 2
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class AvroProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); // Use the Avro Serializer
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry URL
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
String topic = "users";
// Create a User object. The 'email' field can be omitted thanks to the default.
User user = User.newBuilder()
.setId(1)
.setName("Alice")
// .setEmail("alice@example.com") // optional thanks to the default
.build();
ProducerRecord<String, User> record = new ProducerRecord<>(topic, user.getName(), user);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent successfully to " + metadata.topic());
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
}
}
Step 4: Kafka Consumer Code
import com.example.avro.User; // the class generated in Step 2
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); // Use the Avro Deserializer
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true"); // Crucial: Deserialize to our generated User class
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
String topic = "users";
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.printf("Received User: id=%d, name=%s, email=%s%n",
user.getId(),
user.getName(),
user.getEmail() // Will be null if not set by producer
);
}
}
} finally {
consumer.close();
}
}
}
Key Configuration and Concepts
- `schema.registry.url`: Points the (de)serializer to the Schema Registry.
- `specific.avro.reader`: When `true`, the deserializer creates instances of your generated class (e.g., `User`). When `false`, it returns a `GenericRecord` (see the sketch after this list).
- Compatibility Types: The Schema Registry can be configured with compatibility rules (e.g., `BACKWARD`, `FORWARD`, `FULL`) to enforce safe evolution. `BACKWARD` (the default) allows consumers using the new schema to read data produced with the old one.
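For completeness, here is a minimal sketch of the `GenericRecord` path, assuming the same setup as Step 4 but with `specific.avro.reader` left at its default of `false`. Field access is by name and untyped, so a typo fails at runtime rather than at compile time:

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GenericAvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // specific.avro.reader is not set, so values arrive as GenericRecord

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("users"));
            ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                GenericRecord user = record.value();
                // get() is untyped: a misspelled field name fails only at runtime
                System.out.println(user.get("id") + " / " + user.get("name"));
            }
        }
    }
}
```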
Schema Evolution in Action
Let's see evolution at work. Assume schema V1 carries only `id` and `name`, and V2 adds the optional `email` field defined in Step 1:
- Producer V1: Sends `User` with `(id=1, name="Alice")`. The Schema Registry registers schema V1.
- Consumer V1: Successfully reads the data.
- Producer V2: Updated to also send `email`, registering the new schema. Because the new field has a default, the change is BACKWARD compatible. The producer starts sending `(id=1, name="Alice", email="alice@example.com")`.
- Consumer V1: Still works! It uses its old schema (V1) to read the new data, ignoring the unknown `email` field and successfully reading `id` and `name`. The system doesn't break (the sketch below verifies both directions locally).
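You can verify this reasoning locally with Avro's own compatibility checker, no running registry required. The sketch below (using Java 15+ text blocks for brevity) asks the BACKWARD question the registry's default mode asks, can a reader on the new schema decode data written with the old one, plus the reverse direction from the evolution story:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class EvolutionCheck {
    // V1: the schema before the email field was added
    private static final Schema V1 = new Schema.Parser().parse("""
        {"type":"record","name":"User","namespace":"com.example.avro","fields":[
          {"name":"id","type":"int"},
          {"name":"name","type":"string"}]}""");

    // V2: the schema from Step 1, with the optional email field
    private static final Schema V2 = new Schema.Parser().parse("""
        {"type":"record","name":"User","namespace":"com.example.avro","fields":[
          {"name":"id","type":"int"},
          {"name":"name","type":"string"},
          {"name":"email","type":["null","string"],"default":null}]}""");

    public static void main(String[] args) {
        // BACKWARD: a V2 reader fills in email's default when reading V1 data.
        System.out.println("V2 reader / V1 writer: "
            + SchemaCompatibility.checkReaderWriterCompatibility(V2, V1).getType());
        // The story above: a V1 reader simply ignores V2's email field.
        System.out.println("V1 reader / V2 writer: "
            + SchemaCompatibility.checkReaderWriterCompatibility(V1, V2).getType());
    }
}
```

Both checks print `COMPATIBLE`, matching the behavior described above.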
Advantages of This Approach
- Robustness: Prevents breaking changes from propagating and crashing consumers.
- Efficiency: Avro binary format is very compact, and sending only the Schema ID reduces network overhead.
- Centralized Governance: The Schema Registry provides a single source of truth for all data contracts.
- Type Safety: Working with generated Java classes reduces runtime errors.
Best Practices
- Always Set Defaults for New Fields: This is the key to backward compatibility.
- Avoid Removing Fields Without Care: Removing a field is only safe if it's optional (a `null` union) and no consumer depends on it.
- Use Meaningful Schema Namespaces: This helps in managing and discovering schemas.
- Monitor Schema Registry: Keep an eye on schema versions and compatibility changes; a minimal audit sketch follows this list.
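As a starting point for that last practice, the registry client bundled with Confluent's serializers can enumerate subjects and versions programmatically. A minimal sketch, assuming the same local registry URL used throughout this article:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class RegistryAudit {
    public static void main(String[] args) throws Exception {
        // Second argument caps how many schemas the client caches locally
        CachedSchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 100);
        for (String subject : client.getAllSubjects()) {
            SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
            System.out.printf("subject=%s latestVersion=%d schemaId=%d%n",
                subject, latest.getVersion(), latest.getId());
        }
    }
}
```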
Conclusion
The combination of Avro and a Schema Registry is a cornerstone of building resilient, evolvable, and efficient data streams in Java. Moving the schema out of the message payload and into a centralized registry enables seamless schema evolution while keeping payloads compact. Integrating it with Kafka producers and consumers is straightforward with Confluent's serializers, allowing developers to focus on business logic while the framework handles the complexities of data compatibility.
Further Reading: Explore the Confluent Schema Registry REST API for directly managing schemas, and learn about more advanced compatibility modes like FULL for the most stringent requirements.