Overview
Event choreography is a decentralized approach to microservices coordination where services communicate through events without a central orchestrator. Each service reacts to events and emits new events, creating a workflow through event flow.
Architecture
Core Concepts
- Event-Driven Architecture: Services communicate via events
- Loose Coupling: Services don't know about each other
- Event Sourcing: State changes are stored as events
- CQRS: Separate read and write models
Dependencies
<dependencies> <!-- Spring Boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Message Broker --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <!-- RabbitMQ --> </dependency> <!-- OR --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Event Sourcing --> <dependency> <groupId>org.axonframework</groupId> <artifactId>axon-spring-boot-starter</artifactId> <version>4.8.0</version> </dependency> <!-- JSON Processing --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <!-- Testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Core Implementation
1. Event Base Classes
// Base Event Interface
public interface DomainEvent {
String getEventId();
String getAggregateId();
Instant getTimestamp();
String getEventType();
}
// Abstract Base Event
public abstract class BaseEvent implements DomainEvent {
private final String eventId;
private final String aggregateId;
private final Instant timestamp;
private final String eventType;
protected BaseEvent(String aggregateId) {
this.eventId = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.timestamp = Instant.now();
this.eventType = this.getClass().getSimpleName();
}
@Override public String getEventId() { return eventId; }
@Override public String getAggregateId() { return aggregateId; }
@Override public Instant getTimestamp() { return timestamp; }
@Override public String getEventType() { return eventType; }
}
// Event Wrapper for Messaging
public class EventEnvelope {
private final String eventId;
private final String eventType;
private final String aggregateId;
private final Instant timestamp;
private final String payload;
private final Map<String, String> metadata;
public EventEnvelope(DomainEvent event) {
this.eventId = event.getEventId();
this.eventType = event.getEventType();
this.aggregateId = event.getAggregateId();
this.timestamp = event.getTimestamp();
this.metadata = new HashMap<>();
try {
ObjectMapper mapper = new ObjectMapper();
this.payload = mapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize event", e);
}
}
// Getters
public String getEventId() { return eventId; }
public String getEventType() { return eventType; }
public String getAggregateId() { return aggregateId; }
public Instant getTimestamp() { return timestamp; }
public String getPayload() { return payload; }
public Map<String, String> getMetadata() { return metadata; }
public void addMetadata(String key, String value) {
metadata.put(key, value);
}
public <T extends DomainEvent> T deserialize(Class<T> eventType) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(payload, eventType);
} catch (JsonProcessingException e) {
throw new EventDeserializationException("Failed to deserialize event", e);
}
}
}
2. Event Bus and Dispatcher
@Component
@Slf4j
public class EventDispatcher {
private final ApplicationEventPublisher applicationEventPublisher;
private final EventStoreRepository eventStoreRepository;
private final MessageBrokerService messageBrokerService;
private final Map<String, List<EventHandler>> eventHandlers;
public EventDispatcher(ApplicationEventPublisher applicationEventPublisher,
EventStoreRepository eventStoreRepository,
MessageBrokerService messageBrokerService) {
this.applicationEventPublisher = applicationEventPublisher;
this.eventStoreRepository = eventStoreRepository;
this.messageBrokerService = messageBrokerService;
this.eventHandlers = new ConcurrentHashMap<>();
}
@EventListener
@Async
public void handleDomainEvent(DomainEvent event) {
log.info("Dispatching event: {} for aggregate: {}",
event.getEventType(), event.getAggregateId());
try {
// 1. Store event for event sourcing
eventStoreRepository.save(event);
// 2. Publish locally for in-process handlers
applicationEventPublisher.publishEvent(new LocalEvent(event));
// 3. Publish to message broker for other services
messageBrokerService.publishEvent(event);
log.info("Event dispatched successfully: {}", event.getEventId());
} catch (Exception e) {
log.error("Failed to dispatch event: {}", event.getEventId(), e);
throw new EventDispatchException("Event dispatch failed", e);
}
}
public void registerHandler(String eventType, EventHandler handler) {
eventHandlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
log.info("Registered handler for event type: {}", eventType);
}
public void dispatchToHandlers(DomainEvent event) {
List<EventHandler> handlers = eventHandlers.get(event.getEventType());
if (handlers != null) {
handlers.forEach(handler -> {
try {
handler.handle(event);
} catch (Exception e) {
log.error("Handler failed for event: {}", event.getEventId(), e);
}
});
}
}
}
// Local event for in-process handling
public class LocalEvent {
private final DomainEvent domainEvent;
public LocalEvent(DomainEvent domainEvent) {
this.domainEvent = domainEvent;
}
public DomainEvent getDomainEvent() { return domainEvent; }
}
// Event Handler Interface
@FunctionalInterface
public interface EventHandler {
void handle(DomainEvent event);
}
3. Message Broker Integration
@Component
@Slf4j
public class MessageBrokerService {
private final RabbitTemplate rabbitTemplate;
private final KafkaTemplate<String, EventEnvelope> kafkaTemplate;
private final String exchangeName;
private final String topicName;
public MessageBrokerService(RabbitTemplate rabbitTemplate,
KafkaTemplate<String, EventEnvelope> kafkaTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.kafkaTemplate = kafkaTemplate;
this.exchangeName = "domain-events";
this.topicName = "domain-events";
}
public void publishEvent(DomainEvent event) {
EventEnvelope envelope = new EventEnvelope(event);
// Add metadata
envelope.addMetadata("source", "order-service");
envelope.addMetadata("version", "1.0");
envelope.addMetadata("correlationId", getCorrelationId());
// Publish to RabbitMQ
publishToRabbitMQ(envelope);
// Publish to Kafka
publishToKafka(envelope);
}
private void publishToRabbitMQ(EventEnvelope envelope) {
try {
rabbitTemplate.convertAndSend(
exchangeName,
getRoutingKey(envelope),
envelope,
message -> {
message.getMessageProperties().setHeader("eventType", envelope.getEventType());
message.getMessageProperties().setHeader("timestamp", envelope.getTimestamp().toString());
return message;
}
);
log.debug("Event published to RabbitMQ: {}", envelope.getEventId());
} catch (Exception e) {
log.error("Failed to publish event to RabbitMQ: {}", envelope.getEventId(), e);
}
}
private void publishToKafka(EventEnvelope envelope) {
try {
kafkaTemplate.send(
topicName,
envelope.getAggregateId(),
envelope
);
log.debug("Event published to Kafka: {}", envelope.getEventId());
} catch (Exception e) {
log.error("Failed to publish event to Kafka: {}", envelope.getEventId(), e);
}
}
private String getRoutingKey(EventEnvelope envelope) {
return String.format("event.%s.%s",
envelope.getEventType().toLowerCase(),
envelope.getAggregateId());
}
private String getCorrelationId() {
// Get from MDC or generate new
return UUID.randomUUID().toString();
}
}
// Event Consumer
@Component
@Slf4j
public class EventConsumer {
private final EventDispatcher eventDispatcher;
private final ObjectMapper objectMapper;
private final Map<String, Class<? extends DomainEvent>> eventRegistry;
public EventConsumer(EventDispatcher eventDispatcher, ObjectMapper objectMapper) {
this.eventDispatcher = eventDispatcher;
this.objectMapper = objectMapper;
this.eventRegistry = new ConcurrentHashMap<>();
registerEventTypes();
}
@RabbitListener(queues = "${app.event-queue:domain-events}")
public void handleRabbitEvent(EventEnvelope envelope) {
processEvent(envelope);
}
@KafkaListener(topics = "${app.event-topic:domain-events}")
public void handleKafkaEvent(EventEnvelope envelope) {
processEvent(envelope);
}
private void processEvent(EventEnvelope envelope) {
try {
log.info("Received event: {} for aggregate: {}",
envelope.getEventType(), envelope.getAggregateId());
Class<? extends DomainEvent> eventClass = eventRegistry.get(envelope.getEventType());
if (eventClass != null) {
DomainEvent event = envelope.deserialize(eventClass);
eventDispatcher.dispatchToHandlers(event);
} else {
log.warn("Unknown event type: {}", envelope.getEventType());
}
} catch (Exception e) {
log.error("Failed to process event: {}", envelope.getEventId(), e);
}
}
private void registerEventTypes() {
// Register all domain event types
eventRegistry.put("OrderCreated", OrderCreated.class);
eventRegistry.put("OrderCancelled", OrderCancelled.class);
eventRegistry.put("PaymentProcessed", PaymentProcessed.class);
eventRegistry.put("InventoryReserved", InventoryReserved.class);
eventRegistry.put("OrderShipped", OrderShipped.class);
}
}
Domain-Specific Implementation
1. Order Service Events
// Order Domain Events
public class OrderCreated extends BaseEvent {
private final String customerId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
private final String status;
public OrderCreated(String orderId, String customerId,
List<OrderItem> items, BigDecimal totalAmount) {
super(orderId);
this.customerId = customerId;
this.items = List.copyOf(items);
this.totalAmount = totalAmount;
this.status = "CREATED";
}
// Getters
public String getCustomerId() { return customerId; }
public List<OrderItem> getItems() { return items; }
public BigDecimal getTotalAmount() { return totalAmount; }
public String getStatus() { return status; }
}
public class OrderCancelled extends BaseEvent {
private final String reason;
public OrderCancelled(String orderId, String reason) {
super(orderId);
this.reason = reason;
}
public String getReason() { return reason; }
}
public class OrderShipped extends BaseEvent {
private final String trackingNumber;
private final String carrier;
private final Instant shippedAt;
public OrderShipped(String orderId, String trackingNumber, String carrier) {
super(orderId);
this.trackingNumber = trackingNumber;
this.carrier = carrier;
this.shippedAt = Instant.now();
}
// Getters
public String getTrackingNumber() { return trackingNumber; }
public String getCarrier() { return carrier; }
public Instant getShippedAt() { return shippedAt; }
}
// Order Aggregate
@Component
@Slf4j
public class OrderAggregate {
private final EventDispatcher eventDispatcher;
private final OrderRepository orderRepository;
public OrderAggregate(EventDispatcher eventDispatcher,
OrderRepository orderRepository) {
this.eventDispatcher = eventDispatcher;
this.orderRepository = orderRepository;
}
public String createOrder(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
// Validate business rules
validateOrder(command);
// Create order entity
Order order = new Order(orderId, command.getCustomerId(),
command.getItems(), command.getTotalAmount());
orderRepository.save(order);
// Emit event
OrderCreated event = new OrderCreated(orderId, command.getCustomerId(),
command.getItems(), command.getTotalAmount());
eventDispatcher.handleDomainEvent(event);
return orderId;
}
public void cancelOrder(CancelOrderCommand command) {
Order order = orderRepository.findById(command.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(command.getOrderId()));
if (!order.canBeCancelled()) {
throw new OrderCancellationException("Order cannot be cancelled");
}
order.cancel(command.getReason());
orderRepository.save(order);
OrderCancelled event = new OrderCancelled(command.getOrderId(), command.getReason());
eventDispatcher.handleDomainEvent(event);
}
public void shipOrder(ShipOrderCommand command) {
Order order = orderRepository.findById(command.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(command.getOrderId()));
order.ship(command.getTrackingNumber(), command.getCarrier());
orderRepository.save(order);
OrderShipped event = new OrderShipped(command.getOrderId(),
command.getTrackingNumber(),
command.getCarrier());
eventDispatcher.handleDomainEvent(event);
}
private void validateOrder(CreateOrderCommand command) {
if (command.getItems().isEmpty()) {
throw new InvalidOrderException("Order must have at least one item");
}
if (command.getTotalAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new InvalidOrderException("Order total must be positive");
}
}
}
2. Payment Service Events
// Payment Domain Events
public class PaymentProcessed extends BaseEvent {
private final String orderId;
private final BigDecimal amount;
private final String paymentMethod;
private final String transactionId;
private final String status;
public PaymentProcessed(String paymentId, String orderId, BigDecimal amount,
String paymentMethod, String transactionId) {
super(paymentId);
this.orderId = orderId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.transactionId = transactionId;
this.status = "PROCESSED";
}
// Getters
public String getOrderId() { return orderId; }
public BigDecimal getAmount() { return amount; }
public String getPaymentMethod() { return paymentMethod; }
public String getTransactionId() { return transactionId; }
public String getStatus() { return status; }
}
public class PaymentFailed extends BaseEvent {
private final String orderId;
private final String reason;
public PaymentFailed(String paymentId, String orderId, String reason) {
super(paymentId);
this.orderId = orderId;
this.reason = reason;
}
public String getOrderId() { return orderId; }
public String getReason() { return reason; }
}
// Payment Service Event Handlers
@Component
@Slf4j
public class PaymentEventHandlers {
private final PaymentService paymentService;
public PaymentEventHandlers(PaymentService paymentService) {
this.paymentService = paymentService;
}
@EventListener
public void handleOrderCreated(OrderCreated event) {
log.info("Processing payment for order: {}", event.getAggregateId());
try {
// Process payment
ProcessPaymentCommand command = new ProcessPaymentCommand(
event.getAggregateId(),
event.getTotalAmount(),
"CREDIT_CARD" // Default payment method
);
PaymentResult result = paymentService.processPayment(command);
if (result.isSuccess()) {
PaymentProcessed paymentEvent = new PaymentProcessed(
result.getPaymentId(),
event.getAggregateId(),
event.getTotalAmount(),
command.getPaymentMethod(),
result.getTransactionId()
);
// Emit payment processed event
// This would typically be done through the payment service's event dispatcher
} else {
PaymentFailed paymentEvent = new PaymentFailed(
result.getPaymentId(),
event.getAggregateId(),
result.getFailureReason()
);
// Emit payment failed event
}
} catch (Exception e) {
log.error("Failed to process payment for order: {}", event.getAggregateId(), e);
}
}
@EventListener
public void handleOrderCancelled(OrderCancelled event) {
log.info("Processing refund for cancelled order: {}", event.getAggregateId());
try {
paymentService.refundPayment(event.getAggregateId(), event.getReason());
} catch (Exception e) {
log.error("Failed to process refund for order: {}", event.getAggregateId(), e);
}
}
}
3. Inventory Service Events
// Inventory Domain Events
public class InventoryReserved extends BaseEvent {
private final String orderId;
private final Map<String, Integer> reservedItems;
public InventoryReserved(String reservationId, String orderId,
Map<String, Integer> reservedItems) {
super(reservationId);
this.orderId = orderId;
this.reservedItems = Map.copyOf(reservedItems);
}
public String getOrderId() { return orderId; }
public Map<String, Integer> getReservedItems() { return reservedItems; }
}
public class InventoryOutOfStock extends BaseEvent {
private final String orderId;
private final String productId;
private final int requestedQuantity;
private final int availableQuantity;
public InventoryOutOfStock(String eventId, String orderId, String productId,
int requestedQuantity, int availableQuantity) {
super(eventId);
this.orderId = orderId;
this.productId = productId;
this.requestedQuantity = requestedQuantity;
this.availableQuantity = availableQuantity;
}
// Getters
public String getOrderId() { return orderId; }
public String getProductId() { return productId; }
public int getRequestedQuantity() { return requestedQuantity; }
public int getAvailableQuantity() { return availableQuantity; }
}
// Inventory Service Event Handlers
@Component
@Slf4j
public class InventoryEventHandlers {
private final InventoryService inventoryService;
public InventoryEventHandlers(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}
@EventListener
public void handleOrderCreated(OrderCreated event) {
log.info("Reserving inventory for order: {}", event.getAggregateId());
try {
Map<String, Integer> itemsToReserve = event.getItems().stream()
.collect(Collectors.toMap(
OrderItem::getProductId,
OrderItem::getQuantity
));
ReserveInventoryCommand command = new ReserveInventoryCommand(
event.getAggregateId(),
itemsToReserve
);
InventoryReservationResult result = inventoryService.reserveInventory(command);
if (result.isSuccess()) {
InventoryReserved inventoryEvent = new InventoryReserved(
result.getReservationId(),
event.getAggregateId(),
itemsToReserve
);
// Emit inventory reserved event
} else {
InventoryOutOfStock outOfStockEvent = new InventoryOutOfStock(
UUID.randomUUID().toString(),
event.getAggregateId(),
result.getOutOfStockProductId(),
result.getRequestedQuantity(),
result.getAvailableQuantity()
);
// Emit out of stock event
}
} catch (Exception e) {
log.error("Failed to reserve inventory for order: {}", event.getAggregateId(), e);
}
}
@EventListener
public void handleOrderCancelled(OrderCancelled event) {
log.info("Releasing inventory for cancelled order: {}", event.getAggregateId());
try {
inventoryService.releaseInventory(event.getAggregateId());
} catch (Exception e) {
log.error("Failed to release inventory for order: {}", event.getAggregateId(), e);
}
}
@EventListener
public void handleOrderShipped(OrderShipped event) {
log.info("Updating inventory for shipped order: {}", event.getAggregateId());
try {
inventoryService.updateInventoryForShippedOrder(event.getAggregateId());
} catch (Exception e) {
log.error("Failed to update inventory for shipped order: {}", event.getAggregateId(), e);
}
}
}
Workflow Choreography
1. Order Fulfillment Workflow
@Component
@Slf4j
public class OrderFulfillmentWorkflow {
private final OrderService orderService;
private final EventTracker eventTracker;
public OrderFulfillmentWorkflow(OrderService orderService,
EventTracker eventTracker) {
this.orderService = orderService;
this.eventTracker = eventTracker;
}
@EventListener
public void startOrderFulfillment(OrderCreated event) {
log.info("Starting order fulfillment workflow for order: {}", event.getAggregateId());
// Track that order creation has started the workflow
eventTracker.trackEvent(event.getAggregateId(), "ORDER_FULFILLMENT_STARTED", event);
}
@EventListener
public void handlePaymentProcessed(PaymentProcessed event) {
log.info("Payment processed for order: {}", event.getOrderId());
eventTracker.trackEvent(event.getOrderId(), "PAYMENT_PROCESSED", event);
// Check if we can proceed to inventory reservation
checkOrderProgress(event.getOrderId());
}
@EventListener
public void handleInventoryReserved(InventoryReserved event) {
log.info("Inventory reserved for order: {}", event.getOrderId());
eventTracker.trackEvent(event.getOrderId(), "INVENTORY_RESERVED", event);
// Check if we can proceed to shipping
checkOrderProgress(event.getOrderId());
}
@EventListener
public void handleOrderShipped(OrderShipped event) {
log.info("Order shipped: {}", event.getAggregateId());
eventTracker.trackEvent(event.getAggregateId(), "ORDER_SHIPPED", event);
// Mark fulfillment as complete
completeOrderFulfillment(event.getAggregateId());
}
@EventListener
public void handlePaymentFailed(PaymentFailed event) {
log.warn("Payment failed for order: {}, reason: {}",
event.getOrderId(), event.getReason());
eventTracker.trackEvent(event.getOrderId(), "PAYMENT_FAILED", event);
// Cancel the order due to payment failure
orderService.cancelOrder(new CancelOrderCommand(event.getOrderId(), event.getReason()));
}
@EventListener
public void handleInventoryOutOfStock(InventoryOutOfStock event) {
log.warn("Inventory out of stock for order: {}, product: {}",
event.getOrderId(), event.getProductId());
eventTracker.trackEvent(event.getOrderId(), "INVENTORY_OUT_OF_STOCK", event);
// Cancel the order due to inventory issues
orderService.cancelOrder(new CancelOrderCommand(
event.getOrderId(),
"Product out of stock: " + event.getProductId()
));
}
private void checkOrderProgress(String orderId) {
Set<String> completedEvents = eventTracker.getCompletedEvents(orderId);
// If both payment and inventory are done, ship the order
if (completedEvents.contains("PAYMENT_PROCESSED") &&
completedEvents.contains("INVENTORY_RESERVED")) {
log.info("All prerequisites met, shipping order: {}", orderId);
orderService.shipOrder(new ShipOrderCommand(orderId, "UPS", generateTrackingNumber()));
}
}
private void completeOrderFulfillment(String orderId) {
eventTracker.trackEvent(orderId, "ORDER_FULFILLMENT_COMPLETED", null);
log.info("Order fulfillment completed for order: {}", orderId);
}
private String generateTrackingNumber() {
return "TRK" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
}
}
2. Event Tracker for Workflow State
@Component
@Slf4j
public class EventTracker {
private final EventTrackerRepository repository;
public EventTracker(EventTrackerRepository repository) {
this.repository = repository;
}
public void trackEvent(String aggregateId, String eventType, DomainEvent event) {
WorkflowState state = repository.findByAggregateId(aggregateId)
.orElse(new WorkflowState(aggregateId));
state.addEvent(eventType, event != null ? event.getTimestamp() : Instant.now());
repository.save(state);
log.debug("Tracked event: {} for aggregate: {}", eventType, aggregateId);
}
public Set<String> getCompletedEvents(String aggregateId) {
return repository.findByAggregateId(aggregateId)
.map(WorkflowState::getCompletedEvents)
.orElse(Collections.emptySet());
}
public boolean isWorkflowComplete(String aggregateId) {
return repository.findByAggregateId(aggregateId)
.map(WorkflowState::isComplete)
.orElse(false);
}
public WorkflowStatus getWorkflowStatus(String aggregateId) {
return repository.findByAggregateId(aggregateId)
.map(WorkflowState::getStatus)
.orElse(WorkflowStatus.NOT_STARTED);
}
}
@Entity
@Table(name = "workflow_states")
@Data
@NoArgsConstructor
public class WorkflowState {
@Id
private String id;
private String aggregateId;
@ElementCollection
@CollectionTable(name = "workflow_events", joinColumns = @JoinColumn(name = "workflow_id"))
private Set<String> completedEvents = new HashSet<>();
private Instant startedAt;
private Instant completedAt;
private WorkflowStatus status = WorkflowStatus.IN_PROGRESS;
public WorkflowState(String aggregateId) {
this.id = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.startedAt = Instant.now();
}
public void addEvent(String eventType, Instant timestamp) {
completedEvents.add(eventType);
if (eventType.equals("ORDER_FULFILLMENT_COMPLETED")) {
this.status = WorkflowStatus.COMPLETED;
this.completedAt = timestamp;
} else if (eventType.equals("ORDER_CANCELLED")) {
this.status = WorkflowStatus.CANCELLED;
this.completedAt = timestamp;
}
}
public boolean isComplete() {
return status == WorkflowStatus.COMPLETED || status == WorkflowStatus.CANCELLED;
}
}
public enum WorkflowStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED,
CANCELLED,
FAILED
}
Monitoring and Observability
1. Event Monitoring
@Component
@Slf4j
public class EventMonitor {
private final MeterRegistry meterRegistry;
private final Map<String, Counter> eventCounters;
private final Map<String, Timer> eventTimers;
public EventMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.eventCounters = new ConcurrentHashMap<>();
this.eventTimers = new ConcurrentHashMap<>();
}
@EventListener
public void monitorEvent(DomainEvent event) {
String eventType = event.getEventType();
// Count events
Counter counter = eventCounters.computeIfAbsent(eventType,
type -> meterRegistry.counter("domain.events", "type", type));
counter.increment();
// Record event age if it's a processed event
if (event instanceof LocalEvent) {
DomainEvent domainEvent = ((LocalEvent) event).getDomainEvent();
recordEventAge(domainEvent);
}
}
@EventListener
public void monitorEventProcessing(LocalEvent event) {
Timer timer = eventTimers.computeIfAbsent(event.getDomainEvent().getEventType(),
type -> meterRegistry.timer("event.processing.time", "type", type));
timer.record(() -> {
// The actual processing time is measured by the @Async aspect
});
}
private void recordEventAge(DomainEvent event) {
long age = Duration.between(event.getTimestamp(), Instant.now()).toMillis();
meterRegistry.gauge("event.age",
Tags.of("type", event.getEventType()),
age);
}
public EventStatistics getEventStatistics() {
Map<String, Long> eventCounts = eventCounters.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> (long) entry.getValue().count()
));
return new EventStatistics(eventCounts, Instant.now());
}
}
2. Circuit Breaker for Event Handlers
@Component
@Slf4j
public class ResilientEventHandler {
private final CircuitBreakerRegistry circuitBreakerRegistry;
private final Map<String, CircuitBreaker> circuitBreakers;
public ResilientEventHandler(CircuitBreakerRegistry circuitBreakerRegistry) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.circuitBreakers = new ConcurrentHashMap<>();
}
public <T> T executeWithResilience(String handlerName, Supplier<T> operation) {
CircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(handlerName,
name -> circuitBreakerRegistry.circuitBreaker(name));
return circuitBreaker.executeSupplier(operation);
}
public void executeWithResilience(String handlerName, Runnable operation) {
CircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(handlerName,
name -> circuitBreakerRegistry.circuitBreaker(name));
circuitBreaker.executeRunnable(operation);
}
}
// Usage in event handlers
@Component
@Slf4j
public class ResilientPaymentEventHandler {
private final ResilientEventHandler resilientEventHandler;
private final PaymentService paymentService;
public ResilientPaymentEventHandler(ResilientEventHandler resilientEventHandler,
PaymentService paymentService) {
this.resilientEventHandler = resilientEventHandler;
this.paymentService = paymentService;
}
@EventListener
public void handleOrderCreated(OrderCreated event) {
resilientEventHandler.executeWithResilience("payment-handler", () -> {
ProcessPaymentCommand command = new ProcessPaymentCommand(
event.getAggregateId(),
event.getTotalAmount(),
"CREDIT_CARD"
);
PaymentResult result = paymentService.processPayment(command);
// Handle result...
});
}
}
Testing
1. Event Choreography Tests
@SpringBootTest
@TestPropertySource(properties = {
"spring.main.allow-bean-definition-overriding=true"
})
class EventChoreographyTest {
@Autowired
private OrderAggregate orderAggregate;
@Autowired
private EventDispatcher eventDispatcher;
@Autowired
private EventTracker eventTracker;
@MockBean
private PaymentService paymentService;
@MockBean
private InventoryService inventoryService;
@Test
void testOrderFulfillmentWorkflow() {
// Given
CreateOrderCommand command = new CreateOrderCommand(
"customer-123",
List.of(new OrderItem("product-1", 2, new BigDecimal("50.00"))),
new BigDecimal("100.00")
);
when(paymentService.processPayment(any()))
.thenReturn(new PaymentResult("payment-123", "txn-123", true, null));
when(inventoryService.reserveInventory(any()))
.thenReturn(new InventoryReservationResult("reservation-123", true, null, 0, 0));
// When
String orderId = orderAggregate.createOrder(command);
// Then - Verify events were emitted and processed
await().atMost(5, TimeUnit.SECONDS).until(() ->
eventTracker.isWorkflowComplete(orderId));
assertEquals(WorkflowStatus.COMPLETED, eventTracker.getWorkflowStatus(orderId));
}
@Test
void testOrderCancellationDueToPaymentFailure() {
// Given
CreateOrderCommand command = new CreateOrderCommand(
"customer-123",
List.of(new OrderItem("product-1", 1, new BigDecimal("50.00"))),
new BigDecimal("50.00")
);
when(paymentService.processPayment(any()))
.thenReturn(new PaymentResult("payment-123", null, false, "Insufficient funds"));
// When
String orderId = orderAggregate.createOrder(command);
// Then
await().atMost(5, TimeUnit.SECONDS).until(() ->
eventTracker.getWorkflowStatus(orderId) == WorkflowStatus.CANCELLED);
}
}
Configuration
application.yml
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest kafka: bootstrap-servers: localhost:9092 consumer: group-id: order-service auto-offset-reset: earliest jpa: hibernate: ddl-auto: update show-sql: false app: event-queue: domain-events event-topic: domain-events event-handler: timeout: 30000 max-retries: 3 resilience4j: circuitbreaker: instances: payment-handler: register-health-indicator: true sliding-window-size: 10 failure-rate-threshold: 50 wait-duration-in-open-state: 10s
Best Practices
- Idempotent Handlers: Ensure event handlers can handle duplicate events
- Event Versioning: Plan for event schema evolution
- Error Handling: Implement dead letter queues for failed events
- Monitoring: Track event flow and handler performance
- Testing: Test event choreography workflows thoroughly
- Documentation: Document event contracts between services
- Security: Secure event channels and validate event sources
This implementation provides a comprehensive foundation for event choreography in Java microservices, enabling loose coupling, scalability, and resilient workflows through event-driven communication.