Domain Events Publishing: Building Event-Driven Domain Models in Java

Domain Events are a powerful pattern in Domain-Driven Design (DDD) that capture something meaningful that happened in the domain. They enable loose coupling between domain components and facilitate event-driven architectures.

Core Concepts

What are Domain Events?

  • Domain Significance: Represent something important that happened in the business domain
  • Immutable Facts: Events are immutable - they represent something that already occurred
  • Named in Past Tense: e.g., OrderCreated, PaymentProcessed, UserRegistered
  • Self-Contained: Carry all necessary information about the event

Basic Implementation Patterns

Example 1: Basic Domain Event Structure

// Base interfaces and classes
public interface DomainEvent {
String getEventId();
String getAggregateId();
String getEventType();
LocalDateTime getOccurredAt();
String getEventVersion();
}
public abstract class BaseDomainEvent implements DomainEvent {
private final String eventId;
private final String aggregateId;
private final LocalDateTime occurredAt;
private final String eventVersion;
protected BaseDomainEvent(String aggregateId) {
this.eventId = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.occurredAt = LocalDateTime.now();
this.eventVersion = "1.0";
}
@Override public String getEventId() { return eventId; }
@Override public String getAggregateId() { return aggregateId; }
@Override public LocalDateTime getOccurredAt() { return occurredAt; }
@Override public String getEventVersion() { return eventVersion; }
@Override
public String getEventType() {
return this.getClass().getSimpleName();
}
}
// Concrete domain events
public class OrderCreatedEvent extends BaseDomainEvent {
private final String customerId;
private final BigDecimal amount;
private final List<OrderItem> items;
public OrderCreatedEvent(String orderId, String customerId, 
BigDecimal amount, List<OrderItem> items) {
super(orderId);
this.customerId = customerId;
this.amount = amount;
this.items = List.copyOf(items);
}
// Getters
public String getCustomerId() { return customerId; }
public BigDecimal getAmount() { return amount; }
public List<OrderItem> getItems() { return items; }
}
public class OrderCancelledEvent extends BaseDomainEvent {
private final String reason;
private final String cancelledBy;
public OrderCancelledEvent(String orderId, String reason, String cancelledBy) {
super(orderId);
this.reason = reason;
this.cancelledBy = cancelledBy;
}
// Getters
public String getReason() { return reason; }
public String getCancelledBy() { return cancelledBy; }
}
public class PaymentProcessedEvent extends BaseDomainEvent {
private final String orderId;
private final BigDecimal amount;
private final String paymentMethod;
private final String transactionId;
public PaymentProcessedEvent(String paymentId, String orderId, 
BigDecimal amount, String paymentMethod, 
String transactionId) {
super(paymentId);
this.orderId = orderId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.transactionId = transactionId;
}
// Getters
public String getOrderId() { return orderId; }
public BigDecimal getAmount() { return amount; }
public String getPaymentMethod() { return paymentMethod; }
public String getTransactionId() { return transactionId; }
}

Example 2: Aggregate Root with Domain Events

