Introduction
Eventual consistency is a fundamental concept in distributed systems: replicas of the same data may diverge briefly, but they are guaranteed to converge once updates stop arriving, without requiring immediate synchronization. In a microservices architecture each service typically owns its own database, so strong consistency across services would require distributed transactions that tie together the availability and latency of every participant, which is often impractical. This guide explores patterns and Java implementations for achieving eventual consistency while maintaining system reliability and performance.
Core Concepts and Patterns
1. Consistency Models
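- Strong Consistency: Every read reflects the latest committed write (what ACID transactions provide within a single database)
- Eventual Consistency: Replicas may return stale data for a while, but converge once updates stop arriving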
- Causal Consistency: Preserves causal relationships
- Read-your-writes Consistency: Ensures you see your own writes
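To make the models above more concrete, here is a minimal sketch of a primary store that replicates to a read replica lazily. It is not part of the services built later in this guide, and every name in it (`ReplicatedStore`, `drainReplication`, and so on) is hypothetical. A read from the replica can be stale until replication catches up, while routing a client's reads to the primary is one simple way to get read-your-writes behaviour.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: one primary, one replica, replication applied lazily.
class ReplicatedStore {
    private final Map<String, String> primary = new HashMap<>();
    private final Map<String, String> replica = new HashMap<>();
    // Changes that reached the primary but have not been applied to the replica yet.
    private final Queue<String[]> replicationLog = new ArrayDeque<>();

    void write(String key, String value) {
        primary.put(key, value);
        replicationLog.add(new String[] {key, value});
    }

    // May return stale data (or null) until replication has caught up.
    String readFromReplica(String key) {
        return replica.get(key);
    }

    // Always sees the latest write accepted by the primary.
    String readFromPrimary(String key) {
        return primary.get(key);
    }

    // Applies all pending changes in order; afterwards the replica has converged.
    void drainReplication() {
        String[] change;
        while ((change = replicationLog.poll()) != null) {
            replica.put(change[0], change[1]);
        }
    }
}
```

Between `write` and `drainReplication` the two read methods disagree; that window is the "eventual" part of eventual consistency.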
2. Common Patterns
- Saga Pattern: Distributed transactions with compensation
- Event Sourcing: Store state changes as events
- CQRS: Separate read and write models
- Outbox Pattern: Reliable event publishing
- Change Data Capture: Capture database changes
Project Setup and Dependencies
1. Maven Dependencies
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Database -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Messaging -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!-- Resilience -->
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
    <!-- Spring Cloud 2022.0.x targets Spring Boot 3, so the spring-boot3 module is used -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-spring-boot3</artifactId>
    </dependency>
    <!-- Provides Resilience4JCircuitBreakerFactory, used in ResilienceConfig -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Modules for the PostgreSQLContainer and RabbitMQContainer used in the integration tests -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>postgresql</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>rabbitmq</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2022.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
2. Domain Models
// Order Service Domain
@Entity
@Table(name = "orders")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {
@Id
private String orderId;
@Enumerated(EnumType.STRING)
private OrderStatus status;
private String customerId;
private BigDecimal totalAmount;
@ElementCollection
@CollectionTable(name = "order_items", joinColumns = @JoinColumn(name = "order_id"))
private List<OrderItem> items;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
@Version
private Long version;
@PrePersist
protected void onCreate() {
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
this.updatedAt = LocalDateTime.now();
}
}
@Embeddable
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderItem {
private String productId;
private String productName;
private Integer quantity;
private BigDecimal unitPrice;
}
public enum OrderStatus {
PENDING, CONFIRMED, PAYMENT_PENDING, PAYMENT_COMPLETED,
PAYMENT_FAILED, SHIPPED, DELIVERED, CANCELLED
}
// Payment Service Domain
@Entity
@Table(name = "payments")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Payment {
@Id
private String paymentId;
private String orderId;
private String customerId;
private BigDecimal amount;
@Enumerated(EnumType.STRING)
private PaymentStatus status;
private String transactionId;
private LocalDateTime paymentDate;
private LocalDateTime createdAt;
}
public enum PaymentStatus {
PENDING, COMPLETED, FAILED, REFUNDED
}
// Inventory Service Domain
@Entity
@Table(name = "inventory")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Inventory {
@Id
private String productId;
private String productName;
private Integer availableQuantity;
private Integer reservedQuantity;
@Version
private Long version;
public boolean canFulfill(int quantity) {
return availableQuantity >= quantity;
}
public void reserve(int quantity) {
if (!canFulfill(quantity)) {
throw new InsufficientInventoryException(
"Insufficient inventory for product: " + productId);
}
availableQuantity -= quantity;
reservedQuantity += quantity;
}
public void release(int quantity) {
reservedQuantity -= quantity;
availableQuantity += quantity;
}
public void confirmReservation(int quantity) {
reservedQuantity -= quantity;
}
}
public class InsufficientInventoryException extends RuntimeException {
public InsufficientInventoryException(String message) {
super(message);
}
}
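The counter arithmetic in Inventory is easier to check outside JPA. The sketch below mirrors reserve, release and confirmReservation on a plain class. `StockCounter` is a hypothetical name, and the guard in `requireReserved` is an addition that the entity above does not have: it rejects a duplicate release or confirm instead of letting the reserved counter go negative.

```java
// Hypothetical plain-Java mirror of the Inventory counters (no JPA involved).
class StockCounter {
    private int available;
    private int reserved;

    StockCounter(int available) {
        this.available = available;
    }

    // Moves units from the available pool into the reserved pool.
    void reserve(int quantity) {
        if (quantity <= 0 || quantity > available) {
            throw new IllegalStateException("Cannot reserve " + quantity);
        }
        available -= quantity;
        reserved += quantity;
    }

    // Compensation path: hands reserved units back to the available pool.
    void release(int quantity) {
        requireReserved(quantity);
        reserved -= quantity;
        available += quantity;
    }

    // Success path: reserved units leave stock for good.
    void confirm(int quantity) {
        requireReserved(quantity);
        reserved -= quantity;
    }

    private void requireReserved(int quantity) {
        if (quantity <= 0 || quantity > reserved) {
            throw new IllegalStateException("Only " + reserved + " units are reserved");
        }
    }

    int available() { return available; }

    int reserved() { return reserved; }
}
```

With a guard like this, a saga step that is delivered twice fails loudly rather than silently corrupting the counters.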
Saga Pattern Implementation
1. Saga Orchestrator
@Component
@Slf4j
public class OrderSagaOrchestrator {
private final OrderService orderService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final SagaEventPublisher eventPublisher;
public OrderSagaOrchestrator(OrderService orderService,
PaymentService paymentService,
InventoryService inventoryService,
SagaEventPublisher eventPublisher) {
this.orderService = orderService;
this.paymentService = paymentService;
this.inventoryService = inventoryService;
this.eventPublisher = eventPublisher;
}
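// Runs on a separate thread; requires @EnableAsync on a configuration class.
// Deliberately not @Transactional: each saga step commits in its own local
// transaction, so a failed step is compensated rather than rolled back together
// with the compensating writes.
@Async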
public void createOrderSaga(CreateOrderCommand command) {
SagaContext context = new SagaContext(command.getOrderId());
try {
// Step 1: Create order in PENDING state
log.info("Starting saga for order: {}", command.getOrderId());
Order order = orderService.createPendingOrder(command);
context.setOrder(order);
// Step 2: Reserve inventory
reserveInventory(context, command);
// Step 3: Process payment
processPayment(context, command);
// Step 4: Confirm order
confirmOrder(context);
log.info("Saga completed successfully for order: {}", command.getOrderId());
} catch (Exception e) {
log.error("Saga failed for order: {}", command.getOrderId(), e);
compensate(context, e);
}
}
private void reserveInventory(SagaContext context, CreateOrderCommand command) {
try {
inventoryService.reserveInventory(command.getOrderId(), command.getItems());
context.setInventoryReserved(true);
log.info("Inventory reserved for order: {}", context.getOrderId());
} catch (InsufficientInventoryException e) {
log.warn("Inventory reservation failed for order: {}", context.getOrderId());
throw new SagaException("Inventory reservation failed", e);
}
}
private void processPayment(SagaContext context, CreateOrderCommand command) {
try {
Payment payment = paymentService.processPayment(
command.getOrderId(),
command.getCustomerId(),
command.getTotalAmount()
);
context.setPayment(payment);
context.setPaymentProcessed(true);
log.info("Payment processed for order: {}", context.getOrderId());
} catch (PaymentException e) {
log.warn("Payment processing failed for order: {}", context.getOrderId());
throw new SagaException("Payment processing failed", e);
}
}
private void confirmOrder(SagaContext context) {
orderService.confirmOrder(context.getOrderId());
inventoryService.confirmReservation(context.getOrderId());
context.setOrderConfirmed(true);
eventPublisher.publishOrderConfirmed(context.getOrderId());
log.info("Order confirmed: {}", context.getOrderId());
}
private void compensate(SagaContext context, Exception failure) {
log.info("Starting compensation for order: {}", context.getOrderId());
try {
// Compensate in reverse order
if (context.isPaymentProcessed()) {
paymentService.refundPayment(context.getOrderId());
log.info("Payment refunded for order: {}", context.getOrderId());
}
if (context.isInventoryReserved()) {
inventoryService.releaseReservation(context.getOrderId());
log.info("Inventory released for order: {}", context.getOrderId());
}
if (context.getOrder() != null) {
orderService.cancelOrder(context.getOrderId(), failure.getMessage());
log.info("Order cancelled: {}", context.getOrderId());
}
eventPublisher.publishOrderFailed(context.getOrderId(), failure.getMessage());
} catch (Exception compException) {
log.error("Compensation failed for order: {}", context.getOrderId(), compException);
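// In a real system this would raise an alert for manual intervention.
// Note that a failed refund also skips the inventory release and order cancellation above.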
}
}
@Data
private static class SagaContext {
private final String orderId;
private Order order;
private Payment payment;
private boolean inventoryReserved = false;
private boolean paymentProcessed = false;
private boolean orderConfirmed = false;
public SagaContext(String orderId) {
this.orderId = orderId;
}
}
public static class SagaException extends RuntimeException {
public SagaException(String message) {
super(message);
}
public SagaException(String message, Throwable cause) {
super(message, cause);
}
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class CreateOrderCommand {
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String paymentMethod;
}
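The control flow of the orchestrator can be reduced to a small, framework-free sketch: run the steps in order, remember which ones completed, and on failure undo the completed ones in reverse. `SagaStep` and `MiniSaga` are hypothetical names used only for illustration. Unlike the compensate method above, this sketch keeps going when one compensation fails, which is a design choice rather than something the original code does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// One saga step: a forward action plus the action that undoes it.
class SagaStep {
    final String name;
    final Runnable action;
    final Runnable compensation;

    SagaStep(String name, Runnable action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }
}

class MiniSaga {
    private final List<SagaStep> steps = new ArrayList<>();

    MiniSaga step(String name, Runnable action, Runnable compensation) {
        steps.add(new SagaStep(name, action, compensation));
        return this;
    }

    // Returns true if every step succeeded, false if a step failed and
    // the already completed steps were compensated.
    boolean run() {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            try {
                step.action.run();
                completed.push(step); // most recent step ends up on top
            } catch (RuntimeException failure) {
                while (!completed.isEmpty()) {
                    SagaStep done = completed.pop(); // undo in reverse order
                    try {
                        done.compensation.run();
                    } catch (RuntimeException compensationFailure) {
                        // Keep going so the remaining steps are still undone;
                        // a real system would record this for manual follow-up.
                    }
                }
                return false;
            }
        }
        return true;
    }
}
```

The failing step itself is not compensated, only the steps that completed before it, which matches the flag checks (isPaymentProcessed, isInventoryReserved) in the orchestrator.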
2. Saga Services
@Service
@Slf4j
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final ApplicationEventPublisher eventPublisher;
public OrderService(OrderRepository orderRepository,
ApplicationEventPublisher eventPublisher) {
this.orderRepository = orderRepository;
this.eventPublisher = eventPublisher;
}
public Order createPendingOrder(CreateOrderCommand command) {
Order order = Order.builder()
.orderId(command.getOrderId())
.customerId(command.getCustomerId())
.items(command.getItems())
.totalAmount(command.getTotalAmount())
.status(OrderStatus.PENDING)
.build();
Order saved = orderRepository.save(order);
eventPublisher.publishEvent(new OrderCreatedEvent(
saved.getOrderId(),
saved.getCustomerId(),
saved.getItems(),
saved.getTotalAmount()
));
return saved;
}
public void confirmOrder(String orderId) {
orderRepository.findById(orderId).ifPresent(order -> {
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
eventPublisher.publishEvent(new OrderConfirmedEvent(orderId));
});
}
public void cancelOrder(String orderId, String reason) {
orderRepository.findById(orderId).ifPresent(order -> {
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
eventPublisher.publishEvent(new OrderCancelledEvent(orderId, reason));
});
}
@Transactional(readOnly = true)
public Optional<Order> getOrder(String orderId) {
return orderRepository.findById(orderId);
}
}
@Service
@Slf4j
@Transactional
public class PaymentService {
private final PaymentRepository paymentRepository;
private final PaymentGateway paymentGateway;
public PaymentService(PaymentRepository paymentRepository,
PaymentGateway paymentGateway) {
this.paymentRepository = paymentRepository;
this.paymentGateway = paymentGateway;
}
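// Without noRollbackFor, the PaymentException thrown below would roll back
// the transaction and the FAILED payment record would never be stored.
@Transactional(noRollbackFor = PaymentException.class)
public Payment processPayment(String orderId, String customerId, BigDecimal amount) {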
try {
// Simulate payment processing
PaymentGatewayResponse response = paymentGateway.charge(amount);
Payment payment = Payment.builder()
.paymentId(UUID.randomUUID().toString())
.orderId(orderId)
.customerId(customerId)
.amount(amount)
.status(PaymentStatus.COMPLETED)
.transactionId(response.getTransactionId())
.paymentDate(LocalDateTime.now())
.createdAt(LocalDateTime.now())
.build();
Payment saved = paymentRepository.save(payment);
log.info("Payment processed successfully for order: {}", orderId);
return saved;
} catch (PaymentGatewayException e) {
// Record failed payment attempt
Payment failedPayment = Payment.builder()
.paymentId(UUID.randomUUID().toString())
.orderId(orderId)
.customerId(customerId)
.amount(amount)
.status(PaymentStatus.FAILED)
.createdAt(LocalDateTime.now())
.build();
paymentRepository.save(failedPayment);
throw new PaymentException("Payment processing failed", e);
}
}
public void refundPayment(String orderId) {
paymentRepository.findByOrderId(orderId).ifPresent(payment -> {
if (payment.getStatus() == PaymentStatus.COMPLETED) {
try {
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
log.info("Payment refunded for order: {}", orderId);
} catch (PaymentGatewayException e) {
log.error("Refund failed for order: {}", orderId, e);
throw new PaymentException("Refund failed", e);
}
}
});
}
}
@Service
@Slf4j
@Transactional
public class InventoryService {
private final InventoryRepository inventoryRepository;
private final InventoryReservationRepository reservationRepository;
public InventoryService(InventoryRepository inventoryRepository,
InventoryReservationRepository reservationRepository) {
this.inventoryRepository = inventoryRepository;
this.reservationRepository = reservationRepository;
}
public void reserveInventory(String orderId, List<OrderItem> items) {
for (OrderItem item : items) {
Inventory inventory = inventoryRepository.findById(item.getProductId())
.orElseThrow(() -> new InventoryNotFoundException(
"Product not found: " + item.getProductId()));
if (!inventory.canFulfill(item.getQuantity())) {
throw new InsufficientInventoryException(
"Insufficient inventory for product: " + item.getProductId());
}
// Reserve inventory
inventory.reserve(item.getQuantity());
inventoryRepository.save(inventory);
// Record reservation
InventoryReservation reservation = InventoryReservation.builder()
.reservationId(UUID.randomUUID().toString())
.orderId(orderId)
.productId(item.getProductId())
.quantity(item.getQuantity())
.status(ReservationStatus.RESERVED)
.createdAt(LocalDateTime.now())
.build();
reservationRepository.save(reservation);
log.info("Reserved {} units of product {} for order {}",
item.getQuantity(), item.getProductId(), orderId);
}
}
public void releaseReservation(String orderId) {
List<InventoryReservation> reservations =
reservationRepository.findByOrderIdAndStatus(orderId, ReservationStatus.RESERVED);
for (InventoryReservation reservation : reservations) {
inventoryRepository.findById(reservation.getProductId()).ifPresent(inventory -> {
inventory.release(reservation.getQuantity());
inventoryRepository.save(inventory);
});
reservation.setStatus(ReservationStatus.RELEASED);
reservationRepository.save(reservation);
log.info("Released reservation for product {} in order {}",
reservation.getProductId(), orderId);
}
}
public void confirmReservation(String orderId) {
List<InventoryReservation> reservations =
reservationRepository.findByOrderIdAndStatus(orderId, ReservationStatus.RESERVED);
for (InventoryReservation reservation : reservations) {
inventoryRepository.findById(reservation.getProductId()).ifPresent(inventory -> {
inventory.confirmReservation(reservation.getQuantity());
inventoryRepository.save(inventory);
});
reservation.setStatus(ReservationStatus.CONFIRMED);
reservationRepository.save(reservation);
}
log.info("Confirmed reservations for order: {}", orderId);
}
}
Outbox Pattern Implementation
1. Outbox Entity and Configuration
@Entity
@Table(name = "outbox_events")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String eventType;
@Column(nullable = false, columnDefinition = "TEXT")
private String payload;
@Enumerated(EnumType.STRING)
private OutboxStatus status;
private LocalDateTime createdAt;
private LocalDateTime processedAt;
@Version
private Long version;
@PrePersist
protected void onCreate() {
this.createdAt = LocalDateTime.now();
this.status = OutboxStatus.PENDING;
}
}
public enum OutboxStatus {
PENDING, PROCESSED, FAILED
}
@Configuration
@EnableScheduling
public class OutboxConfiguration {
// OutboxEventProcessor is annotated with @Component and is registered by
// component scanning, so a second @Bean definition with the same name is not needed here.
}
2. Outbox Event Processor
@Component
@Slf4j
public class OutboxEventProcessor {
private final OutboxEventRepository repository;
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher eventPublisher;
private static final int BATCH_SIZE = 100;
private static final Duration PROCESSING_TIMEOUT = Duration.ofMinutes(5);
public OutboxEventProcessor(OutboxEventRepository repository,
ObjectMapper objectMapper,
ApplicationEventPublisher eventPublisher) {
this.repository = repository;
this.objectMapper = objectMapper;
this.eventPublisher = eventPublisher;
}
@Scheduled(fixedDelay = 5000) // Process every 5 seconds
@Transactional
public void processOutboxEvents() {
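// Only events older than PROCESSING_TIMEOUT are selected, so relay through this
// poller is delayed by at least that long. The direct publish in OrderEventPublisher
// never marks rows as PROCESSED, so every event is relayed here as well and
// consumers must tolerate duplicates.
LocalDateTime cutoffTime = LocalDateTime.now().minus(PROCESSING_TIMEOUT);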
List<OutboxEvent> pendingEvents = repository.findByStatusAndCreatedAtBefore(
OutboxStatus.PENDING, cutoffTime, PageRequest.of(0, BATCH_SIZE));
for (OutboxEvent event : pendingEvents) {
try {
processEvent(event);
event.setStatus(OutboxStatus.PROCESSED);
event.setProcessedAt(LocalDateTime.now());
repository.save(event);
log.info("Processed outbox event: {} for aggregate: {}",
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
log.error("Failed to process outbox event: {}", event.getId(), e);
event.setStatus(OutboxStatus.FAILED);
repository.save(event);
}
}
}
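// Caution: re-publishing through ApplicationEventPublisher re-triggers
// OrderEventPublisher.handleOrderCreated, which writes a new outbox row for the
// same event. A production relay would send directly to the message broker instead.
private void processEvent(OutboxEvent event) throws Exception {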
switch (event.getEventType()) {
case "ORDER_CREATED":
OrderCreatedEvent orderCreated = objectMapper.readValue(
event.getPayload(), OrderCreatedEvent.class);
eventPublisher.publishEvent(orderCreated);
break;
case "ORDER_CONFIRMED":
OrderConfirmedEvent orderConfirmed = objectMapper.readValue(
event.getPayload(), OrderConfirmedEvent.class);
eventPublisher.publishEvent(orderConfirmed);
break;
case "ORDER_CANCELLED":
OrderCancelledEvent orderCancelled = objectMapper.readValue(
event.getPayload(), OrderCancelledEvent.class);
eventPublisher.publishEvent(orderCancelled);
break;
default:
log.warn("Unknown event type: {}", event.getEventType());
}
}
}
@Component
@Slf4j
@Transactional
public class OutboxEventService {
private final OutboxEventRepository repository;
private final ObjectMapper objectMapper;
public OutboxEventService(OutboxEventRepository repository,
ObjectMapper objectMapper) {
this.repository = repository;
this.objectMapper = objectMapper;
}
public void saveEvent(String aggregateId, String aggregateType,
String eventType, Object event) {
try {
String payload = objectMapper.writeValueAsString(event);
OutboxEvent outboxEvent = OutboxEvent.builder()
.aggregateId(aggregateId)
.aggregateType(aggregateType)
.eventType(eventType)
.payload(payload)
.build();
repository.save(outboxEvent);
log.debug("Saved outbox event: {} for aggregate: {}", eventType, aggregateId);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize event payload", e);
}
}
}
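The essence of the outbox relay is independent of JPA and Spring. The following in-memory sketch assumes a simplified row (`OutboxRow`) and a broker modelled as a `Consumer`; both are hypothetical stand-ins for the OutboxEvent entity and the message broker. It shows the two properties that matter: a row is marked processed only after the broker accepted it, and a failure leaves the row pending so the next poll retries it, which gives at-least-once delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the OutboxEvent entity.
class OutboxRow {
    final long id;
    final String eventType;
    final String payload;
    boolean processed;

    OutboxRow(long id, String eventType, String payload) {
        this.id = id;
        this.eventType = eventType;
        this.payload = payload;
    }
}

class InMemoryOutbox {
    private final List<OutboxRow> rows = new ArrayList<>();
    private long nextId = 1;

    // In a real service this insert shares the database transaction
    // with the business change it describes.
    synchronized void append(String eventType, String payload) {
        rows.add(new OutboxRow(nextId++, eventType, payload));
    }

    // Delivers pending rows in insertion order and returns how many were sent.
    synchronized int relay(Consumer<OutboxRow> broker) {
        int delivered = 0;
        for (OutboxRow row : rows) {
            if (row.processed) {
                continue;
            }
            try {
                broker.accept(row);
                row.processed = true; // only after the broker accepted the event
                delivered++;
            } catch (RuntimeException e) {
                // Leave the row pending and stop, so ordering is preserved
                // and the next poll retries from this row.
                break;
            }
        }
        return delivered;
    }
}
```

Stopping at the first failure trades throughput for ordering; the processor above instead marks the event FAILED and continues with the rest of the batch.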
Event Sourcing Implementation
1. Event Store and Aggregates
@Entity
@Table(name = "event_store")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EventStore {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String eventType;
@Column(nullable = false, columnDefinition = "TEXT")
private String eventData;
@Column(nullable = false)
private Long version;
@Column(nullable = false)
private LocalDateTime timestamp;
@Column(nullable = false)
private String metadata;
}
// Plain domain object: a new instance is created per order, so it must not be a Spring bean.
public class OrderAggregate {
private String orderId;
private OrderStatus status;
private String customerId;
private BigDecimal totalAmount;
private List<OrderItem> items;
private Long version;
private final List<Object> pendingEvents = new ArrayList<>();
public OrderAggregate() {}
public static OrderAggregate create(String orderId, String customerId,
List<OrderItem> items, BigDecimal totalAmount) {
OrderAggregate aggregate = new OrderAggregate();
aggregate.apply(new OrderCreatedEvent(orderId, customerId, items, totalAmount));
return aggregate;
}
public void confirm() {
if (this.status != OrderStatus.PENDING) {
throw new IllegalStateException("Order cannot be confirmed from state: " + status);
}
apply(new OrderConfirmedEvent(orderId));
}
public void cancel(String reason) {
apply(new OrderCancelledEvent(orderId, reason));
}
private void apply(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.customerId = event.getCustomerId();
this.items = event.getItems();
this.totalAmount = event.getTotalAmount();
this.status = OrderStatus.PENDING;
this.version = 0L;
this.pendingEvents.add(event);
}
private void apply(OrderConfirmedEvent event) {
this.status = OrderStatus.CONFIRMED;
this.version++;
this.pendingEvents.add(event);
}
private void apply(OrderCancelledEvent event) {
this.status = OrderStatus.CANCELLED;
this.version++;
this.pendingEvents.add(event);
}
public List<Object> getPendingEvents() {
return new ArrayList<>(pendingEvents);
}
public void clearPendingEvents() {
pendingEvents.clear();
}
// Rehydrate aggregate from events
public static OrderAggregate rehydrate(List<EventStore> events) {
OrderAggregate aggregate = new OrderAggregate();
for (EventStore event : events) {
aggregate.applyEvent(event);
}
return aggregate;
}
private void applyEvent(EventStore event) {
// Deserialize and apply event based on eventType
// Implementation depends on your event deserialization strategy
}
}
@Service
@Slf4j
@Transactional
public class EventSourcingService {
private final EventStoreRepository eventStoreRepository;
private final ObjectMapper objectMapper;
public EventSourcingService(EventStoreRepository eventStoreRepository,
ObjectMapper objectMapper) {
this.eventStoreRepository = eventStoreRepository;
this.objectMapper = objectMapper;
}
public void saveEvents(String aggregateId, String aggregateType,
List<Object> events, Long currentVersion) {
long expectedVersion = currentVersion != null ? currentVersion : -1;
for (Object event : events) {
expectedVersion++;
try {
String eventType = event.getClass().getSimpleName();
String eventData = objectMapper.writeValueAsString(event);
String metadata = "{}"; // Could include correlation ID, user ID, etc.
EventStore eventStore = EventStore.builder()
.aggregateId(aggregateId)
.aggregateType(aggregateType)
.eventType(eventType)
.eventData(eventData)
.version(expectedVersion)
.timestamp(LocalDateTime.now())
.metadata(metadata)
.build();
eventStoreRepository.save(eventStore);
log.debug("Saved event: {} for aggregate: {} version: {}",
eventType, aggregateId, expectedVersion);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize event", e);
}
}
}
public List<EventStore> getEventsForAggregate(String aggregateId) {
return eventStoreRepository.findByAggregateIdOrderByVersionAsc(aggregateId);
}
public Optional<OrderAggregate> getOrderAggregate(String orderId) {
List<EventStore> events = getEventsForAggregate(orderId);
if (events.isEmpty()) {
return Optional.empty();
}
OrderAggregate aggregate = OrderAggregate.rehydrate(events);
return Optional.of(aggregate);
}
}
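The applyEvent method of OrderAggregate is left as a stub above. As a rough illustration of what replay involves, the sketch below rebuilds an order's status from stored events. It assumes the JSON has already been deserialized, so each event is reduced to its type name and version (`StoredEvent` and `ReplayedOrder` are hypothetical names). The type names follow the getSimpleName() convention used in saveEvents, and versions are assumed to start at 0 and increase by one.

```java
import java.util.List;

// Hypothetical, already deserialized event: type name plus stored version.
class StoredEvent {
    final String type;
    final long version;

    StoredEvent(String type, long version) {
        this.type = type;
        this.version = version;
    }
}

class ReplayedOrder {
    String status;      // null until an OrderCreatedEvent has been applied
    long version = -1;  // version of the last applied event

    // Rebuilds the current state by applying events in version order.
    static ReplayedOrder rehydrate(List<StoredEvent> events) {
        ReplayedOrder order = new ReplayedOrder();
        for (StoredEvent event : events) {
            order.apply(event);
        }
        return order;
    }

    private void apply(StoredEvent event) {
        // A gap or duplicate means the stream is corrupt or out of order.
        if (event.version != version + 1) {
            throw new IllegalStateException("Unexpected version " + event.version);
        }
        switch (event.type) {
            case "OrderCreatedEvent":
                status = "PENDING";
                break;
            case "OrderConfirmedEvent":
                status = "CONFIRMED";
                break;
            case "OrderCancelledEvent":
                status = "CANCELLED";
                break;
            default:
                throw new IllegalArgumentException("Unknown event type " + event.type);
        }
        version = event.version;
    }
}
```

Replay must not add events to the pending list; only new decisions produce new events. That is why rehydration here uses its own apply path.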
Message Broker Integration
1. RabbitMQ Configuration
@Configuration
public class RabbitMQConfig {
public static final String ORDER_EVENTS_EXCHANGE = "order.events.exchange";
public static final String ORDER_CREATED_QUEUE = "order.created.queue";
public static final String ORDER_CONFIRMED_QUEUE = "order.confirmed.queue";
public static final String ORDER_CANCELLED_QUEUE = "order.cancelled.queue";
@Bean
public TopicExchange orderEventsExchange() {
return new TopicExchange(ORDER_EVENTS_EXCHANGE);
}
@Bean
public Queue orderCreatedQueue() {
return new Queue(ORDER_CREATED_QUEUE, true);
}
@Bean
public Queue orderConfirmedQueue() {
return new Queue(ORDER_CONFIRMED_QUEUE, true);
}
@Bean
public Queue orderCancelledQueue() {
return new Queue(ORDER_CANCELLED_QUEUE, true);
}
@Bean
public Binding orderCreatedBinding(Queue orderCreatedQueue, TopicExchange orderEventsExchange) {
return BindingBuilder.bind(orderCreatedQueue)
.to(orderEventsExchange)
.with("order.created");
}
@Bean
public Binding orderConfirmedBinding(Queue orderConfirmedQueue, TopicExchange orderEventsExchange) {
return BindingBuilder.bind(orderConfirmedQueue)
.to(orderEventsExchange)
.with("order.confirmed");
}
@Bean
public Binding orderCancelledBinding(Queue orderCancelledQueue, TopicExchange orderEventsExchange) {
return BindingBuilder.bind(orderCancelledQueue)
.to(orderEventsExchange)
.with("order.cancelled");
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
template.setChannelTransacted(true);
return template;
}
}
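The bindings above use exact routing keys, but a topic exchange also accepts wildcard patterns: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below is a stand-alone reimplementation of that matching rule for illustration only. `TopicMatcher` is a hypothetical helper, not a Spring AMQP or RabbitMQ class, and the broker does this matching itself.

```java
// Illustrative matcher for topic-exchange binding patterns.
class TopicMatcher {

    static boolean matches(String bindingPattern, String routingKey) {
        return matches(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: key must be too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words of the routing key.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }
}
```

A single queue bound with `order.#` would therefore receive all three event types, which can be convenient for an audit or analytics consumer.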
2. Event Publishers and Consumers
@Component
@Slf4j
public class OrderEventPublisher {
private final RabbitTemplate rabbitTemplate;
private final OutboxEventService outboxEventService;
public OrderEventPublisher(RabbitTemplate rabbitTemplate,
OutboxEventService outboxEventService) {
this.rabbitTemplate = rabbitTemplate;
this.outboxEventService = outboxEventService;
}
@EventListener
@Transactional
public void handleOrderCreated(OrderCreatedEvent event) {
// Save to outbox first
outboxEventService.saveEvent(
event.getOrderId(),
"ORDER",
"ORDER_CREATED",
event
);
// Direct publish is optional: the outbox processor relays the same event later,
// so consumers may receive it twice and should be idempotent.
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EVENTS_EXCHANGE,
"order.created",
event
);
log.info("Published OrderCreatedEvent for order: {}", event.getOrderId());
}
@EventListener
public void handleOrderConfirmed(OrderConfirmedEvent event) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EVENTS_EXCHANGE,
"order.confirmed",
event
);
log.info("Published OrderConfirmedEvent for order: {}", event.getOrderId());
}
@EventListener
public void handleOrderCancelled(OrderCancelledEvent event) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EVENTS_EXCHANGE,
"order.cancelled",
event
);
log.info("Published OrderCancelledEvent for order: {}", event.getOrderId());
}
}
@Component
@Slf4j
public class OrderEventConsumer {
private final InventoryService inventoryService;
private final NotificationService notificationService;
public OrderEventConsumer(InventoryService inventoryService,
NotificationService notificationService) {
this.inventoryService = inventoryService;
this.notificationService = notificationService;
}
@RabbitListener(queues = RabbitMQConfig.ORDER_CREATED_QUEUE)
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Received OrderCreatedEvent for order: {}", event.getOrderId());
try {
// Update read models, send notifications, etc.
notificationService.sendOrderConfirmation(event.getOrderId(), event.getCustomerId());
} catch (Exception e) {
log.error("Failed to process OrderCreatedEvent for order: {}", event.getOrderId(), e);
throw new AmqpRejectAndDontRequeueException("Processing failed", e);
}
}
@RabbitListener(queues = RabbitMQConfig.ORDER_CONFIRMED_QUEUE)
public void handleOrderConfirmed(OrderConfirmedEvent event) {
log.info("Received OrderConfirmedEvent for order: {}", event.getOrderId());
// Update analytics, send shipping notifications, etc.
notificationService.sendShippingNotification(event.getOrderId());
}
@RabbitListener(queues = RabbitMQConfig.ORDER_CANCELLED_QUEUE)
public void handleOrderCancelled(OrderCancelledEvent event) {
log.info("Received OrderCancelledEvent for order: {}", event.getOrderId());
// Handle cancellation in dependent services
notificationService.sendCancellationNotification(event.getOrderId(), event.getReason());
}
}
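Because events can be delivered more than once (broker redelivery, or the same event arriving via both the direct publish and the outbox relay), consumers like the ones above benefit from deduplication. The sketch below shows the idea with an in-memory set. `IdempotentHandler` is a hypothetical wrapper. The events in this guide carry no dedicated event ID, so the key (for example order ID plus event type) is an assumption, and a real service would keep the processed keys in a database table rather than in memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Wraps a handler so that each event key is processed at most once.
class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    IdempotentHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the event was handled now, false if it was a duplicate.
    boolean handle(String eventKey, String payload) {
        if (!processedKeys.add(eventKey)) {
            return false; // already handled: skip side effects
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so a redelivery gets another chance.
            processedKeys.remove(eventKey);
            throw e;
        }
    }
}
```

With this in place, a duplicate OrderCreatedEvent would not send a second confirmation e-mail.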
Resilience and Monitoring
1. Circuit Breaker and Retry Configuration
@Configuration
public class ResilienceConfig {
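// Configures the Spring Cloud CircuitBreaker factory (requires
// spring-cloud-starter-circuitbreaker-resilience4j). ResilientPaymentService below
// obtains its breaker from CircuitBreakerRegistry, which is configured separately.
@Bean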
public Customizer<Resilience4JCircuitBreakerFactory> circuitBreakerFactoryCustomizer() {
return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(10)
.build())
.timeLimiterConfig(TimeLimiterConfig.custom()
.timeoutDuration(Duration.ofSeconds(5))
.build())
.build());
}
@Bean
public RetryTemplate retryTemplate() {
return RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(1000, 2, 5000)
.retryOn(PaymentException.class)
.build();
}
}
@Service
@Slf4j
public class ResilientPaymentService {
private final PaymentService paymentService;
private final RetryTemplate retryTemplate;
private final CircuitBreaker circuitBreaker;
public ResilientPaymentService(PaymentService paymentService,
RetryTemplate retryTemplate,
CircuitBreakerRegistry circuitBreakerRegistry) {
this.paymentService = paymentService;
this.retryTemplate = retryTemplate;
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("paymentService");
}
public Payment processPaymentWithRetry(String orderId, String customerId, BigDecimal amount) {
return circuitBreaker.executeSupplier(() ->
retryTemplate.execute(context -> {
log.info("Attempting payment processing for order: {}, attempt: {}",
orderId, context.getRetryCount() + 1);
return paymentService.processPayment(orderId, customerId, amount);
})
);
}
}
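To see what the retry settings above amount to, the wait times can be computed by hand. The sketch below is a plain-Java approximation, not Spring Retry code: `BackoffSchedule` is a hypothetical helper, and it assumes that the maximum attempt count includes the first call, so three attempts involve two waits.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the waits between attempts for capped exponential backoff.
class BackoffSchedule {

    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        // There is one wait fewer than there are attempts.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min((long) next, maxMs)); // never wait longer than the cap
            next *= multiplier;
        }
        return result;
    }
}
```

With an initial interval of 1000 ms, a multiplier of 2 and a cap of 5000 ms, three attempts wait 1000 ms and then 2000 ms; the cap only comes into play from the fifth attempt onward. Retrying a payment is only safe if the gateway call is idempotent, otherwise a retry after a lost response can charge the customer twice.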
2. Consistency Monitoring
@Component
@Slf4j
public class ConsistencyMonitor {
private final OrderRepository orderRepository;
private final PaymentRepository paymentRepository;
private final InventoryRepository inventoryRepository;
public ConsistencyMonitor(OrderRepository orderRepository,
PaymentRepository paymentRepository,
InventoryRepository inventoryRepository) {
this.orderRepository = orderRepository;
this.paymentRepository = paymentRepository;
this.inventoryRepository = inventoryRepository;
}
@Scheduled(fixedRate = 60000) // Check every minute
public void checkDataConsistency() {
log.info("Starting data consistency check");
checkOrderPaymentConsistency();
checkInventoryConsistency();
log.info("Completed data consistency check");
}
private void checkOrderPaymentConsistency() {
// Find orders that are confirmed but have no successful payment
List<Order> confirmedOrders = orderRepository.findByStatus(OrderStatus.CONFIRMED);
for (Order order : confirmedOrders) {
Optional<Payment> payment = paymentRepository.findByOrderId(order.getOrderId());
if (payment.isEmpty() || payment.get().getStatus() != PaymentStatus.COMPLETED) {
log.warn("Inconsistency detected: Order {} is confirmed but has no completed payment",
order.getOrderId());
// Trigger compensation or alert
}
}
}
private void checkInventoryConsistency() {
// Reserving stock moves units from available to reserved, so reserved may
// legitimately exceed available; the invariant is that neither counter is negative
List<Inventory> allInventory = inventoryRepository.findAll();
for (Inventory inventory : allInventory) {
if (inventory.getAvailableQuantity() < 0 || inventory.getReservedQuantity() < 0) {
log.error("Critical inventory inconsistency detected for product: {}",
inventory.getProductId());
// Trigger emergency correction
}
}
}
}
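The order/payment check in the monitor is, at its core, a comparison of two views of the same order. Stripped of repositories, it can be sketched as a pure function over two maps keyed by order ID. `Reconciler` and the string statuses are hypothetical simplifications of the entities and enums used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Finds orders whose state disagrees with the payment service's view.
class Reconciler {

    // Returns the IDs of CONFIRMED orders that lack a COMPLETED payment,
    // sorted by order ID so the report is stable between runs.
    static List<String> confirmedWithoutPayment(Map<String, String> orderStatusById,
                                                Map<String, String> paymentStatusByOrderId) {
        List<String> inconsistent = new ArrayList<>();
        for (Map.Entry<String, String> order : new TreeMap<>(orderStatusById).entrySet()) {
            boolean confirmed = "CONFIRMED".equals(order.getValue());
            boolean paid = "COMPLETED".equals(paymentStatusByOrderId.get(order.getKey()));
            if (confirmed && !paid) {
                inconsistent.add(order.getKey());
            }
        }
        return inconsistent;
    }
}
```

Since the system is eventually consistent, a check like this should ignore very recent orders; otherwise it reports sagas that are simply still in flight.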
Testing Eventual Consistency
1. Integration Tests
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
class EventualConsistencyTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:15");
@Container
static RabbitMQContainer rabbitMQ = new RabbitMQContainer("rabbitmq:3.12-management");
@Autowired
private OrderSagaOrchestrator sagaOrchestrator;
@Autowired
private OrderRepository orderRepository;
@Autowired
private PaymentRepository paymentRepository;
@Autowired
private InventoryRepository inventoryRepository;
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
registry.add("spring.datasource.username", postgres::getUsername);
registry.add("spring.datasource.password", postgres::getPassword);
registry.add("spring.rabbitmq.host", rabbitMQ::getHost);
registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort);
}
@Test
void testSuccessfulOrderSaga() throws InterruptedException {
// Given
CreateOrderCommand command = createTestOrderCommand();
// When
sagaOrchestrator.createOrderSaga(command);
// Then - wait for eventual consistency
await().atMost(10, TimeUnit.SECONDS).until(() ->
orderRepository.findById(command.getOrderId())
.map(Order::getStatus)
.orElse(null) == OrderStatus.CONFIRMED
);
Optional<Order> order = orderRepository.findById(command.getOrderId());
assertTrue(order.isPresent());
assertEquals(OrderStatus.CONFIRMED, order.get().getStatus());
Optional<Payment> payment = paymentRepository.findByOrderId(command.getOrderId());
assertTrue(payment.isPresent());
assertEquals(PaymentStatus.COMPLETED, payment.get().getStatus());
}
@Test
void testOrderSagaWithPaymentFailure() throws InterruptedException {
// Given
CreateOrderCommand command = createTestOrderCommand();
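// Arrange: make the payment step fail here, for example with a mocked payment client that throws.
// Without an injected failure this test follows the happy path and times out waiting for CANCELLED.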
// When
sagaOrchestrator.createOrderSaga(command);
// Then - verify compensation
await().atMost(10, TimeUnit.SECONDS).until(() ->
orderRepository.findById(command.getOrderId())
.map(Order::getStatus)
.orElse(null) == OrderStatus.CANCELLED
);
Optional<Order> order = orderRepository.findById(command.getOrderId());
assertTrue(order.isPresent());
assertEquals(OrderStatus.CANCELLED, order.get().getStatus());
// Verify inventory was released
// Verify payment was refunded if it was processed
}
private CreateOrderCommand createTestOrderCommand() {
return CreateOrderCommand.builder()
.orderId(UUID.randomUUID().toString())
.customerId("customer-123")
.items(List.of(
OrderItem.builder()
.productId("product-1")
.productName("Test Product")
.quantity(2)
.unitPrice(new BigDecimal("29.99"))
.build()
))
.totalAmount(new BigDecimal("59.98"))
.paymentMethod("CREDIT_CARD")
.build();
}
}
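The `await()` calls in the tests above come from Awaitility. The underlying idea is a bounded poll: re-check a condition until it holds or a timeout expires, instead of sleeping for a fixed time. A JDK-only sketch of that idea follows. `Polling.awaitUntil` is a hypothetical helper used for illustration, not Awaitility's API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class Polling {

    // Re-evaluates the condition every pollInterval until it returns true
    // or the timeout elapses. Returns whether the condition was met in time.
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

Compared with a fixed `Thread.sleep`, a bounded poll lets the test finish as soon as the system converges and fail with a clear timeout when it does not.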
Best Practices and Monitoring
1. Application Configuration
# application.yml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/microservices
    username: postgres
    password: password
  jpa:
    hibernate:
      ddl-auto: validate
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
        jdbc.batch_size: 50
        order_inserts: true
        order_updates: true
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms
  cloud:
    stream:
      bindings:
        orderCreated-out-0:
          destination: order.events.exchange
          group: order-service
        orderConfirmed-out-0:
          destination: order.events.exchange
          group: order-service
resilience4j:
  circuitbreaker:
    instances:
      paymentService:
        failure-rate-threshold: 50
        wait-duration-in-open-state: 30s
        sliding-window-size: 10
  retry:
    instances:
      paymentRetry:
        max-attempts: 3
        wait-duration: 1s
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,circuitbreakers
  endpoint:
    health:
      show-details: always
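The `paymentRetry` settings above allow three attempts with a one-second wait between them. As a rough illustration of what such a policy does, here is a plain-Java sketch. It is not Resilience4j's implementation, and `SimpleRetry` and its parameters are hypothetical names.

```java
import java.time.Duration;
import java.util.function.Supplier;

class SimpleRetry {

    // Calls the operation up to maxAttempts times (the first call counts as
    // attempt 1), sleeping waitDuration after each failed attempt except the
    // last. Rethrows the last failure once the attempts are exhausted.
    static <T> T execute(Supplier<T> operation, int maxAttempts, Duration waitDuration) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    sleep(waitDuration);
                }
            }
        }
        throw lastFailure;
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

A retry like this is only safe when the retried operation is idempotent. Otherwise a request that succeeded on the server but failed on the way back could be applied twice.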
2. Monitoring and Observability
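import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component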
@Slf4j
public class ConsistencyMetrics {
private final MeterRegistry meterRegistry;
private final Counter successfulSagas;
private final Counter failedSagas;
private final Timer sagaExecutionTimer;
public ConsistencyMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.successfulSagas = meterRegistry.counter("saga.completed", "outcome", "success");
this.failedSagas = meterRegistry.counter("saga.completed", "outcome", "failure");
this.sagaExecutionTimer = meterRegistry.timer("saga.execution.time");
}
public void recordSagaSuccess(String sagaType, Duration duration) {
successfulSagas.increment();
sagaExecutionTimer.record(duration);
log.info("Saga {} completed successfully in {} ms", sagaType, duration.toMillis());
}
public void recordSagaFailure(String sagaType, Duration duration, String reason) {
failedSagas.increment();
sagaExecutionTimer.record(duration);
log.warn("Saga {} failed after {} ms: {}", sagaType, duration.toMillis(), reason);
}
public void recordEventPublished(String eventType) {
meterRegistry.counter("event.published", "type", eventType).increment();
}
public void recordEventProcessed(String eventType) {
meterRegistry.counter("event.processed", "type", eventType).increment();
}
}
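The `event.published` and `event.processed` counters above are most useful when compared: a gap that keeps growing suggests consumers are falling behind. A JDK-only sketch of that comparison follows. `EventBacklogTracker` is a hypothetical class used for illustration and is not part of Micrometer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class EventBacklogTracker {
    private final Map<String, LongAdder> published = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> processed = new ConcurrentHashMap<>();

    void recordPublished(String eventType) {
        published.computeIfAbsent(eventType, k -> new LongAdder()).increment();
    }

    void recordProcessed(String eventType) {
        processed.computeIfAbsent(eventType, k -> new LongAdder()).increment();
    }

    // Number of events of this type published but not yet processed.
    // Can go negative if duplicates are redelivered and counted twice.
    long backlog(String eventType) {
        return sum(published, eventType) - sum(processed, eventType);
    }

    private static long sum(Map<String, LongAdder> counters, String eventType) {
        LongAdder adder = counters.get(eventType);
        return adder == null ? 0L : adder.sum();
    }
}
```

In a real deployment the two counters usually live in different services, so the subtraction would happen in the monitoring backend rather than in application code.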
Conclusion
Eventual consistency in microservices requires careful design and implementation. Key takeaways:
Pattern Selection:
- Saga Pattern: Best for complex business transactions requiring compensation
- Outbox Pattern: Ensures reliable event publishing
- Event Sourcing: Maintains a complete audit trail and enables temporal queries
- CQRS: Separates read and write concerns for scalability
Implementation Considerations:
- Idempotency: Design operations to be safely retryable
- Compensation: Implement rollback mechanisms for failed operations
- Monitoring: Track consistency metrics and detect anomalies
- Testing: Verify behavior under various failure scenarios
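Of these considerations, idempotency is the one the others lean on, since a broker such as RabbitMQ can redeliver a message and a retried message must not apply its effect twice. A minimal in-memory sketch follows, assuming each event carries a unique ID. `IdempotentHandler` is a hypothetical name, and a real service would keep the processed IDs in a database table updated in the same transaction as the business change.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class IdempotentHandler<E> {
    // IDs of events whose effect has already been applied (in-memory for illustration).
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> delegate;

    IdempotentHandler(Consumer<E> delegate) {
        this.delegate = delegate;
    }

    // Applies the event at most once per eventId. Returns false for a duplicate.
    boolean handle(String eventId, E event) {
        if (!processedEventIds.add(eventId)) {
            return false; // duplicate delivery, effect already applied
        }
        try {
            delegate.accept(event);
            return true;
        } catch (RuntimeException e) {
            processedEventIds.remove(eventId); // let a redelivery try again
            throw e;
        }
    }
}
```

For example, wrapping a handler that credits an account means that delivering the same `evt-1` twice changes the balance only once.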
Best Practices:
- Start simple: Begin with orchestrated sagas for complex workflows
- Embrace asynchronous communication: Use message brokers for loose coupling
- Implement retries and circuit breakers: Handle transient failures gracefully
- Monitor data consistency: Regularly check for inconsistencies
- Design for failure: Assume any service call can fail
Eventual consistency lets microservices scale independently while their data still converges across service boundaries. That convergence is not automatic: it depends on idempotent handlers, compensation for failed steps, and monitoring and tests that surface inconsistencies before users do.