Learn how to build robust Change Data Capture (CDC) pipelines in Java for real-time data streaming, ETL processes, and event-driven architectures.
Table of Contents
- CDC Architecture Overview
- Debezium Integration
- Custom CDC Implementation
- Event Processing & Transformation
- Sink Connectors
- Monitoring & Observability
- Fault Tolerance & Recovery
- Spring Boot Integration
CDC Architecture Overview
What is Change Data Capture?
CDC captures database changes in real time and streams them to downstream systems for:
- Real-time analytics
- Data replication
- Event-driven architectures
- Cache invalidation
- Audit logging
Pipeline Architecture:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Database   │───▶│     CDC     │───▶│    Event    │───▶│    Sink     │
│   Source    │    │  Connector  │    │  Processor  │    │   Systems   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                          │                  │                  │
                          ▼                  ▼                  ▼
                   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
                   │   Change    │    │  Transform  │    │   Kafka,    │
                   │     Log     │    │  & Enrich   │    │   S3, ES    │
                   └─────────────┘    └─────────────┘    └─────────────┘
Debezium Integration
1. Debezium Connector Configuration
// Debezium configuration and event models
@Component
public class DebeziumConnectorManager {
private static final Logger logger = LoggerFactory.getLogger(DebeziumConnectorManager.class);
private final KafkaConnectClient connectClient;
private final ObjectMapper objectMapper;
public DebeziumConnectorManager(KafkaConnectClient connectClient, ObjectMapper objectMapper) {
this.connectClient = connectClient;
this.objectMapper = objectMapper;
}
/**
* Start MySQL Debezium connector
*/
public void startMySqlConnector(String connectorName, MySqlConfig config) {
// Map.of is capped at 10 key/value pairs, so build the config with Map.ofEntries
Map<String, Object> connectorConfig = Map.ofEntries(
    Map.entry("name", connectorName),
    Map.entry("connector.class", "io.debezium.connector.mysql.MySqlConnector"),
    Map.entry("database.hostname", config.hostname()),
    Map.entry("database.port", config.port()),
    Map.entry("database.user", config.username()),
    Map.entry("database.password", config.password()),
    Map.entry("database.server.id", config.serverId()),
    Map.entry("database.server.name", config.serverName()),
    Map.entry("database.include.list", config.databaseIncludeList()),
    Map.entry("table.include.list", config.tableIncludeList()),
    Map.entry("database.history.kafka.bootstrap.servers", config.bootstrapServers()),
    Map.entry("database.history.kafka.topic", config.historyTopic()),
    Map.entry("include.schema.changes", config.includeSchemaChanges()),
    Map.entry("transforms", "unwrap"),
    Map.entry("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"),
    Map.entry("transforms.unwrap.drop.tombstones", "false"),
    Map.entry("transforms.unwrap.delete.handling.mode", "rewrite")
);
try {
connectClient.createConnector(connectorName, connectorConfig);
logger.info("Debezium MySQL connector started: {}", connectorName);
} catch (Exception e) {
logger.error("Failed to start Debezium connector: {}", connectorName, e);
throw new ConnectorStartException("Failed to start connector: " + connectorName, e);
}
}
/**
* Start PostgreSQL Debezium connector
*/
public void startPostgresConnector(String connectorName, PostgresConfig config) {
// As above, this config exceeds Map.of's 10-pair limit, so use Map.ofEntries
Map<String, Object> connectorConfig = Map.ofEntries(
    Map.entry("name", connectorName),
    Map.entry("connector.class", "io.debezium.connector.postgresql.PostgresConnector"),
    Map.entry("database.hostname", config.hostname()),
    Map.entry("database.port", config.port()),
    Map.entry("database.user", config.username()),
    Map.entry("database.password", config.password()),
    Map.entry("database.dbname", config.databaseName()),
    Map.entry("database.server.name", config.serverName()),
    Map.entry("plugin.name", "pgoutput"),
    Map.entry("slot.name", config.slotName()),
    Map.entry("publication.name", config.publicationName()),
    Map.entry("table.include.list", config.tableIncludeList()),
    Map.entry("tombstones.on.delete", "false"),
    Map.entry("transforms", "unwrap"),
    Map.entry("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState"),
    Map.entry("transforms.unwrap.drop.tombstones", "false")
);
try {
connectClient.createConnector(connectorName, connectorConfig);
logger.info("Debezium PostgreSQL connector started: {}", connectorName);
} catch (Exception e) {
logger.error("Failed to start Debezium connector: {}", connectorName, e);
throw new ConnectorStartException("Failed to start connector: " + connectorName, e);
}
}
/**
* Stop connector
*/
public void stopConnector(String connectorName) {
try {
connectClient.deleteConnector(connectorName);
logger.info("Debezium connector stopped: {}", connectorName);
} catch (Exception e) {
logger.error("Failed to stop Debezium connector: {}", connectorName, e);
}
}
/**
* Get connector status
*/
public ConnectorStatus getConnectorStatus(String connectorName) {
try {
return connectClient.getConnectorStatus(connectorName);
} catch (Exception e) {
logger.error("Failed to get connector status: {}", connectorName, e);
return ConnectorStatus.UNKNOWN;
}
}
// Configuration records
public record MySqlConfig(
String hostname,
int port,
String username,
String password,
String serverId,
String serverName,
String databaseIncludeList,
String tableIncludeList,
String bootstrapServers,
String historyTopic,
boolean includeSchemaChanges
) {}
public record PostgresConfig(
String hostname,
int port,
String username,
String password,
String databaseName,
String serverName,
String slotName,
String publicationName,
String tableIncludeList
) {}
public enum ConnectorStatus {
RUNNING, FAILED, PAUSED, UNKNOWN
}
}
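The `KafkaConnectClient` injected above is not part of Debezium or Kafka; it is assumed to be a thin wrapper around the Kafka Connect REST API. A minimal sketch using the JDK `HttpClient` might look like this (the `cdc.connect.url` property name is an assumption; endpoint paths follow the standard Connect REST API):
@Component
public class KafkaConnectClient {
    private final HttpClient http = HttpClient.newHttpClient();
    private final ObjectMapper objectMapper;
    private final String connectUrl; // e.g. http://localhost:8083
    public KafkaConnectClient(ObjectMapper objectMapper,
                              @Value("${cdc.connect.url:http://localhost:8083}") String connectUrl) {
        this.objectMapper = objectMapper;
        this.connectUrl = connectUrl;
    }
    public void createConnector(String name, Map<String, Object> config) throws Exception {
        // The Connect REST API expects every config value as a string
        Map<String, String> stringConfig = new HashMap<>();
        config.forEach((key, value) -> stringConfig.put(key, String.valueOf(value)));
        // POST /connectors takes {"name": ..., "config": {...}}
        String body = objectMapper.writeValueAsString(Map.of("name", name, "config", stringConfig));
        HttpRequest request = HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() >= 400) {
            throw new IllegalStateException(
                "Connect API returned " + response.statusCode() + ": " + response.body());
        }
    }
    public void deleteConnector(String name) throws Exception {
        http.send(HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/" + name))
            .DELETE().build(), HttpResponse.BodyHandlers.ofString());
    }
    public DebeziumConnectorManager.ConnectorStatus getConnectorStatus(String name) throws Exception {
        HttpResponse<String> response = http.send(
            HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/" + name + "/status"))
                .GET().build(),
            HttpResponse.BodyHandlers.ofString());
        // GET /connectors/{name}/status reports states like RUNNING, FAILED, PAUSED
        String state = objectMapper.readTree(response.body())
            .path("connector").path("state").asText("UNKNOWN");
        return switch (state) {
            case "RUNNING" -> DebeziumConnectorManager.ConnectorStatus.RUNNING;
            case "FAILED" -> DebeziumConnectorManager.ConnectorStatus.FAILED;
            case "PAUSED" -> DebeziumConnectorManager.ConnectorStatus.PAUSED;
            default -> DebeziumConnectorManager.ConnectorStatus.UNKNOWN;
        };
    }
}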
2. Debezium Event Models
// Debezium change event structure
public class DebeziumChangeEvent {
private Source source;
private Object before;
private Object after;
private String op;
private Long ts_ms;
private String transaction;
// Getters and setters
public Source getSource() { return source; }
public void setSource(Source source) { this.source = source; }
public Object getBefore() { return before; }
public void setBefore(Object before) { this.before = before; }
public Object getAfter() { return after; }
public void setAfter(Object after) { this.after = after; }
public String getOp() { return op; }
public void setOp(String op) { this.op = op; }
public Long getTs_ms() { return ts_ms; }
public void setTs_ms(Long ts_ms) { this.ts_ms = ts_ms; }
public String getTransaction() { return transaction; }
public void setTransaction(String transaction) { this.transaction = transaction; }
public static class Source {
private String version;
private String connector;
private String name;
private Long ts_ms;
private String snapshot;
private String db;
private String schema;
private String table;
private String change_lsn;
private String commit_lsn;
private Long event_serial_no;
// Getters and setters
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
public String getConnector() { return connector; }
public void setConnector(String connector) { this.connector = connector; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public Long getTs_ms() { return ts_ms; }
public void setTs_ms(Long ts_ms) { this.ts_ms = ts_ms; }
public String getSnapshot() { return snapshot; }
public void setSnapshot(String snapshot) { this.snapshot = snapshot; }
public String getDb() { return db; }
public void setDb(String db) { this.db = db; }
public String getSchema() { return schema; }
public void setSchema(String schema) { this.schema = schema; }
public String getTable() { return table; }
public void setTable(String table) { this.table = table; }
public String getChange_lsn() { return change_lsn; }
public void setChange_lsn(String change_lsn) { this.change_lsn = change_lsn; }
public String getCommit_lsn() { return commit_lsn; }
public void setCommit_lsn(String commit_lsn) { this.commit_lsn = commit_lsn; }
public Long getEvent_serial_no() { return event_serial_no; }
public void setEvent_serial_no(Long event_serial_no) { this.event_serial_no = event_serial_no; }
}
public enum Operation {
CREATE("c"),
UPDATE("u"),
DELETE("d"),
READ("r"),
TRUNCATE("t");
private final String code;
Operation(String code) {
this.code = code;
}
public String getCode() {
return code;
}
public static Operation fromCode(String code) {
return Arrays.stream(values())
.filter(op -> op.code.equals(code))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown operation code: " + code));
}
}
}
// Domain-specific event models
public class CustomerChangeEvent {
private final String eventId;
private final String operation;
private final Long customerId;
private final String email;
private final String firstName;
private final String lastName;
private final String status;
private final Instant timestamp;
private final Map<String, Object> before;
private final Map<String, Object> after;
public CustomerChangeEvent(DebeziumChangeEvent debeziumEvent) {
this.eventId = UUID.randomUUID().toString();
this.operation = debeziumEvent.getOp();
this.timestamp = Instant.ofEpochMilli(debeziumEvent.getTs_ms());
Map<String, Object> afterMap = convertToMap(debeziumEvent.getAfter());
Map<String, Object> beforeMap = convertToMap(debeziumEvent.getBefore());
this.customerId = extractLong(afterMap != null ? afterMap : beforeMap, "id");
this.email = extractString(afterMap, "email");
this.firstName = extractString(afterMap, "first_name");
this.lastName = extractString(afterMap, "last_name");
this.status = extractString(afterMap, "status");
this.before = beforeMap;
this.after = afterMap;
}
private Map<String, Object> convertToMap(Object obj) {
if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) obj;
return map;
}
return null;
}
private Long extractLong(Map<String, Object> map, String key) {
if (map != null && map.containsKey(key)) {
Object value = map.get(key);
if (value instanceof Number) {
return ((Number) value).longValue();
} else if (value instanceof String) {
return Long.parseLong((String) value);
}
}
return null;
}
private String extractString(Map<String, Object> map, String key) {
if (map != null && map.containsKey(key)) {
Object value = map.get(key);
return value != null ? value.toString() : null;
}
return null;
}
// Getters
public String getEventId() { return eventId; }
public String getOperation() { return operation; }
public Long getCustomerId() { return customerId; }
public String getEmail() { return email; }
public String getFirstName() { return firstName; }
public String getLastName() { return lastName; }
public String getStatus() { return status; }
public Instant getTimestamp() { return timestamp; }
public Map<String, Object> getBefore() { return before; }
public Map<String, Object> getAfter() { return after; }
public boolean isCreate() { return "c".equals(operation); }
public boolean isUpdate() { return "u".equals(operation); }
public boolean isDelete() { return "d".equals(operation); }
}
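To see how these models fit together, here is a quick Jackson round trip. The JSON payload is hand-written for illustration (a real envelope comes off the connector's Kafka topic), and the snippet assumes a method that declares `throws Exception`:
// Illustrative raw Debezium envelope for an insert into customers
String json = """
    {
      "op": "c",
      "ts_ms": 1700000000000,
      "before": null,
      "after": {"id": 42, "email": "ada@example.com",
                "first_name": "Ada", "last_name": "Lovelace", "status": "ACTIVE"},
      "source": {"connector": "mysql", "db": "inventory", "table": "customers"}
    }
    """;
ObjectMapper mapper = new ObjectMapper();
DebeziumChangeEvent event = mapper.readValue(json, DebeziumChangeEvent.class);
// "after" deserializes to a Map, which CustomerChangeEvent unpacks
CustomerChangeEvent customer = new CustomerChangeEvent(event);
System.out.println(customer.isCreate());       // true
System.out.println(customer.getCustomerId());  // 42
System.out.println(customer.getEmail());       // ada@example.com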
Custom CDC Implementation
3. Database Polling CDC
// Custom CDC implementation using database polling
@Component
public class DatabasePollingCDC {
private static final Logger logger = LoggerFactory.getLogger(DatabasePollingCDC.class);
private final DataSource dataSource;
private final ChangeEventProcessor eventProcessor;
private final ScheduledExecutorService scheduler;
private final Map<String, TableWatcher> tableWatchers;
public DatabasePollingCDC(DataSource dataSource, ChangeEventProcessor eventProcessor) {
this.dataSource = dataSource;
this.eventProcessor = eventProcessor;
this.scheduler = Executors.newScheduledThreadPool(5);
this.tableWatchers = new ConcurrentHashMap<>();
}
/**
* Start watching a table for changes
*/
public void watchTable(TableConfig config) {
TableWatcher watcher = new TableWatcher(config);
tableWatchers.put(config.tableName(), watcher);
scheduler.scheduleAtFixedRate(
watcher::pollChanges,
config.initialDelayMs(),
config.pollIntervalMs(),
TimeUnit.MILLISECONDS
);
logger.info("Started watching table: {}", config.tableName());
}
/**
* Stop watching a table
*/
public void stopWatchingTable(String tableName) {
TableWatcher watcher = tableWatchers.remove(tableName);
if (watcher != null) {
watcher.stop();
logger.info("Stopped watching table: {}", tableName);
}
}
/**
* Table watcher implementation
*/
private class TableWatcher {
private final TableConfig config;
private volatile boolean running = true;
private volatile Long lastProcessedId;
public TableWatcher(TableConfig config) {
this.config = config;
initializeLastProcessedId();
}
public void pollChanges() {
if (!running) return;
try {
List<ChangeEvent> changes = fetchChanges();
if (!changes.isEmpty()) {
eventProcessor.processChanges(changes);
updateLastProcessedId(changes);
logger.debug("Processed {} changes for table: {}", changes.size(), config.tableName());
}
} catch (Exception e) {
logger.error("Error polling changes for table: {}", config.tableName(), e);
}
}
public void stop() {
running = false;
}
private void initializeLastProcessedId() {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT COALESCE(MAX(" + config.idColumn() + "), 0) FROM " + config.tableName())) {
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
lastProcessedId = rs.getLong(1);
}
} catch (SQLException e) {
logger.error("Failed to initialize last processed ID for table: {}", config.tableName(), e);
lastProcessedId = 0L;
}
}
private List<ChangeEvent> fetchChanges() {
List<ChangeEvent> changes = new ArrayList<>();
String sql = String.format(
"SELECT * FROM %s WHERE %s > ? AND %s IS NOT NULL ORDER BY %s ASC LIMIT ?",
config.tableName(), config.idColumn(), config.timestampColumn(), config.idColumn()
);
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, lastProcessedId);
stmt.setInt(2, config.batchSize());
ResultSet rs = stmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
ChangeEvent event = mapResultSetToEvent(rs, metaData);
changes.add(event);
}
} catch (SQLException e) {
logger.error("Failed to fetch changes for table: {}", config.tableName(), e);
}
return changes;
}
private ChangeEvent mapResultSetToEvent(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
Map<String, Object> data = new HashMap<>();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
Object value = rs.getObject(i);
data.put(columnName, value);
}
Instant timestamp = rs.getTimestamp(config.timestampColumn()).toInstant();
return new ChangeEvent(
UUID.randomUUID().toString(),
config.tableName(),
"INSERT", // Assuming inserts for simplicity
data,
null, // No before state in polling
timestamp
);
}
private void updateLastProcessedId(List<ChangeEvent> changes) {
if (!changes.isEmpty()) {
ChangeEvent lastEvent = changes.get(changes.size() - 1);
// JDBC drivers may return Integer, Long, or BigInteger for numeric keys
Object lastId = lastEvent.getData().get(config.idColumn());
if (lastId instanceof Number number && number.longValue() > lastProcessedId) {
    lastProcessedId = number.longValue();
}
}
}
}
// Configuration and event classes
public record TableConfig(
String tableName,
String idColumn,
String timestampColumn,
long pollIntervalMs,
long initialDelayMs,
int batchSize
) {}
public static class ChangeEvent {
private final String eventId;
private final String tableName;
private final String operation;
private final Map<String, Object> data;
private final Map<String, Object> before;
private final Instant timestamp;
public ChangeEvent(String eventId, String tableName, String operation,
Map<String, Object> data, Map<String, Object> before, Instant timestamp) {
this.eventId = eventId;
this.tableName = tableName;
this.operation = operation;
// Map.copyOf rejects null values, which ResultSet columns can produce
this.data = Collections.unmodifiableMap(new HashMap<>(data));
this.before = before != null ? Collections.unmodifiableMap(new HashMap<>(before)) : null;
this.timestamp = timestamp;
}
// Getters
public String getEventId() { return eventId; }
public String getTableName() { return tableName; }
public String getOperation() { return operation; }
public Map<String, Object> getData() { return data; }
public Map<String, Object> getBefore() { return before; }
public Instant getTimestamp() { return timestamp; }
}
@PreDestroy
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
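A sketch of how the poller might be wired up (table and column names here are illustrative):
// Watch the customers table, polling every five seconds for rows whose
// id is greater than the last one seen
@Bean
public CommandLineRunner startPolling(DatabasePollingCDC cdc) {
    return args -> cdc.watchTable(new DatabasePollingCDC.TableConfig(
        "customers",   // tableName
        "id",          // idColumn: monotonically increasing primary key
        "updated_at",  // timestampColumn
        5_000L,        // pollIntervalMs
        1_000L,        // initialDelayMs
        100            // batchSize
    ));
}
Because the watcher keys off a monotonically increasing id, this approach surfaces inserts only; capturing updates and deletes is where log-based CDC like Debezium earns its keep.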
Event Processing & Transformation
4. Change Event Processor
@Component
public class ChangeEventProcessor {
private static final Logger logger = LoggerFactory.getLogger(ChangeEventProcessor.class);
private final Map<String, TableProcessor> tableProcessors;
private final EventTransformer eventTransformer;
private final MetricsService metricsService;
public ChangeEventProcessor(EventTransformer eventTransformer, MetricsService metricsService) {
this.eventTransformer = eventTransformer;
this.metricsService = metricsService;
this.tableProcessors = new ConcurrentHashMap<>();
initializeProcessors();
}
/**
* Process change events from CDC
*/
public void processDebeziumEvent(DebeziumChangeEvent debeziumEvent) {
try {
String tableName = debeziumEvent.getSource().getTable();
TableProcessor processor = tableProcessors.get(tableName);
if (processor != null) {
DomainEvent domainEvent = processor.process(debeziumEvent);
if (domainEvent != null) {
eventTransformer.transformAndRoute(domainEvent);
metricsService.recordEventProcessed(tableName, domainEvent.getEventType());
}
} else {
logger.warn("No processor found for table: {}", tableName);
metricsService.recordUnprocessedEvent(tableName);
}
} catch (Exception e) {
logger.error("Failed to process Debezium event for table: {}",
debeziumEvent.getSource().getTable(), e);
metricsService.recordEventError(debeziumEvent.getSource().getTable());
}
}
/**
* Process batch of change events
*/
public void processChanges(List<DatabasePollingCDC.ChangeEvent> changes) {
for (DatabasePollingCDC.ChangeEvent change : changes) {
try {
TableProcessor processor = tableProcessors.get(change.getTableName());
if (processor != null) {
DomainEvent domainEvent = processor.process(change);
if (domainEvent != null) {
eventTransformer.transformAndRoute(domainEvent);
metricsService.recordEventProcessed(change.getTableName(), domainEvent.getEventType());
}
}
} catch (Exception e) {
logger.error("Failed to process change event for table: {}", change.getTableName(), e);
metricsService.recordEventError(change.getTableName());
}
}
}
private void initializeProcessors() {
tableProcessors.put("customers", new CustomerTableProcessor());
tableProcessors.put("orders", new OrderTableProcessor());
tableProcessors.put("products", new ProductTableProcessor());
tableProcessors.put("order_items", new OrderItemTableProcessor());
}
// Table processor interface
public interface TableProcessor {
DomainEvent process(DebeziumChangeEvent event);
DomainEvent process(DatabasePollingCDC.ChangeEvent event);
}
// Customer table processor
public class CustomerTableProcessor implements TableProcessor {
@Override
public DomainEvent process(DebeziumChangeEvent event) {
CustomerChangeEvent customerEvent = new CustomerChangeEvent(event);
return switch (DebeziumChangeEvent.Operation.fromCode(event.getOp())) {
case CREATE -> new CustomerCreatedEvent(customerEvent);
case UPDATE -> new CustomerUpdatedEvent(customerEvent);
case DELETE -> new CustomerDeletedEvent(customerEvent);
default -> {
logger.debug("Ignoring operation: {} for customer", event.getOp());
yield null;
}
};
}
@Override
public DomainEvent process(DatabasePollingCDC.ChangeEvent event) {
// Convert polling event to domain event
Map<String, Object> data = event.getData();
// Numeric key type varies by JDBC driver, so widen through Number
Long customerId = data.get("id") instanceof Number n ? n.longValue() : null;
String email = (String) data.get("email");
String status = (String) data.get("status");
return new CustomerCreatedEvent(
UUID.randomUUID().toString(),
customerId,
email,
(String) data.get("first_name"),
(String) data.get("last_name"),
status,
event.getTimestamp()
);
}
}
// Order table processor
public class OrderTableProcessor implements TableProcessor {
@Override
public DomainEvent process(DebeziumChangeEvent event) {
Map<String, Object> after = convertToMap(event.getAfter());
Map<String, Object> before = convertToMap(event.getBefore());
Long orderId = extractLong(after != null ? after : before, "id");
String status = extractString(after, "status");
String previousStatus = extractString(before, "status");
return switch (DebeziumChangeEvent.Operation.fromCode(event.getOp())) {
case CREATE -> new OrderCreatedEvent(orderId, status, event.getTs_ms());
case UPDATE -> {
if (statusChanged(status, previousStatus)) {
yield new OrderStatusChangedEvent(orderId, previousStatus, status, event.getTs_ms());
}
yield null;
}
default -> null;
};
}
@Override
public DomainEvent process(DatabasePollingCDC.ChangeEvent event) {
// Implementation for polling events
return null;
}
private boolean statusChanged(String newStatus, String oldStatus) {
return newStatus != null && !newStatus.equals(oldStatus);
}
}
// Helper methods
private Map<String, Object> convertToMap(Object obj) {
if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) obj;
return map;
}
return null;
}
private Long extractLong(Map<String, Object> map, String key) {
if (map != null && map.containsKey(key)) {
Object value = map.get(key);
if (value instanceof Number) {
return ((Number) value).longValue();
}
}
return null;
}
private String extractString(Map<String, Object> map, String key) {
if (map != null && map.containsKey(key)) {
Object value = map.get(key);
return value != null ? value.toString() : null;
}
return null;
}
}
// Domain events
public abstract class DomainEvent {
private final String eventId;
private final String eventType;
private final Instant timestamp;
public DomainEvent(String eventType) {
this.eventId = UUID.randomUUID().toString();
this.eventType = eventType;
this.timestamp = Instant.now();
}
public DomainEvent(String eventId, String eventType, Instant timestamp) {
this.eventId = eventId;
this.eventType = eventType;
this.timestamp = timestamp;
}
// Getters
public String getEventId() { return eventId; }
public String getEventType() { return eventType; }
public Instant getTimestamp() { return timestamp; }
}
public class CustomerCreatedEvent extends DomainEvent {
private final Long customerId;
private final String email;
private final String firstName;
private final String lastName;
private final String status;
public CustomerCreatedEvent(CustomerChangeEvent changeEvent) {
super("CUSTOMER_CREATED");
this.customerId = changeEvent.getCustomerId();
this.email = changeEvent.getEmail();
this.firstName = changeEvent.getFirstName();
this.lastName = changeEvent.getLastName();
this.status = changeEvent.getStatus();
}
public CustomerCreatedEvent(String eventId, Long customerId, String email,
String firstName, String lastName, String status, Instant timestamp) {
super(eventId, "CUSTOMER_CREATED", timestamp);
this.customerId = customerId;
this.email = email;
this.firstName = firstName;
this.lastName = lastName;
this.status = status;
}
// Getters
public Long getCustomerId() { return customerId; }
public String getEmail() { return email; }
public String getFirstName() { return firstName; }
public String getLastName() { return lastName; }
public String getStatus() { return status; }
}
public class OrderCreatedEvent extends DomainEvent {
private final Long orderId;
private final String status;
public OrderCreatedEvent(Long orderId, String status, Long timestampMs) {
    // Preserve the source change timestamp instead of defaulting to Instant.now()
    super(UUID.randomUUID().toString(), "ORDER_CREATED",
          timestampMs != null ? Instant.ofEpochMilli(timestampMs) : Instant.now());
    this.orderId = orderId;
    this.status = status;
}
// Getters
public Long getOrderId() { return orderId; }
public String getStatus() { return status; }
}
public class OrderStatusChangedEvent extends DomainEvent {
private final Long orderId;
private final String oldStatus;
private final String newStatus;
public OrderStatusChangedEvent(Long orderId, String oldStatus, String newStatus, Long timestampMs) {
    super(UUID.randomUUID().toString(), "ORDER_STATUS_CHANGED",
          timestampMs != null ? Instant.ofEpochMilli(timestampMs) : Instant.now());
    this.orderId = orderId;
    this.oldStatus = oldStatus;
    this.newStatus = newStatus;
}
// Getters
public Long getOrderId() { return orderId; }
public String getOldStatus() { return oldStatus; }
public String getNewStatus() { return newStatus; }
}
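`CustomerUpdatedEvent`, `CustomerDeletedEvent`, and `ProductUpdatedEvent` are referenced above but not shown; following the same pattern as `CustomerCreatedEvent`, minimal versions might look like:
public class CustomerUpdatedEvent extends DomainEvent {
    private final Long customerId;
    private final String email;
    private final String firstName;
    private final String lastName;
    private final String status;
    public CustomerUpdatedEvent(CustomerChangeEvent changeEvent) {
        super("CUSTOMER_UPDATED");
        this.customerId = changeEvent.getCustomerId();
        this.email = changeEvent.getEmail();
        this.firstName = changeEvent.getFirstName();
        this.lastName = changeEvent.getLastName();
        this.status = changeEvent.getStatus();
    }
    public Long getCustomerId() { return customerId; }
    public String getEmail() { return email; }
    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public String getStatus() { return status; }
}
public class CustomerDeletedEvent extends DomainEvent {
    private final Long customerId;
    public CustomerDeletedEvent(CustomerChangeEvent changeEvent) {
        super("CUSTOMER_DELETED");
        // On deletes only the "before" image carries data, which
        // CustomerChangeEvent already falls back to for the id
        this.customerId = changeEvent.getCustomerId();
    }
    public Long getCustomerId() { return customerId; }
}
public class ProductUpdatedEvent extends DomainEvent {
    private final Long productId;
    public ProductUpdatedEvent(Long productId) {
        super("PRODUCT_UPDATED");
        this.productId = productId;
    }
    public Long getProductId() { return productId; }
}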
5. Event Transformer & Router
@Component
public class EventTransformer {
private static final Logger logger = LoggerFactory.getLogger(EventTransformer.class);
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
private final Map<String, String> topicMappings;
public EventTransformer(KafkaTemplate<String, Object> kafkaTemplate, ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.topicMappings = Map.of(
"CUSTOMER_CREATED", "customer-events",
"CUSTOMER_UPDATED", "customer-events",
"CUSTOMER_DELETED", "customer-events",
"ORDER_CREATED", "order-events",
"ORDER_STATUS_CHANGED", "order-events",
"PRODUCT_CREATED", "product-events",
"PRODUCT_UPDATED", "product-events"
);
}
/**
* Transform and route domain event to appropriate sink
*/
public void transformAndRoute(DomainEvent event) {
try {
// Route to Kafka
routeToKafka(event);
// Route to other sinks based on event type
switch (event.getEventType()) {
case "CUSTOMER_CREATED", "CUSTOMER_UPDATED":
routeToElasticsearch(event);
break;
case "ORDER_CREATED", "ORDER_STATUS_CHANGED":
routeToAnalytics(event);
break;
case "PRODUCT_CREATED", "PRODUCT_UPDATED":
routeToCache(event);
break;
}
logger.debug("Successfully routed event: {} [{}]", event.getEventType(), event.getEventId());
} catch (Exception e) {
logger.error("Failed to route event: {} [{}]", event.getEventType(), event.getEventId(), e);
throw new EventRoutingException("Failed to route event", e);
}
}
private void routeToKafka(DomainEvent event) {
String topic = topicMappings.get(event.getEventType());
if (topic == null) {
logger.warn("No topic mapping for event type: {}", event.getEventType());
return;
}
try {
String message = objectMapper.writeValueAsString(event);
kafkaTemplate.send(topic, event.getEventId(), message)
.addCallback(
result -> logger.debug("Event sent to Kafka: {} [{}]", topic, event.getEventId()),
failure -> logger.error("Failed to send event to Kafka: {} [{}]", topic, event.getEventId(), failure)
);
} catch (Exception e) {
logger.error("Failed to serialize event for Kafka: {}", event.getEventId(), e);
}
}
private void routeToElasticsearch(DomainEvent event) {
// Implementation for Elasticsearch routing
logger.debug("Routing to Elasticsearch: {}", event.getEventType());
}
private void routeToAnalytics(DomainEvent event) {
// Implementation for analytics service
logger.debug("Routing to analytics: {}", event.getEventType());
}
private void routeToCache(DomainEvent event) {
// Implementation for cache invalidation
logger.debug("Routing to cache: {}", event.getEventType());
}
/**
* Transform event for specific sink format
*/
public <T> T transformForSink(DomainEvent event, Class<T> sinkType) {
try {
return objectMapper.convertValue(event, sinkType);
} catch (Exception e) {
logger.error("Failed to transform event for sink: {}", sinkType.getSimpleName(), e);
throw new EventTransformationException("Failed to transform event", e);
}
}
}
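The unchecked exception types thrown throughout this guide (`ConnectorStartException`, `EventRoutingException`, `EventTransformationException`, `SinkException`) are not defined above; one possible shape for them:
// Simple RuntimeException wrappers so failures propagate without checked-exception noise
public class ConnectorStartException extends RuntimeException {
    public ConnectorStartException(String message, Throwable cause) { super(message, cause); }
}
public class EventRoutingException extends RuntimeException {
    public EventRoutingException(String message, Throwable cause) { super(message, cause); }
}
public class EventTransformationException extends RuntimeException {
    public EventTransformationException(String message, Throwable cause) { super(message, cause); }
}
public class SinkException extends RuntimeException {
    public SinkException(String message, Throwable cause) { super(message, cause); }
}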
Sink Connectors
6. Multiple Sink Implementations
// Elasticsearch sink
@Component
public class ElasticsearchSink {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSink.class);
private final RestHighLevelClient elasticsearchClient;
private final ObjectMapper objectMapper;
public ElasticsearchSink(RestHighLevelClient elasticsearchClient, ObjectMapper objectMapper) {
this.elasticsearchClient = elasticsearchClient;
this.objectMapper = objectMapper;
}
/**
* Index customer event in Elasticsearch
*/
public void indexCustomerEvent(CustomerCreatedEvent event) {
try {
// Note: Map.of rejects null values; use a HashMap if any of these fields can be null
Map<String, Object> document = Map.of(
"customerId", event.getCustomerId(),
"email", event.getEmail(),
"firstName", event.getFirstName(),
"lastName", event.getLastName(),
"status", event.getStatus(),
"eventTimestamp", event.getTimestamp(),
"eventType", event.getEventType()
);
IndexRequest request = new IndexRequest("customers")
.id(event.getCustomerId().toString())
.source(document);
elasticsearchClient.index(request, RequestOptions.DEFAULT);
logger.debug("Indexed customer in Elasticsearch: {}", event.getCustomerId());
} catch (Exception e) {
logger.error("Failed to index customer in Elasticsearch: {}", event.getCustomerId(), e);
throw new SinkException("Failed to index in Elasticsearch", e);
}
}
/**
* Update customer document
*/
public void updateCustomerEvent(CustomerUpdatedEvent event) {
try {
UpdateRequest request = new UpdateRequest("customers", event.getCustomerId().toString())
.doc(Map.of(
"email", event.getEmail(),
"firstName", event.getFirstName(),
"lastName", event.getLastName(),
"status", event.getStatus(),
"lastUpdated", event.getTimestamp()
));
elasticsearchClient.update(request, RequestOptions.DEFAULT);
logger.debug("Updated customer in Elasticsearch: {}", event.getCustomerId());
} catch (Exception e) {
logger.error("Failed to update customer in Elasticsearch: {}", event.getCustomerId(), e);
throw new SinkException("Failed to update in Elasticsearch", e);
}
}
}
// S3 sink for archival
@Component
public class S3ArchivalSink {
private static final Logger logger = LoggerFactory.getLogger(S3ArchivalSink.class);
private final AmazonS3 s3Client;
private final ObjectMapper objectMapper;
private final String bucketName;
public S3ArchivalSink(AmazonS3 s3Client, ObjectMapper objectMapper,
@Value("${app.s3.bucket}") String bucketName) {
this.s3Client = s3Client;
this.objectMapper = objectMapper;
this.bucketName = bucketName;
}
/**
* Archive event to S3 for long-term storage
*/
public void archiveEvent(DomainEvent event) {
try {
String key = String.format("events/%s/%s/%s.json",
event.getEventType().toLowerCase(),
event.getTimestamp().toString().substring(0, 10), // YYYY-MM-DD
event.getEventId()
);
String eventJson = objectMapper.writeValueAsString(event);
s3Client.putObject(bucketName, key, eventJson);
logger.debug("Archived event to S3: {} [{}]", event.getEventType(), key);
} catch (Exception e) {
logger.error("Failed to archive event to S3: {}", event.getEventId(), e);
throw new SinkException("Failed to archive to S3", e);
}
}
/**
* Archive batch of events
*/
public void archiveEvents(List<DomainEvent> events) {
if (events.isEmpty()) return;
String batchId = UUID.randomUUID().toString();
String key = String.format("batches/%s/%s.json",
Instant.now().toString().substring(0, 10),
batchId
);
try {
String eventsJson = objectMapper.writeValueAsString(events);
s3Client.putObject(bucketName, key, eventsJson);
logger.debug("Archived batch to S3: {} events [{}]", events.size(), key);
} catch (Exception e) {
logger.error("Failed to archive batch to S3: {}", batchId, e);
throw new SinkException("Failed to archive batch to S3", e);
}
}
}
// Cache invalidation sink
@Component
public class CacheInvalidationSink {
private static final Logger logger = LoggerFactory.getLogger(CacheInvalidationSink.class);
private final RedisTemplate<String, Object> redisTemplate;
public CacheInvalidationSink(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* Invalidate cache entries based on domain events
*/
public void invalidateCache(DomainEvent event) {
try {
switch (event.getEventType()) {
case "CUSTOMER_UPDATED":
CustomerUpdatedEvent customerEvent = (CustomerUpdatedEvent) event;
invalidateCustomerCache(customerEvent.getCustomerId());
break;
case "PRODUCT_UPDATED":
ProductUpdatedEvent productEvent = (ProductUpdatedEvent) event;
invalidateProductCache(productEvent.getProductId());
break;
case "ORDER_CREATED":
OrderCreatedEvent orderEvent = (OrderCreatedEvent) event;
invalidateOrderCaches(orderEvent.getOrderId());
break;
}
} catch (Exception e) {
logger.error("Failed to invalidate cache for event: {}", event.getEventId(), e);
}
}
private void invalidateCustomerCache(Long customerId) {
    // KEYS is O(N) and blocks Redis; fine for small keyspaces, but see the SCAN-based sketch below
    String pattern = "customer:*:" + customerId;
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
logger.debug("Invalidated customer cache: {} keys", keys.size());
}
}
private void invalidateProductCache(Long productId) {
String pattern = "product:*:" + productId;
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
logger.debug("Invalidated product cache: {} keys", keys.size());
}
}
private void invalidateOrderCaches(Long orderId) {
// Invalidate various order-related caches
String[] patterns = {
"order:details:" + orderId,
"order:summary:" + orderId,
"user:orders:*" // Would need user ID, simplified for example
};
for (String pattern : patterns) {
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}
}
logger.debug("Invalidated order caches for: {}", orderId);
}
}
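Because `KEYS` scans the entire keyspace in one blocking pass, production deployments usually iterate with `SCAN` instead. A sketch of such a variant (the method name is hypothetical; `RedisCallback`, `Cursor`, and `ScanOptions` come from Spring Data Redis):
// SCAN walks the keyspace incrementally instead of blocking the server
private void invalidateByScan(String pattern) {
    Set<String> keys = redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
        Set<String> matched = new HashSet<>();
        try (Cursor<byte[]> cursor = connection.scan(
                ScanOptions.scanOptions().match(pattern).count(500).build())) {
            while (cursor.hasNext()) {
                matched.add(new String(cursor.next(), StandardCharsets.UTF_8));
            }
        }
        return matched;
    });
    if (keys != null && !keys.isEmpty()) {
        redisTemplate.delete(keys);
    }
}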
Monitoring & Observability
7. Metrics and Health Checks
@Component
public class CDCMetricsService {
private final MeterRegistry meterRegistry;
// Counters
private final Counter eventsReceived;
private final Counter eventsProcessed;
private final Counter eventsFailed;
private final Counter eventsFiltered;
// Gauges
private final Gauge lastProcessedTimestamp;
private volatile long lastEventTimestamp;
// Timers
private final Timer eventProcessingTimer;
private final Timer sinkWriteTimer;
public CDCMetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// Initialize counters
this.eventsReceived = Counter.builder("cdc.events.received")
.description("Total events received from CDC")
.register(meterRegistry);
this.eventsProcessed = Counter.builder("cdc.events.processed")
.description("Total events processed successfully")
.tag("result", "success")
.register(meterRegistry);
this.eventsFailed = Counter.builder("cdc.events.processed")
.description("Total events failed during processing")
.tag("result", "failure")
.register(meterRegistry);
this.eventsFiltered = Counter.builder("cdc.events.filtered")
.description("Total events filtered out")
.register(meterRegistry);
// Initialize gauge (Micrometer's Gauge.builder takes the state object and value function up front)
this.lastProcessedTimestamp = Gauge.builder("cdc.last.processed.timestamp", this,
        CDCMetricsService::getLastEventTimestamp)
    .description("Timestamp of last processed event")
    .register(meterRegistry);
// Initialize timers
this.eventProcessingTimer = Timer.builder("cdc.event.processing.time")
.description("Event processing duration")
.register(meterRegistry);
this.sinkWriteTimer = Timer.builder("cdc.sink.write.time")
.description("Sink write duration")
.register(meterRegistry);
}
public void recordEventReceived(String tableName) {
eventsReceived.increment();
meterRegistry.counter("cdc.events.received.by.table", "table", tableName).increment();
}
public void recordEventProcessed(String tableName, String eventType) {
eventsProcessed.increment();
meterRegistry.counter("cdc.events.processed.by.table", "table", tableName).increment();
meterRegistry.counter("cdc.events.processed.by.type", "eventType", eventType).increment();
lastEventTimestamp = System.currentTimeMillis();
}
public void recordEventError(String tableName) {
eventsFailed.increment();
meterRegistry.counter("cdc.events.failed.by.table", "table", tableName).increment();
}
public void recordUnprocessedEvent(String tableName) {
eventsFiltered.increment();
meterRegistry.counter("cdc.events.filtered.by.table", "table", tableName).increment();
}
public Timer.Sample startProcessingTimer() {
return Timer.start(meterRegistry);
}
public void stopProcessingTimer(Timer.Sample sample, String tableName) {
    long durationNanos = sample.stop(eventProcessingTimer);
    // Also record the same duration against a per-table timer
    meterRegistry.timer("cdc.event.processing.time.by.table", "table", tableName)
        .record(durationNanos, TimeUnit.NANOSECONDS);
}
public Timer.Sample startSinkWriteTimer() {
return Timer.start(meterRegistry);
}
public void stopSinkWriteTimer(Timer.Sample sample, String sinkType) {
    long durationNanos = sample.stop(sinkWriteTimer);
    meterRegistry.timer("cdc.sink.write.time.by.type", "sinkType", sinkType)
        .record(durationNanos, TimeUnit.NANOSECONDS);
}
private double getLastEventTimestamp() {
return lastEventTimestamp;
}
/**
* Get processing statistics
*/
public ProcessingStats getProcessingStats() {
return new ProcessingStats(
eventsReceived.count(),
eventsProcessed.count(),
eventsFailed.count(),
eventsFiltered.count(),
lastEventTimestamp
);
}
public record ProcessingStats(
double eventsReceived,
double eventsProcessed,
double eventsFailed,
double eventsFiltered,
long lastEventTimestamp
) {}
}
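The timer helpers are meant to bracket sink writes so slow sinks show up in the metrics; an illustrative call site:
// Measure how long an Elasticsearch write takes, even when it throws
Timer.Sample sample = metricsService.startSinkWriteTimer();
try {
    elasticsearchSink.indexCustomerEvent(event);
} finally {
    metricsService.stopSinkWriteTimer(sample, "elasticsearch");
}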
// Health indicator for CDC pipeline
@Component
public class CDCHealthIndicator implements HealthIndicator {
private final CDCMetricsService metricsService;
private final DebeziumConnectorManager connectorManager;
public CDCHealthIndicator(CDCMetricsService metricsService,
DebeziumConnectorManager connectorManager) {
this.metricsService = metricsService;
this.connectorManager = connectorManager;
}
@Override
public Health health() {
try {
CDCMetricsService.ProcessingStats stats = metricsService.getProcessingStats();
Map<String, Object> details = new HashMap<>();
// Add metrics details
details.put("eventsReceived", stats.eventsReceived());
details.put("eventsProcessed", stats.eventsProcessed());
details.put("eventsFailed", stats.eventsFailed());
details.put("eventsFiltered", stats.eventsFiltered());
details.put("lastEventTimestamp", stats.lastEventTimestamp());
// Check if pipeline is active (skip the stall check until the first event arrives)
long timeSinceLastEvent = System.currentTimeMillis() - stats.lastEventTimestamp();
boolean isStalled = stats.lastEventTimestamp() > 0 && timeSinceLastEvent > 300_000; // 5 minutes
if (isStalled) {
return Health.down()
.withDetail("reason", "No events processed in last 5 minutes")
.withDetails(details)
.build();
}
// Check connector status
DebeziumConnectorManager.ConnectorStatus status =
connectorManager.getConnectorStatus("mysql-connector");
details.put("connectorStatus", status);
if (status == DebeziumConnectorManager.ConnectorStatus.FAILED) {
return Health.down()
.withDetail("reason", "Debezium connector failed")
.withDetails(details)
.build();
}
return Health.up().withDetails(details).build();
} catch (Exception e) {
return Health.down(e).build();
}
}
}
Fault Tolerance & Recovery
8. Error Handling and Recovery
@Component
public class CDCFaultToleranceManager {
private static final Logger logger = LoggerFactory.getLogger(CDCFaultToleranceManager.class);
private final DeadLetterQueueService dlqService;
private final RetryTemplate retryTemplate;
private final CircuitBreaker circuitBreaker;
public CDCFaultToleranceManager(DeadLetterQueueService dlqService) {
this.dlqService = dlqService;
this.retryTemplate = createRetryTemplate();
this.circuitBreaker = CircuitBreaker.ofDefaults("cdcProcessor");
}
/**
* Process event with fault tolerance
*/
public void processWithFaultTolerance(DebeziumChangeEvent event,
Consumer<DebeziumChangeEvent> processor) {
try {
// Use circuit breaker pattern
circuitBreaker.executeRunnable(() -> {
// Use retry template for transient failures
retryTemplate.execute(context -> {
processor.accept(event);
return null;
});
});
} catch (Exception e) {
logger.error("Failed to process event after retries, sending to DLQ", e);
dlqService.sendToDLQ(event, e);
}
}
/**
* Process batch with fault tolerance
*/
public void processBatchWithFaultTolerance(List<DebeziumChangeEvent> events,
                                           Consumer<List<DebeziumChangeEvent>> processor) {
    if (events.isEmpty()) return;
    List<DebeziumChangeEvent> successful = new ArrayList<>();
    for (DebeziumChangeEvent event : events) {
        // processWithFaultTolerance never rethrows: events that exhaust their
        // retries are routed to the DLQ internally, so only events that pass
        // individual processing are collected for the batch step
        processWithFaultTolerance(event, successful::add);
    }
    // Process successful events as batch
    if (!successful.isEmpty()) {
        try {
            processor.accept(successful);
        } catch (Exception e) {
            logger.error("Failed to process batch, sending all to DLQ", e);
            successful.forEach(event -> dlqService.sendToDLQ(event, e));
        }
    }
}
/**
* Replay events from DLQ
*/
public void replayFromDLQ(String source, Instant fromTimestamp) {
List<DeadLetterMessage> messages = dlqService.getMessagesFromDLQ(source, fromTimestamp);
for (DeadLetterMessage message : messages) {
try {
processWithFaultTolerance(message.getOriginalEvent(), this::processEvent);
dlqService.acknowledgeDLQMessage(message.getId());
logger.info("Successfully replayed DLQ message: {}", message.getId());
} catch (Exception e) {
logger.error("Failed to replay DLQ message: {}", message.getId(), e);
// Message remains in DLQ for manual intervention
}
}
}
private RetryTemplate createRetryTemplate() {
RetryTemplate template = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(30000);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setBackOffPolicy(backOffPolicy);
template.setRetryPolicy(retryPolicy);
return template;
}
private void processEvent(DebeziumChangeEvent event) {
// Actual event processing logic
logger.debug("Processing event: {}", event.getSource().getTable());
}
}
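`CircuitBreaker.ofDefaults` ignores the thresholds that the `CDCProperties` class (shown in the Spring Boot section below) exposes. A sketch wiring them through Resilience4j's builder, assuming `resilience4j-circuitbreaker` is on the classpath; `DurationStyle` is Spring Boot's relaxed duration parser for values like "10s":
// Build the circuit breaker from cdc.fault-tolerance.circuit-breaker properties
private CircuitBreaker createCircuitBreaker(CDCProperties.FaultTolerance.CircuitBreaker props) {
    CircuitBreakerConfig config = CircuitBreakerConfig.custom()
        .failureRateThreshold(props.getFailureRateThreshold())   // trip after this failure %
        .slowCallRateThreshold(props.getSlowCallRateThreshold())
        .waitDurationInOpenState(DurationStyle.detectAndParse(props.getWaitDurationInOpenState()))
        .build();
    return CircuitBreaker.of("cdcProcessor", config);
}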
// Dead Letter Queue Service
@Component
public class DeadLetterQueueService {
private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueService.class);
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
private final String dlqTopic;
public DeadLetterQueueService(KafkaTemplate<String, Object> kafkaTemplate,
ObjectMapper objectMapper,
@Value("${app.kafka.topics.dlq}") String dlqTopic) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.dlqTopic = dlqTopic;
}
/**
* Send failed event to Dead Letter Queue
*/
public void sendToDLQ(DebeziumChangeEvent event, Exception failure) {
try {
DeadLetterMessage dlqMessage = new DeadLetterMessage(event, failure);
String message = objectMapper.writeValueAsString(dlqMessage);
kafkaTemplate.send(dlqTopic, event.getSource().getTable(), message)
.addCallback(
result -> logger.debug("Sent to DLQ: {}", dlqMessage.getId()),
error -> logger.error("Failed to send to DLQ: {}", dlqMessage.getId(), error)
);
} catch (Exception e) {
logger.error("Failed to serialize DLQ message for event", e);
}
}
/**
* Get messages from DLQ for replay
*/
public List<DeadLetterMessage> getMessagesFromDLQ(String source, Instant fromTimestamp) {
// Implementation would depend on your DLQ storage
// This could query Kafka, database, or other storage
logger.info("Retrieving DLQ messages for source: {} from: {}", source, fromTimestamp);
return List.of(); // Simplified
}
/**
* Acknowledge DLQ message after successful replay
*/
public void acknowledgeDLQMessage(String messageId) {
// Implementation for acknowledging DLQ messages
logger.debug("Acknowledged DLQ message: {}", messageId);
}
public static class DeadLetterMessage {
private final String id;
private final DebeziumChangeEvent originalEvent;
private final String failureReason;
private final String stackTrace;
private final Instant failedAt;
private final int failureCount;
public DeadLetterMessage(DebeziumChangeEvent originalEvent, Exception failure) {
this.id = UUID.randomUUID().toString();
this.originalEvent = originalEvent;
this.failureReason = failure.getMessage();
this.stackTrace = getStackTrace(failure);
this.failedAt = Instant.now();
this.failureCount = 1;
}
private String getStackTrace(Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
// Getters
public String getId() { return id; }
public DebeziumChangeEvent getOriginalEvent() { return originalEvent; }
public String getFailureReason() { return failureReason; }
public String getStackTrace() { return stackTrace; }
public Instant getFailedAt() { return failedAt; }
public int getFailureCount() { return failureCount; }
}
}
Spring Boot Integration
9. Complete Spring Boot Configuration
# application.yml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/cdc_source
    username: cdc_user
    password: cdc_password
    hikari:
      maximum-pool-size: 10
      minimum-idle: 2
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: cdc-consumer
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
  jackson:
    property-naming-strategy: SNAKE_CASE
    default-property-inclusion: NON_NULL

# CDC Configuration
cdc:
  debezium:
    enabled: true
    mysql:
      host: localhost
      port: 3306
      user: debezium
      password: dbz
      server-name: mysql-cdc
  polling:
    enabled: false
    tables:
      - table-name: customers
        id-column: id
        timestamp-column: updated_at
        poll-interval-ms: 5000
        batch-size: 100
  sinks:
    kafka:
      enabled: true
    elasticsearch:
      enabled: true
      hosts: http://localhost:9200
    s3:
      enabled: false
      bucket: cdc-archive
  fault-tolerance:
    retry:
      max-attempts: 3
      backoff-initial-interval: 1000
      backoff-multiplier: 2.0
    circuit-breaker:
      failure-rate-threshold: 50
      slow-call-rate-threshold: 50
      wait-duration-in-open-state: 10s

# Management endpoints
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true
10. Spring Configuration Classes
@Configuration
@EnableConfigurationProperties(CDCProperties.class)
public class CDCConfiguration {
@Bean
@ConditionalOnProperty(name = "cdc.debezium.enabled", havingValue = "true")
public DebeziumConnectorManager debeziumConnectorManager(KafkaConnectClient connectClient,
ObjectMapper objectMapper) {
return new DebeziumConnectorManager(connectClient, objectMapper);
}
@Bean
@ConditionalOnProperty(name = "cdc.polling.enabled", havingValue = "true")
public DatabasePollingCDC databasePollingCDC(DataSource dataSource,
ChangeEventProcessor eventProcessor) {
return new DatabasePollingCDC(dataSource, eventProcessor);
}
@Bean
public ChangeEventProcessor changeEventProcessor(EventTransformer eventTransformer,
CDCMetricsService metricsService) {
return new ChangeEventProcessor(eventTransformer, metricsService);
}
@Bean
public EventTransformer eventTransformer(KafkaTemplate<String, Object> kafkaTemplate,
ObjectMapper objectMapper) {
return new EventTransformer(kafkaTemplate, objectMapper);
}
@Bean
public CDCMetricsService cdcMetricsService(MeterRegistry meterRegistry) {
return new CDCMetricsService(meterRegistry);
}
@Bean
public CDCFaultToleranceManager cdcFaultToleranceManager(DeadLetterQueueService dlqService) {
return new CDCFaultToleranceManager(dlqService);
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
@ConfigurationProperties(prefix = "cdc")
@Data
public class CDCProperties {
private Debezium debezium = new Debezium();
private Polling polling = new Polling();
private Sinks sinks = new Sinks();
private FaultTolerance faultTolerance = new FaultTolerance();
@Data
public static class Debezium {
private boolean enabled = true;
private MySql mysql = new MySql();
@Data
public static class MySql {
private String host = "localhost";
private int port = 3306;
private String user = "debezium";
private String password = "dbz";
private String serverName = "mysql-cdc";
}
}
@Data
public static class Polling {
private boolean enabled = false;
private List<TableConfig> tables = new ArrayList<>();
@Data
public static class TableConfig {
private String tableName;
private String idColumn = "id";
private String timestampColumn = "updated_at";
private long pollIntervalMs = 5000;
private long initialDelayMs = 1000;
private int batchSize = 100;
}
}
@Data
public static class Sinks {
private Kafka kafka = new Kafka();
private Elasticsearch elasticsearch = new Elasticsearch();
private S3 s3 = new S3();
@Data
public static class Kafka {
private boolean enabled = true;
}
@Data
public static class Elasticsearch {
private boolean enabled = false;
private String hosts = "http://localhost:9200";
}
@Data
public static class S3 {
private boolean enabled = false;
private String bucket;
}
}
@Data
public static class FaultTolerance {
private Retry retry = new Retry();
private CircuitBreaker circuitBreaker = new CircuitBreaker();
@Data
public static class Retry {
private int maxAttempts = 3;
private long backoffInitialInterval = 1000;
private double backoffMultiplier = 2.0;
}
@Data
public static class CircuitBreaker {
private int failureRateThreshold = 50;
private int slowCallRateThreshold = 50;
private String waitDurationInOpenState = "10s";
}
}
}
// Main application with Kafka listener
@SpringBootApplication
@EnableKafka
public class CDCPipelineApplication {
public static void main(String[] args) {
SpringApplication.run(CDCPipelineApplication.class, args);
}
@Bean
public CommandLineRunner init(DebeziumConnectorManager connectorManager,
CDCProperties properties) {
return args -> {
if (properties.getDebezium().isEnabled()) {
// Start Debezium connector
CDCProperties.Debezium.MySql mysqlConfig = properties.getDebezium().getMysql();
DebeziumConnectorManager.MySqlConfig config =
new DebeziumConnectorManager.MySqlConfig(
mysqlConfig.getHost(),
mysqlConfig.getPort(),
mysqlConfig.getUser(),
mysqlConfig.getPassword(),
"12345",
mysqlConfig.getServerName(),
"inventory",
"inventory.customers,inventory.orders,inventory.products",
"localhost:9092",
"dbhistory.inventory",
false
);
connectorManager.startMySqlConnector("mysql-connector", config);
}
};
}
}
// Kafka consumer for Debezium events
@Component
public class DebeziumEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(DebeziumEventConsumer.class);
private final ChangeEventProcessor eventProcessor;
private final ObjectMapper objectMapper;
private final CDCMetricsService metricsService;
public DebeziumEventConsumer(ChangeEventProcessor eventProcessor,
ObjectMapper objectMapper,
CDCMetricsService metricsService) {
this.eventProcessor = eventProcessor;
this.objectMapper = objectMapper;
this.metricsService = metricsService;
}
@KafkaListener(topics = "${app.kafka.topics.cdc:mysql-cdc.inventory}")
public void consumeDebeziumEvent(ConsumerRecord<String, String> record) {
Timer.Sample timer = metricsService.startProcessingTimer();
try {
DebeziumChangeEvent event = objectMapper.readValue(record.value(), DebeziumChangeEvent.class);
metricsService.recordEventReceived(event.getSource().getTable());
eventProcessor.processDebeziumEvent(event);
metricsService.stopProcessingTimer(timer, event.getSource().getTable());
} catch (Exception e) {
logger.error("Failed to process Debezium event", e);
metricsService.recordEventError("unknown");
}
}
@KafkaListener(topics = "${app.kafka.topics.dlq:cdc-dlq}")
public void consumeDLQEvent(ConsumerRecord<String, String> record) {
logger.warn("Received DLQ event: {} - {}", record.key(), record.value());
// Implement DLQ event handling or alerting
}
}
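The YAML above sets `enable-auto-commit: false`, which leaves offset commits to the Spring Kafka listener container. Spring Boot's default batch ack mode works here; a sketch of a container factory that instead commits after every record (assuming the auto-configured `ConsumerFactory`) keeps the reprocessing window small after a crash:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Commit the offset after every processed record (at-least-once per event)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}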
This comprehensive CDC pipeline implementation provides:
- Multiple CDC approaches (Debezium and custom polling)
- Robust event processing with transformation and routing
- Multiple sink support (Kafka, Elasticsearch, S3, cache)
- Comprehensive monitoring with metrics and health checks
- Fault tolerance with retries, circuit breakers, and DLQ
- Spring Boot integration for easy deployment
The pipeline ensures reliable, scalable, and maintainable change data capture for modern data architectures.