// Aggregate Root base class
public abstract class AggregateRoot {
private final List<DomainEvent> domainEvents = new ArrayList<>();
private Long version = 0L;
protected void registerEvent(DomainEvent event) {
domainEvents.add(event);
}
public List<DomainEvent> getDomainEvents() {
return new ArrayList<>(domainEvents);
}
public void clearEvents() {
domainEvents.clear();
}
public Long getVersion() { return version; }
protected void incrementVersion() { this.version++; }
// Event application method for event sourcing
protected void apply(DomainEvent event) {
registerEvent(event);
// In event sourcing, this would also update state
}
}
// Order Aggregate
public class Order extends AggregateRoot {
private String id;
private String customerId;
private OrderStatus status;
private BigDecimal amount;
private List<OrderItem> items;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// Factory method for creation
public static Order create(String customerId, List<OrderItem> items) {
Order order = new Order();
order.id = UUID.randomUUID().toString();
order.customerId = customerId;
order.items = new ArrayList<>(items);
order.amount = calculateTotalAmount(items);
order.status = OrderStatus.CREATED;
order.createdAt = LocalDateTime.now();
order.updatedAt = LocalDateTime.now();
// Register domain event
order.registerEvent(new OrderCreatedEvent(
order.id, customerId, order.amount, items
));
order.incrementVersion();
return order;
}
public void cancel(String reason, String cancelledBy) {
if (this.status != OrderStatus.CREATED && this.status != OrderStatus.PENDING) {
throw new IllegalOrderStateException(
"Cannot cancel order in state: " + this.status);
}
this.status = OrderStatus.CANCELLED;
this.updatedAt = LocalDateTime.now();
registerEvent(new OrderCancelledEvent(this.id, reason, cancelledBy));
incrementVersion();
}
public void processPayment(String paymentId, String paymentMethod, 
String transactionId) {
if (this.status != OrderStatus.CREATED) {
throw new IllegalOrderStateException(
"Cannot process payment for order in state: " + this.status);
}
this.status = OrderStatus.PAYMENT_PROCESSED;
this.updatedAt = LocalDateTime.now();
registerEvent(new PaymentProcessedEvent(
paymentId, this.id, this.amount, paymentMethod, transactionId
));
incrementVersion();
}
public void ship() {
if (this.status != OrderStatus.PAYMENT_PROCESSED) {
throw new IllegalOrderStateException(
"Cannot ship order in state: " + this.status);
}
this.status = OrderStatus.SHIPPED;
this.updatedAt = LocalDateTime.now();
registerEvent(new OrderShippedEvent(this.id, LocalDateTime.now()));
incrementVersion();
}
public void addItem(OrderItem item) {
this.items.add(item);
this.amount = this.amount.add(item.getPrice().multiply(
BigDecimal.valueOf(item.getQuantity())));
this.updatedAt = LocalDateTime.now();
registerEvent(new OrderItemAddedEvent(this.id, item));
incrementVersion();
}
private static BigDecimal calculateTotalAmount(List<OrderItem> items) {
return items.stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
// Getters
public String getId() { return id; }
public String getCustomerId() { return customerId; }
public OrderStatus getStatus() { return status; }
public BigDecimal getAmount() { return amount; }
public List<OrderItem> getItems() { return List.copyOf(items); }
public LocalDateTime getCreatedAt() { return createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
}
// Supporting classes
public class OrderItem {
private final String productId;
private final String productName;
private final BigDecimal price;
private final int quantity;
public OrderItem(String productId, String productName, 
BigDecimal price, int quantity) {
this.productId = productId;
this.productName = productName;
this.price = price;
this.quantity = quantity;
}
// Getters
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public BigDecimal getPrice() { return price; }
public int getQuantity() { return quantity; }
}
enum OrderStatus {
CREATED, PAYMENT_PROCESSED, SHIPPED, DELIVERED, CANCELLED
}
class IllegalOrderStateException extends RuntimeException {
public IllegalOrderStateException(String message) {
super(message);
}
}

Domain Event Publishing Patterns

Example 3: Domain Event Publisher

// Event Publisher Interface
public interface DomainEventPublisher {
void publish(DomainEvent event);
void publish(Collection<DomainEvent> events);
}
// Spring-based Implementation
@Component
@Slf4j
public class SpringDomainEventPublisher implements DomainEventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
private final ObjectMapper objectMapper;
public SpringDomainEventPublisher(ApplicationEventPublisher applicationEventPublisher,
ObjectMapper objectMapper) {
this.applicationEventPublisher = applicationEventPublisher;
this.objectMapper = objectMapper;
}
@Override
public void publish(DomainEvent event) {
log.debug("Publishing domain event: {}", event.getEventType());
try {
// Convert to Spring application event
DomainEventApplicationEvent wrapper = 
new DomainEventApplicationEvent(this, event);
applicationEventPublisher.publishEvent(wrapper);
log.info("Successfully published domain event: {} for aggregate: {}", 
event.getEventType(), event.getAggregateId());
} catch (Exception e) {
log.error("Failed to publish domain event: {}", event.getEventType(), e);
throw new EventPublishingException("Failed to publish domain event", e);
}
}
@Override
public void publish(Collection<DomainEvent> events) {
if (events == null || events.isEmpty()) {
return;
}
events.forEach(this::publish);
}
}
// Spring Application Event Wrapper
public class DomainEventApplicationEvent extends ApplicationEvent {
private final DomainEvent domainEvent;
public DomainEventApplicationEvent(Object source, DomainEvent domainEvent) {
super(source);
this.domainEvent = domainEvent;
}
public DomainEvent getDomainEvent() {
return domainEvent;
}
}
// Custom exception
class EventPublishingException extends RuntimeException {
public EventPublishingException(String message, Throwable cause) {
super(message, cause);
}
}

Example 4: Domain Event Listeners

// Event Listeners
@Component
@Slf4j
public class OrderDomainEventListener {
private final EmailService emailService;
private final InventoryService inventoryService;
private final AnalyticsService analyticsService;
public OrderDomainEventListener(EmailService emailService,
InventoryService inventoryService,
AnalyticsService analyticsService) {
this.emailService = emailService;
this.inventoryService = inventoryService;
this.analyticsService = analyticsService;
}
@EventListener
@Async
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Processing OrderCreated event for order: {}", event.getAggregateId());
try {
// 1. Reserve inventory
reserveInventory(event);
// 2. Send confirmation email
sendOrderConfirmation(event);
// 3. Record analytics
recordOrderAnalytics(event);
log.info("Successfully processed OrderCreated event for order: {}", 
event.getAggregateId());
} catch (Exception e) {
log.error("Failed to process OrderCreated event for order: {}", 
event.getAggregateId(), e);
// In production, you might want to implement retry logic or dead letter queue
}
}
@EventListener
@Async
public void handleOrderCancelled(OrderCancelledEvent event) {
log.info("Processing OrderCancelled event for order: {}", event.getAggregateId());
try {
// 1. Release reserved inventory
releaseInventory(event);
// 2. Send cancellation email
sendCancellationNotification(event);
// 3. Record cancellation analytics
recordCancellationAnalytics(event);
} catch (Exception e) {
log.error("Failed to process OrderCancelled event for order: {}", 
event.getAggregateId(), e);
}
}
@EventListener
@Async
public void handlePaymentProcessed(PaymentProcessedEvent event) {
log.info("Processing PaymentProcessed event for order: {}", event.getOrderId());
try {
// Update order status or trigger next steps
emailService.sendPaymentConfirmation(event.getOrderId(), event.getTransactionId());
analyticsService.recordPayment(event);
} catch (Exception e) {
log.error("Failed to process PaymentProcessed event for order: {}", 
event.getOrderId(), e);
}
}
@EventListener
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderShipped(OrderShippedEvent event) {
log.info("Processing OrderShipped event for order: {}", event.getAggregateId());
// This will only be called if the transaction commits successfully
emailService.sendShippingNotification(event.getAggregateId());
}
private void reserveInventory(OrderCreatedEvent event) {
event.getItems().forEach(item -> {
inventoryService.reserveItem(
item.getProductId(), 
item.getQuantity(), 
event.getAggregateId()
);
});
}
private void releaseInventory(OrderCancelledEvent event) {
inventoryService.releaseReservation(event.getAggregateId());
}
private void sendOrderConfirmation(OrderCreatedEvent event) {
emailService.sendOrderConfirmation(
event.getAggregateId(),
event.getCustomerId(),
event.getAmount()
);
}
private void sendCancellationNotification(OrderCancelledEvent event) {
emailService.sendCancellationNotification(
event.getAggregateId(),
event.getReason()
);
}
private void recordOrderAnalytics(OrderCreatedEvent event) {
analyticsService.recordOrderCreated(
event.getAggregateId(),
event.getCustomerId(),
event.getAmount(),
event.getItems().size()
);
}
private void recordCancellationAnalytics(OrderCancelledEvent event) {
analyticsService.recordOrderCancelled(
event.getAggregateId(),
event.getReason()
);
}
}
// Supporting services (interfaces)
public interface EmailService {
void sendOrderConfirmation(String orderId, String customerId, BigDecimal amount);
void sendCancellationNotification(String orderId, String reason);
void sendPaymentConfirmation(String orderId, String transactionId);
void sendShippingNotification(String orderId);
}
public interface InventoryService {
void reserveItem(String productId, int quantity, String reservationId);
void releaseReservation(String reservationId);
}
public interface AnalyticsService {
void recordOrderCreated(String orderId, String customerId, BigDecimal amount, int itemCount);
void recordOrderCancelled(String orderId, String reason);
void recordPayment(PaymentProcessedEvent event);
}

