Streaming Database Changes in Real-Time: A Practical Guide to Debezium for CDC in Java

In modern application architecture, data is no longer siloed within a single database. It needs to be streamed to caches, search indexes, data warehouses, and other microservices. Change Data Capture (CDC) is the pattern of identifying and tracking changes in a database. Debezium is a powerful, open-source distributed platform that turns your existing databases into event streams by leveraging CDC, making it an essential tool for building responsive, data-driven Java applications.

This article will explain what Debezium is, how it works under the hood, and how you can use it in your Java ecosystem to react to database changes in real-time.


What is Debezium?

Debezium is a CDC platform built on top of Apache Kafka. It works by reading database transaction logs (the write-ahead log, binlog, etc.), capturing row-level changes (inserts, updates, deletes), and converting them into event streams that are published to Apache Kafka.

Key Characteristics:

  • Non-Intrusive: It reads the database's transaction log rather than polling tables, so it adds minimal load to the source database.
  • Real-Time: Changes are captured and emitted shortly after they are committed, typically with low (often sub-second) latency.
  • Durable & Reliable: By using Kafka, events are stored durably and can be replayed.
  • Database-Agnostic: Supports PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and more via connectors.

Core Architecture: How Debezium Works

  1. Debezium Connector: A JVM-based service (built on Kafka Connect) that is configured to monitor a specific database.
  2. Database Transaction Log: The connector reads the database's native log (e.g., MySQL's binlog, PostgreSQL's WAL).
  3. Kafka Connect Framework: Debezium runs as a source connector within the Kafka Connect framework, which handles scalability, fault tolerance, and RESTful management.
  4. Apache Kafka: The connector streams change events to Kafka topics.
  5. Consuming Applications: Any application (including Java services) can consume these events from Kafka topics.

Use Cases for Debezium in Java Applications

  • CQRS/Event Sourcing Read Model Updates: Update Elasticsearch or Redis indices whenever the source database changes.
  • Microservices Data Replication: Share data from a "system of record" database with other microservices without direct database access.
  • Audit Logging: Capture every change for compliance and auditing purposes.
  • Cache Invalidation: Invalidate or update distributed caches (like Redis) in real-time when underlying data changes.
  • Data Warehouse ETL: Stream changes to a data lake or warehouse like Snowflake or BigQuery.
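
As an illustration of the cache-invalidation use case above, here is a minimal sketch that applies change events to an in-memory map standing in for a distributed cache. The class name CustomerCache and its apply method are hypothetical; a real service would parse op, before, and after from the Kafka record and call Redis (or similar) instead of a ConcurrentHashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a distributed cache keyed by customer id.
class CustomerCache {
    private final Map<Long, Map<String, Object>> entries = new ConcurrentHashMap<>();

    // Applies one change event. The op codes follow Debezium's convention:
    // "c" = create, "u" = update, "d" = delete, "r" = snapshot read.
    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "c":
            case "u":
            case "r":
                // The "after" image holds the current row state.
                entries.put(idOf(after), new HashMap<>(after));
                break;
            case "d":
                // Deletes only carry the "before" image.
                entries.remove(idOf(before));
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    // Returns the cached row, or null if the id is not cached.
    Map<String, Object> get(long id) {
        return entries.get(id);
    }

    private static Long idOf(Map<String, Object> row) {
        return ((Number) row.get("id")).longValue();
    }
}
```

Whether to update the cached entry (as here) or simply evict it and let the next read repopulate it is a design choice; eviction is simpler when the cached value is derived from more than one table.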

Setting Up Debezium: A Step-by-Step Guide

Let's set up Debezium to capture changes from a MySQL database.

1. Prerequisites

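  • A running Apache Kafka cluster (with ZooKeeper, as in the Compose file below, or in KRaft mode).
  • A MySQL database with row-based binary logging enabled (binlog_format=ROW) and a database user that has replication privileges.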

2. Start the Kafka Connect Distributed Worker

You can use a Docker image to run the Kafka Connect worker with the Debezium MySQL connector plugin.

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    # ... configuration
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on: [zookeeper]
    # ... configuration
  kafka-connect:
    image: debezium/connect:2.3
    depends_on: [kafka]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    ports:
      - "8083:8083"

3. Register the Debezium MySQL Connector

Use the Kafka Connect REST API to register a connector that monitors a specific database. Note that Debezium 2.x uses topic.prefix (formerly database.server.name) and schema.history.internal.* (formerly database.history.*); the configuration below uses the 2.x names to match the debezium/connect:2.3 image.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @- <<'EOF'
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbhistory.inventory",
    "include.schema.changes": "false"
  }
}
EOF
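
After registering, it helps to confirm that the connector and its task are actually running. Kafka Connect exposes GET /connectors/{name}/status for this. The sketch below builds and sends that request with the JDK's built-in HttpClient; the class name ConnectorStatusCheck and its method names are made up for this example, and the base URL assumes the port mapping from the Compose file.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for polling a connector's status over the Kafka Connect REST API.
class ConnectorStatusCheck {

    // Builds GET {baseUrl}/connectors/{connectorName}/status.
    static HttpRequest statusRequest(String baseUrl, String connectorName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/connectors/" + connectorName + "/status"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body describing connector and task state.
    static String fetchStatus(HttpClient client, String baseUrl, String connectorName)
            throws IOException, InterruptedException {
        HttpResponse<String> response = client.send(
                statusRequest(baseUrl, connectorName),
                HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status " + response.statusCode()
                    + " for connector " + connectorName);
        }
        return response.body();
    }
}
```

For this guide's connector, a call such as ConnectorStatusCheck.fetchStatus(HttpClient.newHttpClient(), "http://localhost:8083", "inventory-connector") should return a JSON document whose connector and task entries report their current state.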

Consuming Debezium Events in a Java Application

Once the connector is running, it will publish events to Kafka topics. A Java application can consume these events using the Kafka Consumer API or a higher-level library like Spring Kafka.
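
By default, the MySQL connector writes each captured table to its own topic, named from the connector's logical server name (dbserver1 in this guide), the database, and the table, joined with dots. A tiny sketch of that convention follows; the DebeziumTopics helper is hypothetical and only illustrates the default naming, which can be changed with topic routing.

```java
// Hypothetical helper that mirrors the default <serverName>.<database>.<table> topic naming.
class DebeziumTopics {

    // Returns the default topic name for one captured table.
    static String topicFor(String serverName, String database, String table) {
        return String.join(".", serverName, database, table);
    }
}
```

With the values used in this guide, DebeziumTopics.topicFor("dbserver1", "inventory", "customers") yields dbserver1.inventory.customers, which is the topic the listener later subscribes to.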

1. Maven Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2. The Debezium Event Structure

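A typical Debezium event for an update looks like this. Only the event payload is shown; when the JSON converter runs with schemas enabled, this object arrives nested under a "payload" field, next to a "schema" field: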

{
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "after": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "source": {
    "version": "2.3.0.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 1700000000000,
    "snapshot": "false",
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "gtid": null,
    "file": "binlog.000003",
    "pos": 364,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "u",
  "ts_ms": 1700000001000
}
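
The op field drives most consumer logic, so it can be worth modelling explicitly. The enum below is a small sketch, not part of Debezium's API: the name Operation and its methods are invented for this example. It covers the codes for create, update, and delete, plus "r", which Debezium uses for rows read during an initial snapshot.

```java
// Hypothetical enum mapping Debezium's single-letter op codes to named operations.
enum Operation {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

    private final String code;

    Operation(String code) {
        this.code = code;
    }

    // Looks up the operation for an op code; rejects codes this sketch does not cover.
    static Operation fromCode(String code) {
        for (Operation op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unsupported op code: " + code);
    }

    // Deletes carry the row in "before"; every other operation here carries it in "after".
    boolean usesAfterImage() {
        return this != DELETE;
    }
}
```

A consumer could then call Operation.fromCode(node.get("op").asText()) once and branch on the enum instead of comparing strings in several places.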

3. Java Consumer with Spring Kafka

Create a Spring component to listen to the Kafka topic and process the changes.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

// CustomerDocument is the application's own Elasticsearch document class (not shown).
@Service
public class CustomerCDCService {
    private static final Logger log = LoggerFactory.getLogger(CustomerCDCService.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private ElasticsearchOperations elasticsearchOperations; // Example: update search index

    @KafkaListener(topics = "dbserver1.inventory.customers")
    public void consumeCustomerChange(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            // Debezium follows each delete with a tombstone (null value); nothing to process.
            if (record.value() == null) {
                ack.acknowledge();
                return;
            }
            JsonNode rootNode = MAPPER.readTree(record.value());
            // With schemas enabled, the JSON converter nests the event under "payload".
            JsonNode event = rootNode.has("payload") ? rootNode.get("payload") : rootNode;
            // Operation type: 'c'=create, 'u'=update, 'd'=delete, 'r'=snapshot read
            String op = event.get("op").asText();
            JsonNode after = event.get("after");
            JsonNode before = event.get("before");
            switch (op) {
                case "c": // Create
                case "r": // Row read during the initial snapshot; treat like a create
                    handleCustomerCreated(after);
                    break;
                case "u": // Update
                    handleCustomerUpdated(before, after);
                    break;
                case "d": // Delete
                    handleCustomerDeleted(before);
                    break;
                default:
                    log.warn("Unknown operation type: {}", op);
            }
            // Manual ack modes commit the offset only when the listener acknowledges.
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Error processing CDC event for customer", e);
            // Implement retry or dead-letter queue logic; the record is not acknowledged here
        }
    }

    private void handleCustomerCreated(JsonNode customerData) {
        CustomerDocument customerDoc = mapToCustomerDocument(customerData);
        elasticsearchOperations.save(customerDoc);
        log.info("Created customer in search index: {}", customerDoc.getEmail());
        // Could also: update cache, send notification, etc.
    }

    private void handleCustomerUpdated(JsonNode before, JsonNode after) {
        CustomerDocument updatedDoc = mapToCustomerDocument(after);
        elasticsearchOperations.save(updatedDoc);
        log.info("Updated customer in search index: {}", updatedDoc.getEmail());
        // Example: Check if email changed and send notification
        String oldEmail = before.get("email").asText();
        String newEmail = after.get("email").asText();
        if (!oldEmail.equals(newEmail)) {
            // sendEmailChangeNotification(oldEmail, newEmail);
        }
    }

    private void handleCustomerDeleted(JsonNode customerData) {
        long customerId = customerData.get("id").asLong();
        elasticsearchOperations.delete(String.valueOf(customerId), CustomerDocument.class);
        log.info("Deleted customer from search index with ID: {}", customerId);
    }

    private CustomerDocument mapToCustomerDocument(JsonNode data) {
        CustomerDocument doc = new CustomerDocument();
        doc.setId(data.get("id").asLong());
        doc.setFirstName(data.get("first_name").asText());
        doc.setLastName(data.get("last_name").asText());
        doc.setEmail(data.get("email").asText());
        return doc;
    }
}

Configuration in application.yml:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: customer-cdc-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: MANUAL_IMMEDIATE

Advanced Considerations

  1. Single Message Transforms (SMTs): Debezium supports lightweight in-connector transformations. For example, you can extract the after state (with the ExtractNewRecordState SMT), rename fields, or mask sensitive data.
  2. Outbox Pattern: For reliable event publishing from microservices, Debezium supports the Outbox Pattern, where services write events to an outbox table, which Debezium then captures and publishes.
  3. Error Handling: Implement robust error handling with dead-letter topics and retry mechanisms.
  4. Schema Evolution: Use Avro serialization with Confluent Schema Registry to handle evolving event schemas safely.
  5. Monitoring: Monitor connector metrics (lag, throughput) via JMX, and check connector and task status via the Kafka Connect REST API.
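
To make the error-handling point concrete, here is a minimal, framework-free sketch of bounded retries with a dead-letter fallback. The RetryingHandler class is hypothetical and keeps failed events in an in-memory list purely for illustration; in a Spring Kafka application the same idea is usually expressed with DefaultErrorHandler and DeadLetterPublishingRecoverer, which publish failed records to a dead-letter topic instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper: retries a handler a fixed number of times, then parks the event.
class RetryingHandler<T> {
    private final Consumer<T> delegate;
    private final int maxAttempts;
    private final List<T> deadLetters = new ArrayList<>();

    RetryingHandler(Consumer<T> delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the event was processed, false if it ended up in the dead-letter list.
    boolean handle(T event) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                delegate.accept(event);
                return true;
            } catch (RuntimeException e) {
                // Swallowed so the loop can retry; a real handler would log and back off here.
            }
        }
        deadLetters.add(event);
        return false;
    }

    // Read-only view of the events that exhausted their retries.
    List<T> deadLetters() {
        return Collections.unmodifiableList(deadLetters);
    }
}
```

Parking a poison event rather than retrying forever keeps one bad record from blocking the rest of the partition, at the cost of having to reprocess the dead letters separately.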

Benefits and Challenges

Benefits:

  • Decoupling: Services don't need to know about each other; they react to database changes indirectly.
  • Performance: No polling overhead; consumers react to changes in near real-time.
  • Reliability: Built on Kafka's durable, ordered log.
  • Historical Replay: Consumers can reprocess events from the start of a topic, as long as Kafka's retention settings still hold them.

Challenges:

  • Operational Complexity: Requires running and monitoring Kafka and Debezium.
  • Eventual Consistency: Consumers are typically eventually consistent with the source database.
  • Schema Management: Need to handle schema changes carefully.

Conclusion

Debezium brings real-time change data capture to the Java ecosystem in a robust, scalable way. By turning database changes into a stream of events, it enables architectures that are more loosely coupled, resilient, and responsive. It does introduce new operational components, but for event-driven applications, from microservices to data pipelines, the benefits are substantial. For Java developers and architects who want to break down data silos and react quickly to data changes, Debezium is well worth adopting.
