Introduction
Aggregate root versioning is a Domain-Driven Design (DDD) pattern that keeps an aggregate consistent under concurrent writes. Each aggregate carries a version number, and a write succeeds only if the version the writer originally read is still current. This form of optimistic concurrency control prevents lost updates when multiple processes modify the same aggregate at the same time.
This guide walks through aggregate root versioning in Java, starting with JPA-based optimistic locking and moving on to an event-sourced implementation.
Core Concepts
1. What is an Aggregate Root?
- Aggregate: A cluster of associated objects treated as a unit
- Aggregate Root: The single entry point for accessing the aggregate
- Invariants: Business rules that must be consistent within the aggregate
2. Versioning Strategies
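- Optimistic Locking: Each aggregate carries a version number; a write is rejected if the version changed since the aggregate was read
- Pessimistic Locking: A database-level lock blocks other writers until the current transaction finishes
- Event Sourcing: The position of the last event in the aggregate's event stream serves as its version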
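To make the optimistic strategy concrete, here is a minimal in-memory sketch in plain Java, independent of JPA. The `VersionedStore` class and its method names are hypothetical; they only illustrate the compare-version-on-write rule that a version column enforces in a database.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store illustrating optimistic concurrency control.
class VersionedStore {

    // Immutable snapshot of a stored value together with its version.
    static final class Snapshot {
        final String value;
        final long version;

        Snapshot(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Snapshot> rows = new HashMap<>();

    // Inserts a new row at version 0.
    synchronized void insert(String id, String value) {
        if (rows.containsKey(id)) {
            throw new IllegalStateException("Duplicate id: " + id);
        }
        rows.put(id, new Snapshot(value, 0L));
    }

    synchronized Snapshot read(String id) {
        Snapshot snapshot = rows.get(id);
        if (snapshot == null) {
            throw new IllegalArgumentException("Unknown id: " + id);
        }
        return snapshot;
    }

    // Writes only if the caller's expected version is still current;
    // otherwise the caller worked on stale data and has to re-read.
    synchronized Snapshot update(String id, long expectedVersion, String newValue) {
        Snapshot current = read(id);
        if (current.version != expectedVersion) {
            throw new IllegalStateException("Version conflict on " + id
                    + ": expected " + expectedVersion + ", actual " + current.version);
        }
        Snapshot next = new Snapshot(newValue, current.version + 1);
        rows.put(id, next);
        return next;
    }
}
```

If two writers both read version 0, the first `update` moves the row to version 1 and the second is rejected instead of silently overwriting the first writer's change.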
Project Setup and Dependencies
1. Maven Dependencies
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- Jackson (ObjectMapper), used below for event serialization -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <!-- Database -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>postgresql</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Utilities -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.13.0</version>
    </dependency>
    <!-- Lombok, used below for @Data, @Builder and @Slf4j -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Basic Optimistic Locking Implementation
1. Versioned Aggregate Root Base Class
@MappedSuperclass
@Data
public abstract class VersionedAggregateRoot<T extends AggregateRootId> {
@EmbeddedId
protected T id;
@Version
@Column(name = "version", nullable = false)
protected Long version;
@CreationTimestamp
@Column(name = "created_at", nullable = false, updatable = false)
protected LocalDateTime createdAt;
@UpdateTimestamp
@Column(name = "updated_at", nullable = false)
protected LocalDateTime updatedAt;
protected VersionedAggregateRoot() {
this.version = 0L;
}
protected VersionedAggregateRoot(T id) {
this.id = id;
this.version = 0L;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
public void incrementVersion() {
this.version++;
this.updatedAt = LocalDateTime.now();
}
public abstract void validateInvariants();
}
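// Abstract base for embeddable IDs: mapped as a superclass so that concrete
// @Embeddable subclasses such as OrderId inherit the "value" column mapping
@MappedSuperclass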
public abstract class AggregateRootId implements Serializable {
protected String value;
protected AggregateRootId() {}
protected AggregateRootId(String value) {
this.value = value;
}
public String getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregateRootId that = (AggregateRootId) o;
return Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public String toString() {
return value;
}
}
2. Domain Models with Versioning
// Order Aggregate
@Entity
@Table(name = "orders")
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class Order extends VersionedAggregateRoot<OrderId> {
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private OrderStatus status;
@Column(name = "customer_id", nullable = false)
private String customerId;
@Column(name = "total_amount", nullable = false, precision = 19, scale = 4)
private BigDecimal totalAmount;
@OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true)
private List<OrderLine> orderLines;
// Domain events are transient, in-memory state: they are published after a save, not persisted with the order
@Transient
private List<OrderDomainEvent> domainEvents = new ArrayList<>();
// Business logic methods
public Order(OrderId orderId, String customerId) {
super(orderId);
this.status = OrderStatus.DRAFT;
this.customerId = customerId;
this.totalAmount = BigDecimal.ZERO;
this.orderLines = new ArrayList<>();
this.domainEvents = new ArrayList<>();
validateInvariants();
}
public void addItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
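if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Cannot add items to order in " + status + " state");
}
// Reject invalid input before mutating state, so a failed call leaves the aggregate unchanged
if (quantity <= 0 || unitPrice == null || unitPrice.signum() < 0) {
throw new AggregateInvariantViolationException("Quantity must be positive and unit price must not be negative");
}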
OrderLine line = OrderLine.builder()
.order(this) // set the owning side, otherwise order_id is stored as NULL
.productId(productId)
.productName(productName)
.quantity(quantity)
.unitPrice(unitPrice)
.totalPrice(unitPrice.multiply(BigDecimal.valueOf(quantity)))
.build();
orderLines.add(line);
recalculateTotal();
incrementVersion();
domainEvents.add(new OrderItemAddedEvent(
getId().getValue(),
productId,
productName,
quantity,
unitPrice
));
validateInvariants();
}
public void removeItem(String productId) {
if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Cannot remove items from order in " + status + " state");
}
boolean removed = orderLines.removeIf(line -> line.getProductId().equals(productId));
if (removed) {
recalculateTotal();
incrementVersion();
domainEvents.add(new OrderItemRemovedEvent(
getId().getValue(),
productId
));
}
validateInvariants();
}
public void submit() {
if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Order already submitted");
}
if (orderLines.isEmpty()) {
throw new IllegalOrderStateException("Cannot submit empty order");
}
this.status = OrderStatus.SUBMITTED;
incrementVersion();
domainEvents.add(new OrderSubmittedEvent(
getId().getValue(),
customerId,
totalAmount
));
validateInvariants();
}
public void cancel() {
if (status == OrderStatus.CANCELLED || status == OrderStatus.SHIPPED) {
throw new IllegalOrderStateException("Order cannot be cancelled in " + status + " state");
}
this.status = OrderStatus.CANCELLED;
incrementVersion();
domainEvents.add(new OrderCancelledEvent(
getId().getValue(),
"User requested cancellation"
));
validateInvariants();
}
private void recalculateTotal() {
this.totalAmount = orderLines.stream()
.map(OrderLine::getTotalPrice)
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
@Override
public void validateInvariants() {
List<String> violations = new ArrayList<>();
if (totalAmount.compareTo(BigDecimal.ZERO) < 0) {
violations.add("Total amount cannot be negative");
}
if (orderLines.stream().anyMatch(line -> line.getQuantity() <= 0)) {
violations.add("Order line quantity must be positive");
}
if (!violations.isEmpty()) {
throw new AggregateInvariantViolationException(
"Order invariants violated: " + String.join(", ", violations));
}
}
public List<OrderDomainEvent> getDomainEvents() {
return new ArrayList<>(domainEvents);
}
public void clearDomainEvents() {
domainEvents.clear();
}
}
// Value Objects
@Embeddable
public class OrderId extends AggregateRootId {
public OrderId() {
super();
}
public OrderId(String value) {
super(value);
}
public static OrderId generate() {
return new OrderId("ORD-" + UUID.randomUUID().toString());
}
public static OrderId fromString(String value) {
return new OrderId(value);
}
}
@Entity
@Table(name = "order_lines")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderLine {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
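@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "order_id")
// Exclude the back-reference from the Lombok-generated methods to avoid infinite recursion with Order
@ToString.Exclude
@EqualsAndHashCode.Exclude
private Order order;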
@Column(name = "product_id", nullable = false)
private String productId;
@Column(name = "product_name", nullable = false)
private String productName;
@Column(name = "quantity", nullable = false)
private Integer quantity;
@Column(name = "unit_price", nullable = false, precision = 19, scale = 4)
private BigDecimal unitPrice;
@Column(name = "total_price", nullable = false, precision = 19, scale = 4)
private BigDecimal totalPrice;
}
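// Domain Events
// Common event contract, assumed here so that the same events work with the event-sourced aggregate further down
public interface DomainEvent {
LocalDateTime getOccurredAt();
Long getVersion();
void setVersion(Long version);
}
public abstract class OrderDomainEvent implements DomainEvent {
protected final String orderId;
protected final LocalDateTime occurredAt;
// Not final: the version is assigned when the event is recorded
protected Long version;
protected OrderDomainEvent(String orderId, Long version) {
this.orderId = orderId;
this.occurredAt = LocalDateTime.now();
this.version = version;
}
public String getOrderId() { return orderId; }
@Override
public LocalDateTime getOccurredAt() { return occurredAt; }
@Override
public Long getVersion() { return version; }
@Override
public void setVersion(Long version) { this.version = version; }
}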
public class OrderItemAddedEvent extends OrderDomainEvent {
private final String productId;
private final String productName;
private final Integer quantity;
private final BigDecimal unitPrice;
public OrderItemAddedEvent(String orderId, String productId, String productName,
Integer quantity, BigDecimal unitPrice) {
super(orderId, null); // Version will be set when event is recorded
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
// Getters
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public Integer getQuantity() { return quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
}
public class OrderSubmittedEvent extends OrderDomainEvent {
private final String customerId;
private final BigDecimal totalAmount;
public OrderSubmittedEvent(String orderId, String customerId, BigDecimal totalAmount) {
super(orderId, null);
this.customerId = customerId;
this.totalAmount = totalAmount;
}
// Getters
public String getCustomerId() { return customerId; }
public BigDecimal getTotalAmount() { return totalAmount; }
}
Repository Implementation with Optimistic Locking
1. Custom Repository with Version Control
@Repository
public interface OrderRepository extends JpaRepository<Order, OrderId> {
@Query("SELECT o FROM Order o WHERE o.id = :id AND o.version = :version")
Optional<Order> findByIdAndVersion(@Param("id") OrderId id, @Param("version") Long version);
@Query("SELECT o.version FROM Order o WHERE o.id = :id")
Optional<Long> findVersionById(@Param("id") OrderId id);
@Lock(LockModeType.OPTIMISTIC_FORCE_INCREMENT)
@Query("SELECT o FROM Order o WHERE o.id = :id")
Optional<Order> findByIdWithLock(@Param("id") OrderId id);
List<Order> findByStatus(OrderStatus status);
@Query("SELECT COUNT(o) FROM Order o WHERE o.customerId = :customerId AND o.status = :status")
long countByCustomerIdAndStatus(@Param("customerId") String customerId,
@Param("status") OrderStatus status);
}
@Service
@Slf4j
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final DomainEventPublisher eventPublisher;
private final OptimisticLockRetryTemplate retryTemplate;
public OrderService(OrderRepository orderRepository,
DomainEventPublisher eventPublisher,
OptimisticLockRetryTemplate retryTemplate) {
this.orderRepository = orderRepository;
this.eventPublisher = eventPublisher;
this.retryTemplate = retryTemplate;
}
public Order createOrder(String customerId) {
OrderId orderId = OrderId.generate();
Order order = new Order(orderId, customerId);
Order saved = orderRepository.save(order);
log.info("Created new order: {} for customer: {}", orderId, customerId);
return saved;
}
public Order addOrderItem(String orderId, String productId, String productName,
int quantity, BigDecimal unitPrice) {
return retryTemplate.executeWithRetry(() -> {
OrderId id = OrderId.fromString(orderId);
Order order = orderRepository.findById(id)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
order.addItem(productId, productName, quantity, unitPrice);
Order updated = orderRepository.save(order);
publishDomainEvents(updated);
log.debug("Added item to order: {}, product: {}, quantity: {}",
orderId, productId, quantity);
return updated;
});
}
public Order submitOrder(String orderId) {
return retryTemplate.executeWithRetry(() -> {
OrderId id = OrderId.fromString(orderId);
Order order = orderRepository.findById(id)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
order.submit();
Order updated = orderRepository.save(order);
publishDomainEvents(updated);
log.info("Submitted order: {}", orderId);
return updated;
});
}
public Order cancelOrder(String orderId, String reason) {
return retryTemplate.executeWithRetry(() -> {
OrderId id = OrderId.fromString(orderId);
Order order = orderRepository.findById(id)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
order.cancel();
Order updated = orderRepository.save(order);
publishDomainEvents(updated);
log.info("Cancelled order: {}, reason: {}", orderId, reason);
return updated;
});
}
@Transactional(readOnly = true)
public Optional<Order> getOrder(String orderId) {
OrderId id = OrderId.fromString(orderId);
return orderRepository.findById(id);
}
@Transactional(readOnly = true)
public Order getOrderWithVersion(String orderId, Long expectedVersion) {
OrderId id = OrderId.fromString(orderId);
return orderRepository.findByIdAndVersion(id, expectedVersion)
.orElseThrow(() -> new OptimisticLockingException(
"Order " + orderId + " version mismatch. Expected: " + expectedVersion));
}
private void publishDomainEvents(Order order) {
List<OrderDomainEvent> events = order.getDomainEvents();
events.forEach(eventPublisher::publish);
order.clearDomainEvents();
}
}
@Component
@Slf4j
public class OptimisticLockRetryTemplate {
private static final int MAX_RETRIES = 3;
private static final long INITIAL_BACKOFF_MS = 100;
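// Invoke this from outside any active transaction so that each attempt runs in a fresh one:
// a conflict raised at flush or commit time cannot be retried from inside the failed transaction.
public <T> T executeWithRetry(Supplier<T> operation) {
int attempt = 0;
Exception lastException;
do {
try {
return operation.get();
// Spring translates JPA version conflicts into OptimisticLockingFailureException, so catch that as well
} catch (OptimisticLockingException | org.springframework.dao.OptimisticLockingFailureException e) {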
lastException = e;
attempt++;
if (attempt < MAX_RETRIES) {
long backoffMs = INITIAL_BACKOFF_MS * (long) Math.pow(2, attempt - 1);
log.warn("Optimistic lock conflict on attempt {}/{}, retrying in {} ms",
attempt, MAX_RETRIES, backoffMs);
try {
Thread.sleep(backoffMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Retry interrupted", ie);
}
}
}
} while (attempt < MAX_RETRIES);
throw new OptimisticLockRetryExhaustedException(
"Failed after " + MAX_RETRIES + " retries due to optimistic lock conflicts",
lastException);
}
public void executeWithRetry(Runnable operation) {
executeWithRetry(() -> {
operation.run();
return null;
});
}
}
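With the constants above, the retry template waits 100 ms after the first failed attempt and 200 ms after the second, then gives up after the third. The following framework-free sketch isolates that backoff logic so it can be tried without Spring. `BackoffRetry` and `ConflictException` are hypothetical names; `ConflictException` stands in for whichever exception signals an optimistic lock conflict.

```java
import java.util.function.Supplier;

// Hypothetical, framework-free retry helper with exponential backoff.
class BackoffRetry {

    // Stand-in for the exception that signals an optimistic lock conflict.
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    // Delay before the retry that follows failed attempt number `attempt` (1-based):
    // initialMs, then 2x, then 4x, and so on.
    static long backoffMs(long initialMs, int attempt) {
        return initialMs << (attempt - 1);
    }

    // Runs the operation up to maxAttempts times, sleeping between failed attempts.
    static <T> T execute(Supplier<T> operation, int maxAttempts, long initialMs) {
        ConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (ConflictException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMs(initialMs, attempt));
                }
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Retry interrupted", ie);
        }
    }
}
```

The operation passed to `execute` should re-read the aggregate on every attempt; retrying with the stale copy would only reproduce the same conflict.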
Advanced Event Sourcing Implementation
1. Event-Sourced Aggregate Root
public abstract class EventSourcedAggregateRoot<T extends AggregateRootId> {
protected final T id;
protected Long version;
protected final List<DomainEvent> changes;
protected EventSourcedAggregateRoot(T id) {
this.id = id;
this.version = 0L;
this.changes = new ArrayList<>();
}
protected EventSourcedAggregateRoot(T id, List<DomainEvent> history) {
this.id = id;
this.changes = new ArrayList<>();
rebuildFromHistory(history);
}
public abstract void when(DomainEvent event);
protected void apply(DomainEvent event) {
event.setVersion(version + 1);
when(event);
changes.add(event);
version++;
}
private void rebuildFromHistory(List<DomainEvent> history) {
for (DomainEvent event : history) {
when(event);
this.version = event.getVersion();
}
}
public void markChangesAsCommitted() {
this.changes.clear();
}
public T getId() { return id; }
public Long getVersion() { return version; }
public List<DomainEvent> getUncommittedChanges() { return new ArrayList<>(changes); }
}
// Event-Sourced Order Aggregate
public class EventSourcedOrder extends EventSourcedAggregateRoot<OrderId> {
private OrderStatus status;
private String customerId;
private BigDecimal totalAmount;
private final Map<String, OrderLine> orderLines;
public EventSourcedOrder(OrderId orderId, String customerId) {
super(orderId);
this.customerId = customerId;
this.status = OrderStatus.DRAFT;
this.totalAmount = BigDecimal.ZERO;
this.orderLines = new HashMap<>();
apply(new OrderCreatedEvent(orderId.getValue(), customerId));
}
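public EventSourcedOrder(OrderId orderId, List<DomainEvent> history) {
super(orderId);
// Initialize state before replaying: the event handlers below write to orderLines
this.orderLines = new HashMap<>();
this.totalAmount = BigDecimal.ZERO;
for (DomainEvent event : history) {
when(event);
this.version = event.getVersion();
}
}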
public void addItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Cannot add items to order in " + status + " state");
}
apply(new OrderItemAddedEvent(
getId().getValue(),
productId,
productName,
quantity,
unitPrice
));
}
public void removeItem(String productId) {
if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Cannot remove items from order in " + status + " state");
}
if (orderLines.containsKey(productId)) {
apply(new OrderItemRemovedEvent(getId().getValue(), productId));
}
}
public void submit() {
if (status != OrderStatus.DRAFT) {
throw new IllegalOrderStateException("Order already submitted");
}
if (orderLines.isEmpty()) {
throw new IllegalOrderStateException("Cannot submit empty order");
}
apply(new OrderSubmittedEvent(getId().getValue(), customerId, totalAmount));
}
public void cancel(String reason) {
if (status == OrderStatus.CANCELLED || status == OrderStatus.SHIPPED) {
throw new IllegalOrderStateException("Order cannot be cancelled in " + status + " state");
}
apply(new OrderCancelledEvent(getId().getValue(), reason));
}
@Override
public void when(DomainEvent event) {
switch (event) {
case OrderCreatedEvent e -> handleOrderCreated(e);
case OrderItemAddedEvent e -> handleItemAdded(e);
case OrderItemRemovedEvent e -> handleItemRemoved(e);
case OrderSubmittedEvent e -> handleOrderSubmitted(e);
case OrderCancelledEvent e -> handleOrderCancelled(e);
default -> throw new IllegalArgumentException("Unknown event type: " + event.getClass());
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
this.status = OrderStatus.DRAFT;
this.customerId = event.getCustomerId();
this.totalAmount = BigDecimal.ZERO;
}
private void handleItemAdded(OrderItemAddedEvent event) {
BigDecimal lineTotal = event.getUnitPrice().multiply(BigDecimal.valueOf(event.getQuantity()));
OrderLine line = OrderLine.builder()
.productId(event.getProductId())
.productName(event.getProductName())
.quantity(event.getQuantity())
.unitPrice(event.getUnitPrice())
.totalPrice(lineTotal)
.build();
orderLines.put(event.getProductId(), line);
totalAmount = totalAmount.add(lineTotal);
}
private void handleItemRemoved(OrderItemRemovedEvent event) {
OrderLine removed = orderLines.remove(event.getProductId());
if (removed != null) {
totalAmount = totalAmount.subtract(removed.getTotalPrice());
}
}
private void handleOrderSubmitted(OrderSubmittedEvent event) {
this.status = OrderStatus.SUBMITTED;
}
private void handleOrderCancelled(OrderCancelledEvent event) {
this.status = OrderStatus.CANCELLED;
}
// Getters
public OrderStatus getStatus() { return status; }
public String getCustomerId() { return customerId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public Collection<OrderLine> getOrderLines() { return orderLines.values(); }
}
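The replay mechanism is easier to see in a stripped-down form. The sketch below is a hypothetical, self-contained aggregate (`CartAggregate` with a single `ItemAdded` event, neither of which appears in the code above) in which commands and replay share one state-transition method and the version is simply the sequence number of the last applied event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal event-sourced aggregate: a cart that only counts items.
class CartAggregate {

    // Event carrying the sequence number assigned when it was recorded.
    static final class ItemAdded {
        final int quantity;
        final long version;

        ItemAdded(int quantity, long version) {
            this.quantity = quantity;
            this.version = version;
        }
    }

    private int totalQuantity;
    private long version;
    private final List<ItemAdded> uncommitted = new ArrayList<>();

    // Command: validate first, then record the change as an event.
    void addItem(int quantity) {
        if (quantity <= 0) {
            throw new IllegalArgumentException("quantity must be positive");
        }
        ItemAdded event = new ItemAdded(quantity, version + 1);
        when(event);
        uncommitted.add(event);
    }

    // State transition shared by commands and replay.
    private void when(ItemAdded event) {
        totalQuantity += event.quantity;
        version = event.version;
    }

    // Rebuilds an aggregate by replaying stored events in order.
    static CartAggregate fromHistory(List<ItemAdded> history) {
        CartAggregate cart = new CartAggregate();
        for (ItemAdded event : history) {
            cart.when(event);
        }
        return cart;
    }

    int totalQuantity() { return totalQuantity; }

    long version() { return version; }

    List<ItemAdded> uncommittedChanges() { return new ArrayList<>(uncommitted); }
}
```

Because replay goes through the same `when` method as the commands, an aggregate rebuilt from its own uncommitted changes ends up with the same state and version as the original, and with no uncommitted changes of its own.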
2. Event Store Implementation
@Entity
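// The unique constraint makes the database reject two events with the same version for one aggregate,
// which closes the gap between the version check and the insert in EventStoreService
@Table(name = "event_store", uniqueConstraints = @UniqueConstraint(
name = "uk_event_store_aggregate_version",
columnNames = {"aggregate_id", "event_version"}))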
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EventStore {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "aggregate_id", nullable = false)
private String aggregateId;
@Column(name = "aggregate_type", nullable = false)
private String aggregateType;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "event_data", nullable = false, columnDefinition = "TEXT")
private String eventData;
@Column(name = "event_version", nullable = false)
private Long eventVersion;
@Column(name = "timestamp", nullable = false)
private LocalDateTime timestamp;
@Column(name = "metadata", columnDefinition = "TEXT")
private String metadata;
}
@Repository
public interface EventStoreRepository extends JpaRepository<EventStore, Long> {
List<EventStore> findByAggregateIdOrderByEventVersionAsc(String aggregateId);
@Query("SELECT MAX(es.eventVersion) FROM EventStore es WHERE es.aggregateId = :aggregateId")
Optional<Long> findLatestVersionByAggregateId(@Param("aggregateId") String aggregateId);
@Query("SELECT es FROM EventStore es WHERE es.aggregateId = :aggregateId AND es.eventVersion > :version ORDER BY es.eventVersion ASC")
List<EventStore> findByAggregateIdAndVersionGreaterThan(
@Param("aggregateId") String aggregateId,
@Param("version") Long version);
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT es FROM EventStore es WHERE es.aggregateId = :aggregateId")
List<EventStore> findByAggregateIdWithLock(@Param("aggregateId") String aggregateId);
}
@Service
@Slf4j
@Transactional
public class EventStoreService {
private final EventStoreRepository eventStoreRepository;
private final ObjectMapper objectMapper;
private final EventSerializer eventSerializer;
public EventStoreService(EventStoreRepository eventStoreRepository,
ObjectMapper objectMapper,
EventSerializer eventSerializer) {
this.eventStoreRepository = eventStoreRepository;
this.objectMapper = objectMapper;
this.eventSerializer = eventSerializer;
}
public void saveEvents(String aggregateId, String aggregateType,
List<DomainEvent> events, Long expectedVersion) {
Long currentVersion = getCurrentVersion(aggregateId);
if (expectedVersion != null && !currentVersion.equals(expectedVersion)) {
throw new ConcurrencyException(
"Aggregate " + aggregateId + " version mismatch. Expected: " +
expectedVersion + ", Actual: " + currentVersion);
}
for (DomainEvent event : events) {
saveEvent(aggregateId, aggregateType, event);
}
log.debug("Saved {} events for aggregate: {}", events.size(), aggregateId);
}
public <T extends EventSourcedAggregateRoot<?>> T loadAggregate(
String aggregateId, String aggregateType,
Function<List<DomainEvent>, T> aggregateFactory) {
List<EventStore> eventRecords = eventStoreRepository
.findByAggregateIdOrderByEventVersionAsc(aggregateId);
if (eventRecords.isEmpty()) {
throw new AggregateNotFoundException("Aggregate not found: " + aggregateId);
}
List<DomainEvent> events = eventRecords.stream()
.map(this::deserializeEvent)
.collect(Collectors.toList());
return aggregateFactory.apply(events);
}
public List<DomainEvent> getEventsAfterVersion(String aggregateId, Long version) {
return eventStoreRepository
.findByAggregateIdAndVersionGreaterThan(aggregateId, version)
.stream()
.map(this::deserializeEvent)
.collect(Collectors.toList());
}
private void saveEvent(String aggregateId, String aggregateType, DomainEvent event) {
try {
String eventData = eventSerializer.serialize(event);
String metadata = objectMapper.writeValueAsString(createMetadata(event));
EventStore eventStore = EventStore.builder()
.aggregateId(aggregateId)
.aggregateType(aggregateType)
.eventType(event.getClass().getSimpleName())
.eventData(eventData)
.eventVersion(event.getVersion())
.timestamp(LocalDateTime.now())
.metadata(metadata)
.build();
eventStoreRepository.save(eventStore);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize event", e);
}
}
private DomainEvent deserializeEvent(EventStore eventStore) {
return eventSerializer.deserialize(
eventStore.getEventData(),
eventStore.getEventType()
);
}
private Long getCurrentVersion(String aggregateId) {
return eventStoreRepository.findLatestVersionByAggregateId(aggregateId)
.orElse(0L);
}
private Map<String, Object> createMetadata(DomainEvent event) {
Map<String, Object> metadata = new HashMap<>();
metadata.put("occurredAt", event.getOccurredAt());
metadata.put("eventId", UUID.randomUUID().toString());
return metadata;
}
}
@Component
public class EventSerializer {
private final ObjectMapper objectMapper;
private final Map<String, Class<? extends DomainEvent>> eventTypeRegistry;
public EventSerializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
this.eventTypeRegistry = new HashMap<>();
registerEventTypes();
}
private void registerEventTypes() {
eventTypeRegistry.put("OrderCreatedEvent", OrderCreatedEvent.class);
eventTypeRegistry.put("OrderItemAddedEvent", OrderItemAddedEvent.class);
eventTypeRegistry.put("OrderItemRemovedEvent", OrderItemRemovedEvent.class);
eventTypeRegistry.put("OrderSubmittedEvent", OrderSubmittedEvent.class);
eventTypeRegistry.put("OrderCancelledEvent", OrderCancelledEvent.class);
}
public String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize event", e);
}
}
public DomainEvent deserialize(String eventData, String eventType) {
try {
Class<? extends DomainEvent> eventClass = eventTypeRegistry.get(eventType);
if (eventClass == null) {
throw new EventDeserializationException("Unknown event type: " + eventType);
}
return objectMapper.readValue(eventData, eventClass);
} catch (JsonProcessingException e) {
throw new EventDeserializationException("Failed to deserialize event", e);
}
}
}
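The expected-version check in `saveEvents` only protects the stream if the check and the append happen atomically. The sketch below shows that rule in isolation with a hypothetical in-memory store (`InMemoryEventStreams`, with events reduced to plain strings for brevity). A single lock plays the role that a transaction plus a uniqueness guarantee on aggregate ID and event version would play in a relational store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory event stream store; events are plain strings for brevity.
class InMemoryEventStreams {

    private final Map<String, List<String>> streams = new HashMap<>();

    // Current version of a stream = number of events appended so far.
    synchronized long currentVersion(String aggregateId) {
        List<String> stream = streams.get(aggregateId);
        return stream == null ? 0L : stream.size();
    }

    // Appends only if nobody else appended since the caller loaded the aggregate.
    // Check and append run under the same lock, so they cannot interleave.
    synchronized long append(String aggregateId, long expectedVersion, List<String> events) {
        long actual = currentVersion(aggregateId);
        if (actual != expectedVersion) {
            throw new IllegalStateException("Aggregate " + aggregateId
                    + " version mismatch. Expected: " + expectedVersion + ", Actual: " + actual);
        }
        streams.computeIfAbsent(aggregateId, key -> new ArrayList<>()).addAll(events);
        return actual + events.size();
    }

    // Returns a copy so callers cannot modify the stored stream.
    synchronized List<String> load(String aggregateId) {
        return new ArrayList<>(streams.getOrDefault(aggregateId, new ArrayList<>()));
    }
}
```

A writer that loaded the stream at version 0 and appends two events moves it to version 2; a second writer that also loaded version 0 is rejected and has to reload before trying again.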
Concurrency Control Strategies
1. Pessimistic Locking Service
@Service
@Slf4j
@Transactional
public class PessimisticLockingService {
private final OrderRepository orderRepository;
private final EntityManager entityManager;
public PessimisticLockingService(OrderRepository orderRepository,
EntityManager entityManager) {
this.orderRepository = orderRepository;
this.entityManager = entityManager;
}
public Order updateOrderWithPessimisticLock(String orderId, Consumer<Order> updateOperation) {
OrderId id = OrderId.fromString(orderId);
// Acquire pessimistic lock
Order order = entityManager.find(Order.class, id, LockModeType.PESSIMISTIC_WRITE);
if (order == null) {
throw new OrderNotFoundException("Order not found: " + orderId);
}
// Perform update
updateOperation.accept(order);
Order updated = orderRepository.save(order);
publishDomainEvents(updated);
log.debug("Updated order with pessimistic lock: {}", orderId);
return updated;
}
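// Uses the class-level read-write transaction: PostgreSQL rejects row-locking reads
// (SELECT ... FOR SHARE) inside a read-only transaction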
public Order readOrderWithPessimisticLock(String orderId) {
OrderId id = OrderId.fromString(orderId);
return entityManager.find(Order.class, id, LockModeType.PESSIMISTIC_READ);
}
private void publishDomainEvents(Order order) {
// Implementation for publishing domain events
}
}
2. Version Validation Service
@Service
@Slf4j
public class VersionValidationService {
private final OrderRepository orderRepository;
public VersionValidationService(OrderRepository orderRepository) {
this.orderRepository = orderRepository;
}
public void validateVersion(String orderId, Long expectedVersion) {
OrderId id = OrderId.fromString(orderId);
Long currentVersion = orderRepository.findVersionById(id)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
if (!currentVersion.equals(expectedVersion)) {
throw new OptimisticLockingException(
"Version mismatch for order " + orderId +
". Expected: " + expectedVersion + ", Actual: " + currentVersion);
}
}
public boolean isVersionCurrent(String orderId, Long expectedVersion) {
try {
validateVersion(orderId, expectedVersion);
return true;
} catch (OptimisticLockingException e) {
return false;
}
}
public Long getCurrentVersion(String orderId) {
OrderId id = OrderId.fromString(orderId);
return orderRepository.findVersionById(id)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
}
}
Custom Exceptions
public class OptimisticLockingException extends RuntimeException {
public OptimisticLockingException(String message) {
super(message);
}
public OptimisticLockingException(String message, Throwable cause) {
super(message, cause);
}
}
public class OptimisticLockRetryExhaustedException extends RuntimeException {
public OptimisticLockRetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
public class AggregateInvariantViolationException extends RuntimeException {
public AggregateInvariantViolationException(String message) {
super(message);
}
}
public class IllegalOrderStateException extends RuntimeException {
public IllegalOrderStateException(String message) {
super(message);
}
}
public class OrderNotFoundException extends RuntimeException {
public OrderNotFoundException(String message) {
super(message);
}
}
public class ConcurrencyException extends RuntimeException {
public ConcurrencyException(String message) {
super(message);
}
}
public class AggregateNotFoundException extends RuntimeException {
public AggregateNotFoundException(String message) {
super(message);
}
}
public class EventSerializationException extends RuntimeException {
public EventSerializationException(String message, Throwable cause) {
super(message, cause);
}
}
public class EventDeserializationException extends RuntimeException {
public EventDeserializationException(String message) {
super(message);
}
public EventDeserializationException(String message, Throwable cause) {
super(message, cause);
}
}
Testing Aggregate Root Versioning
1. Comprehensive Test Suite
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class AggregateRootVersioningTest {
@Autowired
private TestEntityManager entityManager;
@Autowired
private OrderRepository orderRepository;
@Autowired
private EventStoreRepository eventStoreRepository;
private OrderService orderService;
private EventStoreService eventStoreService;
@BeforeEach
void setUp() {
VersionValidationService versionService = new VersionValidationService(orderRepository);
DomainEventPublisher eventPublisher = new DomainEventPublisher();
OptimisticLockRetryTemplate retryTemplate = new OptimisticLockRetryTemplate();
this.orderService = new OrderService(orderRepository, eventPublisher, retryTemplate);
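// Register the modules found on the classpath (including java.time support) so LocalDateTime fields serialize
ObjectMapper objectMapper = new ObjectMapper().findAndRegisterModules();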
EventSerializer eventSerializer = new EventSerializer(objectMapper);
this.eventStoreService = new EventStoreService(eventStoreRepository, objectMapper, eventSerializer);
}
@Test
void testOptimisticLockingPreventsLostUpdates() {
// Given
Order order = orderService.createOrder("customer-123");
String orderId = order.getId().getValue();
Long originalVersion = order.getVersion();
// When - Simulate concurrent updates
CompletableFuture<Order> update1 = CompletableFuture.supplyAsync(() ->
orderService.addOrderItem(orderId, "prod-1", "Product 1", 2, new BigDecimal("29.99")));
CompletableFuture<Order> update2 = CompletableFuture.supplyAsync(() ->
orderService.addOrderItem(orderId, "prod-2", "Product 2", 1, new BigDecimal("49.99")));
// Then - One should succeed, one should retry
CompletableFuture.allOf(update1, update2).join();
Order finalOrder = orderService.getOrder(orderId).orElseThrow();
assertEquals(2, finalOrder.getOrderLines().size());
assertTrue(finalOrder.getVersion() > originalVersion);
}
@Test
void testEventSourcingRebuildsAggregateCorrectly() {
// Given - record events by executing commands, so each event carries its sequence number
OrderId orderId = OrderId.generate();
String customerId = "customer-456";
EventSourcedOrder original = new EventSourcedOrder(orderId, customerId);
original.addItem("prod-1", "Product 1", 2, new BigDecimal("25.00"));
original.addItem("prod-2", "Product 2", 1, new BigDecimal("50.00"));
original.submit();
List<DomainEvent> events = original.getUncommittedChanges();
// When - Rebuild aggregate from events
EventSourcedOrder order = new EventSourcedOrder(orderId, events);
// Then
assertEquals(OrderStatus.SUBMITTED, order.getStatus());
assertEquals(2, order.getOrderLines().size());
assertEquals(0, new BigDecimal("100.00").compareTo(order.getTotalAmount()));
assertEquals(4L, order.getVersion().longValue());
}
@Test
void testVersionIncrementOnStateChange() {
// Given
Order order = orderService.createOrder("customer-789");
Long initialVersion = order.getVersion();
// When
order.addItem("prod-1", "Product 1", 1, new BigDecimal("10.00"));
// Then
assertEquals(initialVersion + 1, order.getVersion());
}
@Test
void testInvariantValidation() {
// Given
Order order = orderService.createOrder("customer-999");
// When/Then - Adding item with negative quantity should fail
assertThrows(AggregateInvariantViolationException.class, () ->
order.addItem("prod-1", "Product 1", -1, new BigDecimal("10.00")));
}
@Test
void testConcurrentEventSourcing() {
// Given
String aggregateId = "order-concurrent-test";
// When - Two writers append concurrently, both expecting version 0
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
List<DomainEvent> events = List.of(
new OrderCreatedEvent(aggregateId, "customer-1")
);
eventStoreService.saveEvents(aggregateId, "Order", events, 0L);
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
List<DomainEvent> events = List.of(
new OrderItemAddedEvent(aggregateId, "prod-1", "Product 1", 1, new BigDecimal("10.00"))
);
eventStoreService.saveEvents(aggregateId, "Order", events, 0L);
});
// Then - Exactly one append succeeds and the other fails with a version conflict.
// Which writer loses depends on thread scheduling, so count the conflicts instead of
// asserting inside one task. Requires an import of java.util.concurrent.CompletionException.
int conflicts = 0;
for (CompletableFuture<Void> future : List.of(future1, future2)) {
try {
future.join();
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof ConcurrencyException);
conflicts++;
}
}
assertEquals(1, conflicts);
}
}
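The expected-version check that the concurrent event-sourcing test relies on can be illustrated with a small in-memory store. This is only a sketch under stated assumptions: `InMemoryEventStore`, `VersionConflictException`, and the use of plain strings as events are hypothetical stand-ins, not the article's `EventStoreService` or `ConcurrencyException`, and the stream version is assumed to equal the number of stored events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory event store; events are plain strings for brevity. */
final class InMemoryEventStore {

    /** Hypothetical exception raised when the caller's expected version is stale. */
    static final class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) {
            super(message);
        }
    }

    private final Map<String, List<String>> streams = new ConcurrentHashMap<>();

    /**
     * Appends events only if the stream is still at expectedVersion.
     * Assumption: the version of a stream equals the number of events in it.
     * Returns the new version of the stream.
     */
    long append(String aggregateId, List<String> events, long expectedVersion) {
        List<String> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
        synchronized (stream) {
            long currentVersion = stream.size();
            if (currentVersion != expectedVersion) {
                throw new VersionConflictException("Expected version " + expectedVersion
                        + " for " + aggregateId + " but found " + currentVersion);
            }
            stream.addAll(events);
            return stream.size();
        }
    }

    /** Returns the current version of the stream, or 0 if it does not exist yet. */
    long currentVersion(String aggregateId) {
        List<String> stream = streams.get(aggregateId);
        if (stream == null) {
            return 0L;
        }
        synchronized (stream) {
            return stream.size();
        }
    }
}
```

Because the check and the append happen under the same lock, two writers that both expect version 0 cannot both succeed. A database-backed store would typically get the same guarantee from a unique constraint on the aggregate id and version columns.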
Configuration and Best Practices
1. Application Configuration
# application.yml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/aggregate_versioning
    username: postgres
    password: password
  jpa:
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
        jdbc.batch_size: 50
        order_inserts: true
        order_updates: true
        generate_statistics: true
        connection.handling_mode: DELAYED_ACQUISITION_AND_HOLD
  jackson:
    property-naming-strategy: SNAKE_CASE
    default-property-inclusion: NON_NULL
logging:
  level:
    com.example.aggregate: DEBUG
    org.hibernate.optimisticlocking: WARN
2. Best Practices Service
@Service
@Slf4j
public class AggregateVersioningBestPractices {
/**
* Best Practices for Aggregate Root Versioning:
*
* 1. Version Increment Strategy:
* - Increment version on every state-changing command
* - Use database triggers for additional safety
* - Consider event versioning for event-sourced aggregates
*
* 2. Concurrency Control:
* - Prefer optimistic locking when many transactions run concurrently but rarely touch the same aggregate
* - Implement retry mechanisms with exponential backoff
* - Consider pessimistic locking for heavily contended or critical operations
*
* 3. Event Sourcing:
* - Store events in append-only storage
* - Use event version for conflict detection
* - Implement snapshotting for large aggregates
*
* 4. Validation:
* - Validate invariants before version increment
* - Use domain events for cross-aggregate validation
* - Implement compensation for failed operations
*/
public void demonstrateBestPractices() {
log.info("""
Aggregate Root Versioning Best Practices:
1. Design aggregates with clear boundaries
2. Implement optimistic locking with version numbers
3. Use domain events for side effects
4. Validate business invariants consistently
5. Handle concurrency conflicts gracefully
6. Consider event sourcing for audit requirements
7. Implement proper error handling and compensation
8. Monitor version conflicts and performance
""");
}
}
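The retry-with-exponential-backoff recommendation above could look roughly like the following. This is a minimal sketch, not the article's retry mechanism: `OptimisticRetry`, `StaleVersionException`, and the parameter names are hypothetical, the delay simply doubles per attempt up to a cap, and jitter is left out for brevity.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper that retries an operation after optimistic locking conflicts. */
final class OptimisticRetry {

    /** Hypothetical stand-in for the conflict exception a repository would throw. */
    static final class StaleVersionException extends RuntimeException {
        StaleVersionException(String message) {
            super(message);
        }
    }

    private OptimisticRetry() {
    }

    /** Delay before retrying after the given 1-based attempt: initialDelay * 2^(attempt - 1), capped at maxDelay. */
    static Duration backoffDelay(int attempt, Duration initialDelay, Duration maxDelay) {
        long factor = 1L << Math.min(attempt - 1, 20); // bound the shift so the factor cannot overflow
        Duration delay = initialDelay.multipliedBy(factor);
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }

    /** Runs the operation, retrying on StaleVersionException until maxAttempts is reached. */
    static <T> T execute(Supplier<T> operation, int maxAttempts, Duration initialDelay, Duration maxDelay) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.get();
            } catch (StaleVersionException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the conflict to the caller
                }
                sleep(backoffDelay(attempt, initialDelay, maxDelay));
            }
        }
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }
}
```

For the retry to be meaningful, the operation passed to `execute` has to reload the aggregate on every attempt. Re-running a command against the stale in-memory instance would fail with the same conflict each time.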
Monitoring and Metrics
1. Version Conflict Monitoring
@Component
@Slf4j
public class VersionConflictMonitor {
private final MeterRegistry meterRegistry;
private final Counter versionConflictCounter;
private final Timer aggregateLoadTimer;
public VersionConflictMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.versionConflictCounter = meterRegistry.counter("aggregate.version.conflicts");
this.aggregateLoadTimer = meterRegistry.timer("aggregate.load.time");
}
public void recordVersionConflict(String aggregateType, String operation) {
versionConflictCounter.increment();
Tags tags = Tags.of(
Tag.of("aggregate_type", aggregateType),
Tag.of("operation", operation)
);
meterRegistry.counter("aggregate.version.conflicts.detailed", tags).increment();
log.warn("Version conflict detected for {} during {}", aggregateType, operation);
}
public void recordAggregateLoad(String aggregateType, Duration duration) {
aggregateLoadTimer.record(duration);
Tags tags = Tags.of(Tag.of("aggregate_type", aggregateType));
meterRegistry.timer("aggregate.load.time.detailed", tags).record(duration);
}
public void recordSuccessfulOperation(String aggregateType, String operation) {
Tags tags = Tags.of(
Tag.of("aggregate_type", aggregateType),
Tag.of("operation", operation)
);
meterRegistry.counter("aggregate.operations.success", tags).increment();
}
}
Conclusion
Aggregate Root Versioning is essential for maintaining data consistency in concurrent environments. Key takeaways:
Implementation Strategies:
- Optimistic Locking: Best when concurrency is high but conflicts on a single aggregate are rare; pair it with retry mechanisms
- Event Sourcing: Provides complete audit trail and temporal query capabilities
- Pessimistic Locking: Suitable for critical operations requiring guaranteed isolation
Performance Considerations:
- Retry strategies with exponential backoff for optimistic locking conflicts
- Snapshotting for event-sourced aggregates to improve read performance
- Batch operations to reduce version increment frequency
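To make the snapshotting point concrete, here is a minimal sketch of rebuilding state from a snapshot plus only the events recorded after it. Everything in it is an illustrative assumption: `SnapshotRebuildSketch`, `AmountAdded`, `Snapshot`, and `State` are hypothetical types reduced to a running total, not the article's `EventSourcedOrder`, and the records require Java 16 or later.

```java
import java.util.List;

/** Hypothetical sketch: rebuild an aggregate from a snapshot and the events after it. */
final class SnapshotRebuildSketch {

    /** Hypothetical event that adds an amount at a given stream version. */
    record AmountAdded(long version, long amount) {
    }

    /** Hypothetical snapshot of the aggregate state as of a given version. */
    record Snapshot(long version, long total) {
    }

    /** Rebuilt aggregate state. */
    record State(long version, long total) {
    }

    private SnapshotRebuildSketch() {
    }

    /**
     * Starts from the snapshot (or from version 0 if it is null), skips events the
     * snapshot already covers, and applies the remaining events in order.
     */
    static State rebuild(Snapshot snapshot, List<AmountAdded> events) {
        long version = snapshot == null ? 0L : snapshot.version();
        long total = snapshot == null ? 0L : snapshot.total();
        for (AmountAdded event : events) {
            if (event.version() <= version) {
                continue; // already reflected in the snapshot
            }
            if (event.version() != version + 1) {
                throw new IllegalStateException("Gap in event stream at version " + (version + 1));
            }
            total += event.amount();
            version = event.version();
        }
        return new State(version, total);
    }
}
```

In a real store the load query would fetch only events with a version greater than the snapshot's, so the skip branch is a safety net rather than the main path. The gap check keeps the rebuilt version usable for conflict detection on the next append.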
Best Practices:
- Clear aggregate boundaries to minimize concurrent access points
- Comprehensive invariant validation before state changes
- Domain events for side effects and cross-aggregate communication
- Monitoring and metrics to track version conflicts and performance
Use Cases:
- E-commerce systems with concurrent order modifications
- Financial applications requiring audit trails
- Collaborative editing with conflict resolution
- Inventory management with stock level consistency
By implementing proper aggregate root versioning, you can build robust, scalable systems that maintain data integrity while supporting high levels of concurrency.