Example 5: Transactional Domain Event Handling

// Service with transactional event publishing
@Service
@Transactional
@Slf4j
public class OrderService {
private final OrderRepository orderRepository;
private final DomainEventPublisher eventPublisher;
private final DomainEventCollector eventCollector;
public OrderService(OrderRepository orderRepository,
DomainEventPublisher eventPublisher,
DomainEventCollector eventCollector) {
this.orderRepository = orderRepository;
this.eventPublisher = eventPublisher;
this.eventCollector = eventCollector;
}
public String createOrder(CreateOrderCommand command) {
log.info("Creating order for customer: {}", command.getCustomerId());
// Create order aggregate
Order order = Order.create(command.getCustomerId(), command.getItems());
// Save order
Order savedOrder = orderRepository.save(order);
// Collect domain events for later publication
eventCollector.collectEvents(order);
log.info("Created order with ID: {}", savedOrder.getId());
return savedOrder.getId();
}
public void cancelOrder(String orderId, CancelOrderCommand command) {
log.info("Cancelling order: {}", orderId);
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.cancel(command.getReason(), command.getCancelledBy());
// Save updated order
orderRepository.save(order);
// Collect domain events
eventCollector.collectEvents(order);
log.info("Cancelled order: {}", orderId);
}
public void processOrderPayment(String orderId, ProcessPaymentCommand command) {
log.info("Processing payment for order: {}", orderId);
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.processPayment(
command.getPaymentId(),
command.getPaymentMethod(),
command.getTransactionId()
);
orderRepository.save(order);
eventCollector.collectEvents(order);
log.info("Processed payment for order: {}", orderId);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleTransactionCompletion(DomainEventsCollectedEvent event) {
log.debug("Transaction completed, publishing {} domain events", 
event.getEvents().size());
eventPublisher.publish(event.getEvents());
}
}
// Domain Event Collector
@Component
@Scope(scopeName = "transaction", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class DomainEventCollector {
private final List<DomainEvent> collectedEvents = new ArrayList<>();
private final ApplicationEventPublisher applicationEventPublisher;
public DomainEventCollector(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
public void collectEvents(AggregateRoot aggregate) {
collectedEvents.addAll(aggregate.getDomainEvents());
aggregate.clearEvents();
}
public void publishCollectedEvents() {
if (!collectedEvents.isEmpty()) {
applicationEventPublisher.publishEvent(
new DomainEventsCollectedEvent(this, List.copyOf(collectedEvents))
);
collectedEvents.clear();
}
}
@PreDestroy
public void cleanup() {
if (!collectedEvents.isEmpty()) {
log.warn("Cleaning up {} unprocessed domain events", collectedEvents.size());
}
}
}
// Event indicating collected domain events are ready for publication
public class DomainEventsCollectedEvent extends ApplicationEvent {
private final List<DomainEvent> events;
public DomainEventsCollectedEvent(Object source, List<DomainEvent> events) {
super(source);
this.events = events;
}
public List<DomainEvent> getEvents() {
return events;
}
}

Example 6: Advanced Event Handling with Filtering and Routing

// Event Handler Registry
@Component
public class DomainEventHandlerRegistry {
private final Map<Class<? extends DomainEvent>, List<DomainEventHandler>> handlers;
public DomainEventHandlerRegistry(List<DomainEventHandler> handlerBeans) {
this.handlers = new HashMap<>();
initializeHandlers(handlerBeans);
}
private void initializeHandlers(List<DomainEventHandler> handlerBeans) {
for (DomainEventHandler handler : handlerBeans) {
Class<? extends DomainEvent> eventType = handler.getEventType();
handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
}
}
public List<DomainEventHandler> getHandlersForEvent(DomainEvent event) {
return handlers.getOrDefault(event.getClass(), List.of());
}
public boolean hasHandlersForEvent(DomainEvent event) {
return handlers.containsKey(event.getClass());
}
}
// Generic Event Handler Interface
public interface DomainEventHandler<T extends DomainEvent> {
void handle(T event);
Class<T> getEventType();
default int getOrder() { return 0; }
default boolean shouldHandle(T event) { return true; }
}
// Concrete Event Handlers
@Component
@Order(1)
public class InventoryReservationHandler implements DomainEventHandler<OrderCreatedEvent> {
private final InventoryService inventoryService;
public InventoryReservationHandler(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}
@Override
public void handle(OrderCreatedEvent event) {
event.getItems().forEach(item -> {
inventoryService.reserveItem(
item.getProductId(),
item.getQuantity(),
event.getAggregateId()
);
});
}
@Override
public Class<OrderCreatedEvent> getEventType() {
return OrderCreatedEvent.class;
}
@Override
public boolean shouldHandle(OrderCreatedEvent event) {
// Only handle if order amount is above certain threshold
return event.getAmount().compareTo(new BigDecimal("50.00")) > 0;
}
}
@Component
@Order(2)
public class OrderConfirmationEmailHandler implements DomainEventHandler<OrderCreatedEvent> {
private final EmailService emailService;
public OrderConfirmationEmailHandler(EmailService emailService) {
this.emailService = emailService;
}
@Override
public void handle(OrderCreatedEvent event) {
emailService.sendOrderConfirmation(
event.getAggregateId(),
event.getCustomerId(),
event.getAmount()
);
}
@Override
public Class<OrderCreatedEvent> getEventType() {
return OrderCreatedEvent.class;
}
}
// Advanced Event Publisher with Handler Support
@Component
@Slf4j
public class HandlerBasedDomainEventPublisher implements DomainEventPublisher {
private final DomainEventHandlerRegistry handlerRegistry;
private final ApplicationEventPublisher applicationEventPublisher;
private final TaskExecutor taskExecutor;
public HandlerBasedDomainEventPublisher(DomainEventHandlerRegistry handlerRegistry,
ApplicationEventPublisher applicationEventPublisher,
TaskExecutor taskExecutor) {
this.handlerRegistry = handlerRegistry;
this.applicationEventPublisher = applicationEventPublisher;
this.taskExecutor = taskExecutor;
}
@Override
public void publish(DomainEvent event) {
log.debug("Publishing domain event: {}", event.getEventType());
// 1. Publish as Spring application event for @EventListener methods
applicationEventPublisher.publishEvent(
new DomainEventApplicationEvent(this, event)
);
// 2. Execute registered domain event handlers
executeDomainEventHandlers(event);
}
@Override
public void publish(Collection<DomainEvent> events) {
events.forEach(this::publish);
}
private void executeDomainEventHandlers(DomainEvent event) {
List<DomainEventHandler> handlers = handlerRegistry.getHandlersForEvent(event);
if (handlers.isEmpty()) {
log.debug("No handlers registered for event type: {}", event.getEventType());
return;
}
// Sort handlers by order
handlers.sort(Comparator.comparing(DomainEventHandler::getOrder));
// Execute handlers asynchronously
for (DomainEventHandler handler : handlers) {
if (handler.shouldHandle(event)) {
taskExecutor.execute(() -> {
try {
log.debug("Executing handler: {} for event: {}", 
handler.getClass().getSimpleName(), event.getEventType());
handler.handle(event);
} catch (Exception e) {
log.error("Handler {} failed for event {}", 
handler.getClass().getSimpleName(), event.getEventType(), e);
}
});
}
}
}
}

Example 7: Event Sourcing with Domain Events

// Event Sourced Aggregate
public abstract class EventSourcedAggregateRoot {
private final List<DomainEvent> changes = new ArrayList<>();
private Long version = 0L;
private String id;
public abstract void when(DomainEvent event);
protected void apply(DomainEvent event) {
when(event);
changes.add(event);
version++;
}
public void loadFromHistory(List<DomainEvent> history) {
history.forEach(this::when);
this.version = (long) history.size();
}
public List<DomainEvent> getUncommittedChanges() {
return new ArrayList<>(changes);
}
public void markChangesAsCommitted() {
changes.clear();
}
public Long getVersion() { return version; }
public String getId() { return id; }
protected void setId(String id) { this.id = id; }
}
// Event-Sourced Order Aggregate
public class EventSourcedOrder extends EventSourcedAggregateRoot {
private String customerId;
private OrderStatus status;
private BigDecimal amount;
private List<OrderItem> items = new ArrayList<>();
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// Factory method
public static EventSourcedOrder create(String customerId, List<OrderItem> items) {
EventSourcedOrder order = new EventSourcedOrder();
String orderId = UUID.randomUUID().toString();
BigDecimal amount = calculateTotalAmount(items);
order.apply(new OrderCreatedEvent(orderId, customerId, amount, items));
return order;
}
public void cancel(String reason, String cancelledBy) {
if (this.status == OrderStatus.CANCELLED) {
throw new IllegalOrderStateException("Order already cancelled");
}
apply(new OrderCancelledEvent(getId(), reason, cancelledBy));
}
// Event application methods
@Override
public void when(DomainEvent event) {
if (event instanceof OrderCreatedEvent e) {
handleOrderCreated(e);
} else if (event instanceof OrderCancelledEvent e) {
handleOrderCancelled(e);
} else if (event instanceof PaymentProcessedEvent e) {
handlePaymentProcessed(e);
} else if (event instanceof OrderShippedEvent e) {
handleOrderShipped(e);
} else if (event instanceof OrderItemAddedEvent e) {
handleOrderItemAdded(e);
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
this.setId(event.getAggregateId());
this.customerId = event.getCustomerId();
this.amount = event.getAmount();
this.items = new ArrayList<>(event.getItems());
this.status = OrderStatus.CREATED;
this.createdAt = event.getOccurredAt();
this.updatedAt = event.getOccurredAt();
}
private void handleOrderCancelled(OrderCancelledEvent event) {
this.status = OrderStatus.CANCELLED;
this.updatedAt = event.getOccurredAt();
}
private void handlePaymentProcessed(PaymentProcessedEvent event) {
this.status = OrderStatus.PAYMENT_PROCESSED;
this.updatedAt = event.getOccurredAt();
}
private void handleOrderShipped(OrderShippedEvent event) {
this.status = OrderStatus.SHIPPED;
this.updatedAt = event.getOccurredAt();
}
private void handleOrderItemAdded(OrderItemAddedEvent event) {
this.items.add(event.getItem());
this.amount = this.amount.add(event.getItem().getPrice().multiply(
BigDecimal.valueOf(event.getItem().getQuantity())));
this.updatedAt = event.getOccurredAt();
}
private static BigDecimal calculateTotalAmount(List<OrderItem> items) {
return items.stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
// Getters
public String getCustomerId() { return customerId; }
public OrderStatus getStatus() { return status; }
public BigDecimal getAmount() { return amount; }
public List<OrderItem> getItems() { return List.copyOf(items); }
public LocalDateTime getCreatedAt() { return createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
}
// Event Store
public interface EventStore {
void save(String aggregateType, String aggregateId, 
List<DomainEvent> events, Long expectedVersion);
List<DomainEvent> loadEvents(String aggregateType, String aggregateId);
List<DomainEvent> loadEventsByType(String aggregateType, LocalDateTime from, LocalDateTime to);
}
// Event Sourcing Repository
@Component
public class EventSourcingRepository {
private final EventStore eventStore;
private final DomainEventPublisher eventPublisher;
public EventSourcingRepository(EventStore eventStore, 
DomainEventPublisher eventPublisher) {
this.eventStore = eventStore;
this.eventPublisher = eventPublisher;
}
public void save(EventSourcedAggregateRoot aggregate) {
List<DomainEvent> changes = aggregate.getUncommittedChanges();
eventStore.save(
aggregate.getClass().getSimpleName(),
aggregate.getId(),
changes,
aggregate.getVersion() - changes.size()
);
// Publish events
eventPublisher.publish(changes);
aggregate.markChangesAsCommitted();
}
public <T extends EventSourcedAggregateRoot> T load(String aggregateId, 
Class<T> aggregateClass) {
try {
T aggregate = aggregateClass.getDeclaredConstructor().newInstance();
List<DomainEvent> events = eventStore.loadEvents(
aggregateClass.getSimpleName(), aggregateId);
aggregate.loadFromHistory(events);
return aggregate;
} catch (Exception e) {
throw new AggregateLoadException("Failed to load aggregate: " + aggregateId, e);
}
}
}
class AggregateLoadException extends RuntimeException {
public AggregateLoadException(String message, Throwable cause) {
super(message, cause);
}
}

Testing Domain Events

Example 8: Testing Domain Events

@ExtendWith(MockitoExtension.class)
class OrderTest {
@Test
@DisplayName("Should emit OrderCreated event when order is created")
void shouldEmitOrderCreatedEvent() {
// Given
List<OrderItem> items = List.of(
new OrderItem("prod-1", "Product 1", new BigDecimal("29.99"), 2)
);
// When
Order order = Order.create("customer-123", items);
// Then
List<DomainEvent> events = order.getDomainEvents();
assertThat(events).hasSize(1);
DomainEvent event = events.get(0);
assertThat(event).isInstanceOf(OrderCreatedEvent.class);
assertThat(event.getEventType()).isEqualTo("OrderCreated");
assertThat(event.getAggregateId()).isEqualTo(order.getId());
}
@Test
@DisplayName("Should emit OrderCancelled event when order is cancelled")
void shouldEmitOrderCancelledEvent() {
// Given
Order order = Order.create("customer-123", 
List.of(new OrderItem("prod-1", "Product 1", new BigDecimal("19.99"), 1)));
order.clearEvents(); // Clear creation event
// When
order.cancel("Customer requested", "system");
// Then
List<DomainEvent> events = order.getDomainEvents();
assertThat(events).hasSize(1);
DomainEvent event = events.get(0);
assertThat(event).isInstanceOf(OrderCancelledEvent.class);
OrderCancelledEvent cancelledEvent = (OrderCancelledEvent) event;
assertThat(cancelledEvent.getReason()).isEqualTo("Customer requested");
assertThat(cancelledEvent.getCancelledBy()).isEqualTo("system");
}
}
@SpringBootTest
@ExtendWith(SpringExtension.class)
class OrderServiceIntegrationTest {
@Autowired
private OrderService orderService;
@Autowired
private OrderRepository orderRepository;
@MockBean
private DomainEventPublisher eventPublisher;
@Test
@DisplayName("Should publish domain events after transaction commit")
void shouldPublishEventsAfterTransactionCommit() {
// Given
CreateOrderCommand command = new CreateOrderCommand(
"customer-123", 
List.of(new OrderItem("prod-1", "Product 1", new BigDecimal("49.99"), 1))
);
// When
String orderId = orderService.createOrder(command);
// Then
verify(eventPublisher, timeout(5000).atLeastOnce())
.publish(any(DomainEvent.class));
Order order = orderRepository.findById(orderId).orElseThrow();
assertThat(order.getStatus()).isEqualTo(OrderStatus.CREATED);
}
}
// Test-specific event listener for verification
@Component
@Primary
@TestProfile
public class TestDomainEventCollector {
private final List<DomainEvent> capturedEvents = new ArrayList<>();
@EventListener
public void captureEvent(DomainEventApplicationEvent event) {
capturedEvents.add(event.getDomainEvent());
}
public List<DomainEvent> getCapturedEvents() {
return new ArrayList<>(capturedEvents);
}
public void clear() {
capturedEvents.clear();
}
public <T extends DomainEvent> List<T> getEventsOfType(Class<T> eventType) {
return capturedEvents.stream()
.filter(eventType::isInstance)
.map(eventType::cast)
.collect(Collectors.toList());
}
}

Best Practices and Considerations

1. Event Design

  • Keep events focused: Each event should represent one meaningful business occurrence
  • Include all relevant data: Events should contain all information needed by consumers
  • Use value objects: Prefer value objects over primitive types in events
  • Version events: Include versioning for event schema evolution

2. Performance Considerations

  • Async processing: Use asynchronous event processing when possible
  • Batch processing: Consider batching events for better performance
  • Event filtering: Implement event filtering to avoid unnecessary processing

3. Error Handling

  • Dead letter queues: Implement DLQ for events that repeatedly fail
  • Retry mechanisms: Add retry logic with exponential backoff
  • Circuit breakers: Use circuit breakers for external service calls

4. Monitoring

  • Event metrics: Track event publication and processing metrics
  • Audit logging: Log important events for debugging and compliance
  • Health checks: Monitor event processing health

Conclusion

Domain Events Publishing provides a powerful way to build decoupled, maintainable, and scalable domain models:

Key Benefits:

  • Loose Coupling: Domain components don't need to know about each other
  • Extensibility: Easy to add new behavior by subscribing to events
  • Auditability: Complete history of domain state changes
  • Testability: Easy to test domain logic in isolation

When to Use:

  • Complex business domains with multiple side effects
  • Microservices architectures
  • Event-driven systems
  • Systems requiring audit trails
  • When you need to react to domain changes

Implementation Considerations:

  • Choose between immediate and deferred event publication
  • Consider transaction boundaries and consistency
  • Plan for event schema evolution
  • Implement proper error handling and monitoring

Domain Events enable you to build systems that are more aligned with business processes and easier to evolve over time.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper