Axon Framework for CQRS in Java: Complete Guide

Introduction to Axon Framework

Axon Framework is a Java framework for building scalable, maintainable applications based on the CQRS (Command Query Responsibility Segregation) and Event Sourcing patterns. It provides the building blocks for separating the write model (commands and aggregates) from the read model (projections and queries), with events as the link between the two.

Table of Contents

  1. Setup and Dependencies
  2. Core Concepts and Architecture
  3. Command Model (Write Side)
  4. Query Model (Read Side)
  5. Command and Query Controllers
  6. Saga Pattern for Complex Workflows
  7. Testing Strategies
  8. Performance Optimization
  9. Real-World Examples

1. Setup and Dependencies

Maven Configuration

<properties>
    <axon.version>4.9.0</axon.version>
    <spring-boot.version>3.1.0</spring-boot.version>
</properties>

<!-- Imports the Spring Boot BOM so that dependencies declared without a version
     (H2, Lombok) resolve. Not needed if the project inherits from spring-boot-starter-parent. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Axon Framework -->
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-spring-boot-starter</artifactId>
        <version>${axon.version}</version>
    </dependency>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    <!-- Database -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Lombok for reducing boilerplate -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>${spring-boot.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-test</artifactId>
        <version>${axon.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Application Configuration

package com.example.axon.config;

import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.Snapshotter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AxonConfig {

    // Bean names are referenced from @Aggregate(snapshotTriggerDefinition = "...")
    @Bean
    public SnapshotTriggerDefinition userAggregateSnapshotTrigger(Snapshotter snapshotter) {
        return new EventCountSnapshotTriggerDefinition(snapshotter, 50);
    }

    @Bean
    public SnapshotTriggerDefinition orderAggregateSnapshotTrigger(Snapshotter snapshotter) {
        return new EventCountSnapshotTriggerDefinition(snapshotter, 30);
    }
}
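
The two trigger definitions above use event-count thresholds of 50 and 30. The following framework-free sketch illustrates the general idea behind count-based snapshotting: once enough events have accumulated, the current state is stored so that loading only replays the events recorded after the snapshot. The names SnapshotSketch, Store, and Deposited are hypothetical, and this is not Axon's implementation, only an illustration of the technique.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free sketch of count-based snapshotting; not Axon's implementation. */
public class SnapshotSketch {

    // Hypothetical event: a deposit of some amount.
    record Deposited(long amount) {}

    static final class Store {
        private final List<Deposited> events = new ArrayList<>();
        private final int threshold;
        private long snapshotBalance = 0; // state captured by the latest snapshot
        private int snapshotVersion = 0;  // number of events covered by that snapshot

        Store(int threshold) {
            this.threshold = threshold;
        }

        void append(Deposited event) {
            events.add(event);
            // Snapshot once `threshold` events have accumulated since the last snapshot.
            if (events.size() - snapshotVersion >= threshold) {
                snapshotBalance = load();
                snapshotVersion = events.size();
            }
        }

        /** Rebuilds the balance: start from the snapshot, replay only newer events. */
        long load() {
            long balance = snapshotBalance;
            for (Deposited e : events.subList(snapshotVersion, events.size())) {
                balance += e.amount();
            }
            return balance;
        }

        int eventsReplayedOnLoad() {
            return events.size() - snapshotVersion;
        }
    }

    public static void main(String[] args) {
        Store store = new Store(50);
        for (int i = 0; i < 120; i++) {
            store.append(new Deposited(1));
        }
        System.out.println(store.load());                 // 120
        System.out.println(store.eventsReplayedOnLoad()); // 20
    }
}
```

A lower threshold shortens replay at the cost of writing snapshots more often, which is presumably why the two aggregates are given different values.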

application.yml

spring:
  datasource:
    url: jdbc:h2:mem:axon-db
    driverClassName: org.h2.Driver
    username: sa
    password:
  jpa:
    database-platform: org.hibernate.dialect.H2Dialect
    hibernate:
      ddl-auto: create-drop
    show-sql: true
  h2:
    console:
      enabled: true
axon:
  axonserver:
    servers: localhost:8124
  eventhandling:
    processors:
      user-projector:
        mode: tracking
      order-projector:
        mode: tracking
server:
  port: 8080
logging:
  level:
    org.axonframework: DEBUG
    com.example.axon: DEBUG

2. Core Concepts and Architecture

Domain Model Overview

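package com.example.axon.coreapi;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
import java.math.BigDecimal;
import java.util.List;
// The records are listed together for brevity; each public record belongs in its own source file.
// Commands (@TargetAggregateIdentifier marks the field Axon uses to find the target aggregate)
public record CreateUserCommand(@TargetAggregateIdentifier String userId, String username, String email, String fullName) {}
public record ChangeUserEmailCommand(@TargetAggregateIdentifier String userId, String newEmail) {}
public record DeleteUserCommand(@TargetAggregateIdentifier String userId, String reason) {}
public record CreateOrderCommand(@TargetAggregateIdentifier String orderId, String userId, BigDecimal totalAmount) {}
public record AddOrderItemCommand(@TargetAggregateIdentifier String orderId, String productId, String productName, int quantity, BigDecimal unitPrice) {}
public record ConfirmOrderCommand(@TargetAggregateIdentifier String orderId) {}
public record CancelOrderCommand(@TargetAggregateIdentifier String orderId, String reason) {}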
// Events
public record UserCreatedEvent(String userId, String username, String email, String fullName) {}
public record UserEmailChangedEvent(String userId, String oldEmail, String newEmail) {}
public record UserDeletedEvent(String userId, String reason) {}
public record OrderCreatedEvent(String orderId, String userId, BigDecimal totalAmount) {}
public record OrderItemAddedEvent(String orderId, String productId, String productName, int quantity, BigDecimal unitPrice) {}
public record OrderConfirmedEvent(String orderId) {}
public record OrderCancelledEvent(String orderId, String reason) {}
// Queries
public record FindUserQuery(String userId) {}
public record FindAllUsersQuery() {}
public record FindUserByEmailQuery(String email) {}
public record FindOrderQuery(String orderId) {}
public record FindOrdersByUserQuery(String userId) {}
public record FindAllOrdersQuery() {}
// Query Results
public record UserResponse(String userId, String username, String email, String fullName, boolean active) {}
public record OrderResponse(String orderId, String userId, BigDecimal totalAmount, String status, List<OrderItemResponse> items) {}
public record OrderItemResponse(String productId, String productName, int quantity, BigDecimal unitPrice) {}
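
The messages above fall into three groups: commands express intent, events record what happened, and queries read a separately maintained view. As a rough, framework-free sketch of how these pieces relate, the following example validates a command against state rebuilt from events, stores the resulting event, and updates a read model from it. All names here (CqrsFlowSketch, CreateUser, ChangeEmail, and so on) are simplified stand-ins invented for illustration, not part of the application or of Axon.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Framework-free sketch of the command -> event -> projection -> query flow. */
public class CqrsFlowSketch {

    // Simplified, hypothetical stand-ins for the messages listed above.
    record CreateUser(String userId, String email) {}
    record ChangeEmail(String userId, String newEmail) {}

    interface Event { String userId(); }
    record UserCreated(String userId, String email) implements Event {}
    record EmailChanged(String userId, String oldEmail, String newEmail) implements Event {}

    private final Map<String, List<Event>> eventStore = new HashMap<>(); // write side
    private final Map<String, String> emailByUser = new HashMap<>();     // read side

    // Write side: check the command against current state, then record an event.
    void handle(CreateUser cmd) {
        if (eventStore.containsKey(cmd.userId())) {
            throw new IllegalStateException("User already exists");
        }
        publish(new UserCreated(cmd.userId(), cmd.email()));
    }

    void handle(ChangeEmail cmd) {
        String current = currentEmail(cmd.userId());
        publish(new EmailChanged(cmd.userId(), current, cmd.newEmail()));
    }

    // Event sourcing: current state is derived by replaying the stored events.
    private String currentEmail(String userId) {
        List<Event> history = eventStore.get(userId);
        if (history == null) {
            throw new IllegalStateException("Unknown user");
        }
        String email = null;
        for (Event e : history) {
            if (e instanceof UserCreated c) email = c.email();
            else if (e instanceof EmailChanged c) email = c.newEmail();
        }
        return email;
    }

    private void publish(Event event) {
        eventStore.computeIfAbsent(event.userId(), id -> new ArrayList<>()).add(event);
        project(event); // done inline here; a real event processor may run asynchronously
    }

    // Read side: a projection keeps a query-friendly view up to date.
    private void project(Event event) {
        if (event instanceof UserCreated c) emailByUser.put(c.userId(), c.email());
        else if (event instanceof EmailChanged c) emailByUser.put(c.userId(), c.newEmail());
    }

    // Query: answered from the read model only, never from the event store.
    String findEmail(String userId) {
        return emailByUser.get(userId);
    }

    public static void main(String[] args) {
        CqrsFlowSketch app = new CqrsFlowSketch();
        app.handle(new CreateUser("u1", "a@example.com"));
        app.handle(new ChangeEmail("u1", "b@example.com"));
        System.out.println(app.findEmail("u1")); // b@example.com
    }
}
```

In the sketch the projection runs inline, so a query sees the change immediately. With the tracking processors configured earlier, the read model is updated separately from command handling, so callers should expect a short delay before a query reflects a command.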

3. Command Model (Write Side)

User Aggregate

package com.example.axon.command.aggregate;
import com.example.axon.coreapi.*;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateLifecycle;
import org.axonframework.spring.stereotype.Aggregate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@Aggregate(snapshotTriggerDefinition = "userAggregateSnapshotTrigger")
public class UserAggregate {
private static final Logger logger = LoggerFactory.getLogger(UserAggregate.class);
@AggregateIdentifier
private String userId;
private String username;
private String email;
private String fullName;
private boolean active;
// Required by Axon
protected UserAggregate() {}
@CommandHandler
public UserAggregate(CreateUserCommand command) {
logger.info("Handling CreateUserCommand: {}", command);
if (command.username() == null || command.username().trim().isEmpty()) {
throw new IllegalArgumentException("Username cannot be null or empty");
}
if (command.email() == null || !command.email().contains("@")) {
throw new IllegalArgumentException("Invalid email address");
}
apply(new UserCreatedEvent(
command.userId(),
command.username(),
command.email(),
command.fullName()
));
}
@CommandHandler
public void handle(ChangeUserEmailCommand command) {
logger.info("Handling ChangeUserEmailCommand: {}", command);
if (!active) {
throw new IllegalStateException("Cannot change email for inactive user");
}
if (command.newEmail() == null || !command.newEmail().contains("@")) {
throw new IllegalArgumentException("Invalid email address");
}
apply(new UserEmailChangedEvent(
userId,
this.email,
command.newEmail()
));
}
@CommandHandler
public void handle(DeleteUserCommand command) {
logger.info("Handling DeleteUserCommand: {}", command);
if (!active) {
throw new IllegalStateException("User already deleted");
}
apply(new UserDeletedEvent(userId, command.reason()));
}
@EventSourcingHandler
public void on(UserCreatedEvent event) {
logger.info("Applying UserCreatedEvent: {}", event);
this.userId = event.userId();
this.username = event.username();
this.email = event.email();
this.fullName = event.fullName();
this.active = true;
}
@EventSourcingHandler
public void on(UserEmailChangedEvent event) {
logger.info("Applying UserEmailChangedEvent: {}", event);
this.email = event.newEmail();
}
@EventSourcingHandler
public void on(UserDeletedEvent event) {
logger.info("Applying UserDeletedEvent: {}", event);
this.active = false;
}
// Getters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getFullName() { return fullName; }
public boolean isActive() { return active; }
}

Order Aggregate

package com.example.axon.command.aggregate;
import com.example.axon.coreapi.*;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateMember;
import org.axonframework.spring.stereotype.Aggregate;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTrigger")
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private String userId;
private BigDecimal totalAmount;
private OrderStatus status;
@AggregateMember
private List<OrderItem> items;
public enum OrderStatus {
CREATED, CONFIRMED, CANCELLED, SHIPPED, DELIVERED
}
protected OrderAggregate() {
this.items = new ArrayList<>();
}
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
apply(new OrderCreatedEvent(
command.orderId(),
command.userId(),
command.totalAmount()
));
}
@CommandHandler
public void handle(AddOrderItemCommand command) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Cannot add items to order in status: " + status);
}
apply(new OrderItemAddedEvent(
orderId,
command.productId(),
command.productName(),
command.quantity(),
command.unitPrice()
));
}
@CommandHandler
public void handle(ConfirmOrderCommand command) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("Order already confirmed or cancelled");
}
if (items.isEmpty()) {
throw new IllegalStateException("Cannot confirm empty order");
}
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(CancelOrderCommand command) {
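// CONFIRMED orders must remain cancellable: the order saga cancels them as a compensating action
if (status != OrderStatus.CREATED && status != OrderStatus.CONFIRMED) {
throw new IllegalStateException("Cannot cancel order in status: " + status);
}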
apply(new OrderCancelledEvent(orderId, command.reason()));
}
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.orderId();
this.userId = event.userId();
this.totalAmount = event.totalAmount();
this.status = OrderStatus.CREATED;
this.items = new ArrayList<>();
}
@EventSourcingHandler
public void on(OrderItemAddedEvent event) {
OrderItem newItem = new OrderItem(
event.productId(),
event.productName(),
event.quantity(),
event.unitPrice()
);
this.items.add(newItem);
// Recalculate total amount
this.totalAmount = items.stream()
.map(OrderItem::getTotalPrice)
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
this.status = OrderStatus.CONFIRMED;
}
@EventSourcingHandler
public void on(OrderCancelledEvent event) {
this.status = OrderStatus.CANCELLED;
}
// Getters
public String getOrderId() { return orderId; }
public String getUserId() { return userId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public OrderStatus getStatus() { return status; }
public List<OrderItem> getItems() { return new ArrayList<>(items); }
// Order Item Entity
public static class OrderItem {
private String productId;
private String productName;
private int quantity;
private BigDecimal unitPrice;
public OrderItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
public BigDecimal getTotalPrice() {
return unitPrice.multiply(BigDecimal.valueOf(quantity));
}
// Getters
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public int getQuantity() { return quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
}
}

4. Query Model (Read Side)

User Projection (Read Model)

package com.example.axon.query.projection;
import com.example.axon.coreapi.*;
import com.example.axon.query.entity.User;
import com.example.axon.query.repository.UserRepository;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
// Assigns the handlers to the "user-projector" processor configured in application.yml
@Component
@ProcessingGroup("user-projector")
public class UserProjection {
private static final Logger logger = LoggerFactory.getLogger(UserProjection.class);
private final UserRepository userRepository;
public UserProjection(UserRepository userRepository) {
this.userRepository = userRepository;
}
@EventHandler
public void on(UserCreatedEvent event) {
logger.info("Projecting UserCreatedEvent: {}", event);
User user = new User(
event.userId(),
event.username(),
event.email(),
event.fullName(),
true
);
userRepository.save(user);
}
@EventHandler
public void on(UserEmailChangedEvent event) {
logger.info("Projecting UserEmailChangedEvent: {}", event);
userRepository.findById(event.userId())
.ifPresent(user -> {
user.setEmail(event.newEmail());
userRepository.save(user);
});
}
@EventHandler
public void on(UserDeletedEvent event) {
logger.info("Projecting UserDeletedEvent: {}", event);
userRepository.findById(event.userId())
.ifPresent(user -> {
user.setActive(false);
userRepository.save(user);
});
}
@QueryHandler
public UserResponse handle(FindUserQuery query) {
logger.info("Handling FindUserQuery: {}", query);
return userRepository.findById(query.userId())
.map(this::toUserResponse)
.orElse(null);
}
@QueryHandler
public List<UserResponse> handle(FindAllUsersQuery query) {
logger.info("Handling FindAllUsersQuery");
return userRepository.findAll().stream()
.map(this::toUserResponse)
.collect(Collectors.toList());
}
@QueryHandler
public UserResponse handle(FindUserByEmailQuery query) {
logger.info("Handling FindUserByEmailQuery: {}", query);
return userRepository.findByEmail(query.email())
.map(this::toUserResponse)
.orElse(null);
}
private UserResponse toUserResponse(User user) {
return new UserResponse(
user.getUserId(),
user.getUsername(),
user.getEmail(),
user.getFullName(),
user.isActive()
);
}
}

Order Projection (Read Model)

package com.example.axon.query.projection;
import com.example.axon.coreapi.*;
import com.example.axon.query.entity.Order;
import com.example.axon.query.entity.OrderItem;
import com.example.axon.query.repository.OrderRepository;
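import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
// Assigns the handlers to the "order-projector" processor configured in application.yml
@Component
@ProcessingGroup("order-projector")
public class OrderProjection {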
private static final Logger logger = LoggerFactory.getLogger(OrderProjection.class);
private final OrderRepository orderRepository;
public OrderProjection(OrderRepository orderRepository) {
this.orderRepository = orderRepository;
}
@EventHandler
public void on(OrderCreatedEvent event) {
logger.info("Projecting OrderCreatedEvent: {}", event);
Order order = new Order(
event.orderId(),
event.userId(),
event.totalAmount(),
Order.Status.CREATED
);
orderRepository.save(order);
}
@EventHandler
public void on(OrderItemAddedEvent event) {
logger.info("Projecting OrderItemAddedEvent: {}", event);
orderRepository.findById(event.orderId())
.ifPresent(order -> {
OrderItem item = new OrderItem(
event.productId(),
event.productName(),
event.quantity(),
event.unitPrice()
);
order.addItem(item);
orderRepository.save(order);
});
}
@EventHandler
public void on(OrderConfirmedEvent event) {
logger.info("Projecting OrderConfirmedEvent: {}", event);
orderRepository.findById(event.orderId())
.ifPresent(order -> {
order.setStatus(Order.Status.CONFIRMED);
orderRepository.save(order);
});
}
@EventHandler
public void on(OrderCancelledEvent event) {
logger.info("Projecting OrderCancelledEvent: {}", event);
orderRepository.findById(event.orderId())
.ifPresent(order -> {
order.setStatus(Order.Status.CANCELLED);
orderRepository.save(order);
});
}
@QueryHandler
public OrderResponse handle(FindOrderQuery query) {
logger.info("Handling FindOrderQuery: {}", query);
return orderRepository.findById(query.orderId())
.map(this::toOrderResponse)
.orElse(null);
}
@QueryHandler
public List<OrderResponse> handle(FindOrdersByUserQuery query) {
logger.info("Handling FindOrdersByUserQuery: {}", query);
return orderRepository.findByUserId(query.userId()).stream()
.map(this::toOrderResponse)
.collect(Collectors.toList());
}
@QueryHandler
public List<OrderResponse> handle(FindAllOrdersQuery query) {
logger.info("Handling FindAllOrdersQuery");
return orderRepository.findAll().stream()
.map(this::toOrderResponse)
.collect(Collectors.toList());
}
private OrderResponse toOrderResponse(Order order) {
List<OrderItemResponse> itemResponses = order.getItems().stream()
.map(item -> new OrderItemResponse(
item.getProductId(),
item.getProductName(),
item.getQuantity(),
item.getUnitPrice()
))
.collect(Collectors.toList());
return new OrderResponse(
order.getOrderId(),
order.getUserId(),
order.getTotalAmount(),
order.getStatus().name(),
itemResponses
);
}
}

JPA Entities and Repositories

package com.example.axon.query.entity;
import jakarta.persistence.*;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Entity
@Table(name = "users")
public class User {
@Id
private String userId;
@Column(nullable = false, unique = true)
private String username;
@Column(nullable = false, unique = true)
private String email;
@Column(nullable = false)
private String fullName;
@Column(nullable = false)
private boolean active;
// Required by JPA
protected User() {}
public User(String userId, String username, String email, String fullName, boolean active) {
this.userId = userId;
this.username = username;
this.email = email;
this.fullName = fullName;
this.active = active;
}
// Getters and setters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getFullName() { return fullName; }
public boolean isActive() { return active; }
public void setEmail(String email) { this.email = email; }
public void setActive(boolean active) { this.active = active; }
}
@Entity
@Table(name = "orders")
public class Order {
@Id
private String orderId;
@Column(nullable = false)
private String userId;
@Column(nullable = false)
private BigDecimal totalAmount;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private Status status;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
@JoinColumn(name = "order_id")
private List<OrderItem> items = new ArrayList<>();
public enum Status {
CREATED, CONFIRMED, CANCELLED, SHIPPED, DELIVERED
}
protected Order() {}
public Order(String orderId, String userId, BigDecimal totalAmount, Status status) {
this.orderId = orderId;
this.userId = userId;
this.totalAmount = totalAmount;
this.status = status;
}
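public void addItem(OrderItem item) {
this.items.add(item);
// Keep the read model's total in line with the aggregate, which sums the item prices
this.totalAmount = items.stream()
.map(i -> i.getUnitPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
}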
// Getters and setters
public String getOrderId() { return orderId; }
public String getUserId() { return userId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public Status getStatus() { return status; }
public List<OrderItem> getItems() { return items; }
public void setStatus(Status status) { this.status = status; }
public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
}
@Entity
@Table(name = "order_items")
public class OrderItem {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String productId;
private String productName;
private int quantity;
private BigDecimal unitPrice;
protected OrderItem() {}
public OrderItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
// Getters
public Long getId() { return id; }
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public int getQuantity() { return quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
}
package com.example.axon.query.repository;
import com.example.axon.query.entity.User;
import com.example.axon.query.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Optional;
@Repository
public interface UserRepository extends JpaRepository<User, String> {
Optional<User> findByEmail(String email);
List<User> findByActiveTrue();
}
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
List<Order> findByUserId(String userId);
List<Order> findByStatus(Order.Status status);
}

5. Command and Query Controllers

REST Controllers

package com.example.axon.web;
import com.example.axon.coreapi.*;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/api/users")
public class UserController {
private static final Logger logger = LoggerFactory.getLogger(UserController.class);
private final CommandGateway commandGateway;
private final QueryGateway queryGateway;
public UserController(CommandGateway commandGateway, QueryGateway queryGateway) {
this.commandGateway = commandGateway;
this.queryGateway = queryGateway;
}
@PostMapping
public CompletableFuture<String> createUser(@RequestBody CreateUserRequest request) {
String userId = "USER-" + UUID.randomUUID();
CreateUserCommand command = new CreateUserCommand(
userId,
request.username(),
request.email(),
request.fullName()
);
return commandGateway.send(command)
.thenApply(result -> userId);
}
@PutMapping("/{userId}/email")
public CompletableFuture<ResponseEntity<Void>> changeEmail(
@PathVariable String userId,
@RequestBody ChangeEmailRequest request) {
ChangeUserEmailCommand command = new ChangeUserEmailCommand(
userId,
request.newEmail()
);
// The explicit <Void> keeps the chained calls from inferring ResponseEntity<Object>
return commandGateway.send(command)
.thenApply(result -> ResponseEntity.ok().<Void>build())
.exceptionally(throwable -> ResponseEntity.badRequest().<Void>build());
}
@DeleteMapping("/{userId}")
public CompletableFuture<ResponseEntity<Void>> deleteUser(
@PathVariable String userId,
@RequestBody DeleteUserRequest request) {
DeleteUserCommand command = new DeleteUserCommand(
userId,
request.reason()
);
return commandGateway.send(command)
.thenApply(result -> ResponseEntity.ok().build());
}
@GetMapping("/{userId}")
public CompletableFuture<UserResponse> getUser(@PathVariable String userId) {
return queryGateway.query(
new FindUserQuery(userId),
UserResponse.class
);
}
@GetMapping
public CompletableFuture<List<UserResponse>> getAllUsers() {
return queryGateway.query(
new FindAllUsersQuery(),
ResponseTypes.multipleInstancesOf(UserResponse.class)
);
}
@GetMapping("/search")
public CompletableFuture<UserResponse> getUserByEmail(@RequestParam String email) {
return queryGateway.query(
new FindUserByEmailQuery(email),
UserResponse.class
);
}
// Request DTOs
public record CreateUserRequest(String username, String email, String fullName) {}
public record ChangeEmailRequest(String newEmail) {}
public record DeleteUserRequest(String reason) {}
}
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
private final CommandGateway commandGateway;
private final QueryGateway queryGateway;
public OrderController(CommandGateway commandGateway, QueryGateway queryGateway) {
this.commandGateway = commandGateway;
this.queryGateway = queryGateway;
}
@PostMapping
public CompletableFuture<String> createOrder(@RequestBody CreateOrderRequest request) {
String orderId = "ORDER-" + UUID.randomUUID();
CreateOrderCommand command = new CreateOrderCommand(
orderId,
request.userId(),
request.totalAmount()
);
return commandGateway.send(command)
.thenApply(result -> orderId);
}
@PostMapping("/{orderId}/items")
public CompletableFuture<ResponseEntity<Void>> addOrderItem(
@PathVariable String orderId,
@RequestBody AddOrderItemRequest request) {
AddOrderItemCommand command = new AddOrderItemCommand(
orderId,
request.productId(),
request.productName(),
request.quantity(),
request.unitPrice()
);
return commandGateway.send(command)
.thenApply(result -> ResponseEntity.ok().build());
}
@PostMapping("/{orderId}/confirm")
public CompletableFuture<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
ConfirmOrderCommand command = new ConfirmOrderCommand(orderId);
return commandGateway.send(command)
.thenApply(result -> ResponseEntity.ok().build());
}
@PostMapping("/{orderId}/cancel")
public CompletableFuture<ResponseEntity<Void>> cancelOrder(
@PathVariable String orderId,
@RequestBody CancelOrderRequest request) {
CancelOrderCommand command = new CancelOrderCommand(
orderId,
request.reason()
);
return commandGateway.send(command)
.thenApply(result -> ResponseEntity.ok().build());
}
@GetMapping("/{orderId}")
public CompletableFuture<OrderResponse> getOrder(@PathVariable String orderId) {
return queryGateway.query(
new FindOrderQuery(orderId),
OrderResponse.class
);
}
@GetMapping
public CompletableFuture<List<OrderResponse>> getOrdersByUser(@RequestParam String userId) {
return queryGateway.query(
new FindOrdersByUserQuery(userId),
ResponseTypes.multipleInstancesOf(OrderResponse.class)
);
}
@GetMapping("/all")
public CompletableFuture<List<OrderResponse>> getAllOrders() {
return queryGateway.query(
new FindAllOrdersQuery(),
ResponseTypes.multipleInstancesOf(OrderResponse.class)
);
}
// Request DTOs
public record CreateOrderRequest(String userId, BigDecimal totalAmount) {}
public record AddOrderItemRequest(String productId, String productName, int quantity, BigDecimal unitPrice) {}
public record CancelOrderRequest(String reason) {}
}

6. Saga Pattern for Complex Workflows

Order Processing Saga

package com.example.axon.saga;

import com.example.axon.coreapi.*;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.modelling.saga.EndSaga;
import org.axonframework.modelling.saga.SagaEventHandler;
import org.axonframework.modelling.saga.SagaLifecycle;
import org.axonframework.modelling.saga.StartSaga;
import org.axonframework.spring.stereotype.Saga;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.UUID;

@Saga
public class OrderProcessingSaga {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingSaga.class);

    // transient: the gateway is injected on load and must not be serialized with the saga state
    @Autowired
    private transient CommandGateway commandGateway;

    private String orderId;
    // Never assigned: OrderConfirmedEvent carries only the orderId, so this stays null
    // unless the event is extended or the user is looked up separately.
    private String userId;
    private boolean paymentProcessed;
    private boolean inventoryReserved;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderConfirmedEvent event) {
        logger.info("Starting OrderProcessingSaga for order: {}", event.orderId());
        this.orderId = event.orderId();
        // Step 1: Process payment
        String paymentId = "PAYMENT-" + UUID.randomUUID();
        commandGateway.send(new ProcessPaymentCommand(paymentId, orderId, userId));
        // Step 2: Reserve inventory
        commandGateway.send(new ReserveInventoryCommand(orderId));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(PaymentProcessedEvent event) {
        logger.info("Payment processed for order: {}", orderId);
        this.paymentProcessed = true;
        checkIfReadyToShip();
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(InventoryReservedEvent event) {
        logger.info("Inventory reserved for order: {}", orderId);
        this.inventoryReserved = true;
        checkIfReadyToShip();
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(PaymentFailedEvent event) {
        logger.error("Payment failed for order: {}", orderId);
        // Compensating action: cancel the order
        commandGateway.send(new CancelOrderCommand(orderId, "Payment failed"));
        // This handler has no @EndSaga, so the saga is ended explicitly
        SagaLifecycle.end();
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(InventoryReservationFailedEvent event) {
        logger.error("Inventory reservation failed for order: {}", orderId);
        // Compensating action: refund payment and cancel order
        commandGateway.send(new RefundPaymentCommand(orderId));
        commandGateway.send(new CancelOrderCommand(orderId, "Inventory reservation failed"));
        SagaLifecycle.end();
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderShippedEvent event) {
        logger.info("Order shipped successfully: {}", orderId);
        // Saga completed successfully
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderCancelledEvent event) {
        logger.info("Order cancelled, ending saga: {}", orderId);
        // Saga ended due to cancellation
    }

    private void checkIfReadyToShip() {
        if (paymentProcessed && inventoryReserved) {
            // Both steps completed successfully, ship the order
            commandGateway.send(new ShipOrderCommand(orderId));
        }
    }
}

Additional Commands and Events for Saga

package com.example.axon.coreapi;
// Additional commands for saga
public record ProcessPaymentCommand(String paymentId, String orderId, String userId) {}
public record ReserveInventoryCommand(String orderId) {}
public record RefundPaymentCommand(String orderId) {}
public record ShipOrderCommand(String orderId) {}
// Additional events for saga
public record PaymentProcessedEvent(String paymentId, String orderId) {}
public record PaymentFailedEvent(String paymentId, String orderId, String reason) {}
public record InventoryReservedEvent(String orderId) {}
public record InventoryReservationFailedEvent(String orderId, String reason) {}
public record OrderShippedEvent(String orderId) {}
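
The saga above is essentially a small state machine: it starts two steps in parallel, waits for both to succeed before shipping, and compensates when either fails. The following framework-free sketch isolates that coordination logic so it can be read and run without Axon. SagaSketch is a hypothetical class and commands are represented as plain strings. One deliberate difference is labeled in the code: the sketch only issues a refund when a payment was actually processed, whereas the listing above always sends RefundPaymentCommand.

```java
import java.util.ArrayList;
import java.util.List;

/** Framework-free sketch of the saga's coordination logic; commands are plain strings. */
public class SagaSketch {

    private final List<String> sentCommands = new ArrayList<>(); // stand-in for a command gateway
    private boolean paymentProcessed;
    private boolean inventoryReserved;
    private boolean ended;

    // Start: kick off both steps; their results may arrive in any order.
    void onOrderConfirmed() {
        sentCommands.add("ProcessPayment");
        sentCommands.add("ReserveInventory");
    }

    void onPaymentProcessed() {
        if (ended) return;
        paymentProcessed = true;
        shipIfReady();
    }

    void onInventoryReserved() {
        if (ended) return;
        inventoryReserved = true;
        shipIfReady();
    }

    // Compensation: undo what already happened, then end the workflow.
    void onPaymentFailed() {
        if (ended) return;
        sentCommands.add("CancelOrder");
        ended = true;
    }

    void onInventoryReservationFailed() {
        if (ended) return;
        // Assumption of this sketch: refund only if a payment was actually processed.
        if (paymentProcessed) sentCommands.add("RefundPayment");
        sentCommands.add("CancelOrder");
        ended = true;
    }

    void onOrderShipped() {
        ended = true;
    }

    // Join point: ship only when both parallel steps have succeeded.
    private void shipIfReady() {
        if (paymentProcessed && inventoryReserved) sentCommands.add("ShipOrder");
    }

    List<String> sentCommands() {
        return List.copyOf(sentCommands);
    }

    boolean isEnded() {
        return ended;
    }

    public static void main(String[] args) {
        SagaSketch saga = new SagaSketch();
        saga.onOrderConfirmed();
        saga.onInventoryReserved();
        saga.onPaymentProcessed();
        System.out.println(saga.sentCommands()); // [ProcessPayment, ReserveInventory, ShipOrder]
    }
}
```

Because the two results can arrive in either order, the ship decision is checked after each of them rather than in only one handler.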

7. Testing Strategies

Aggregate Testing

package com.example.axon.test;

import com.example.axon.command.aggregate.UserAggregate;
import com.example.axon.coreapi.*;
import org.axonframework.test.aggregate.AggregateTestFixture;
import org.axonframework.test.aggregate.FixtureConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.UUID;

// The email addresses are illustrative placeholders.
class UserAggregateTest {

    private FixtureConfiguration<UserAggregate> fixture;
    private String userId;

    @BeforeEach
    void setUp() {
        fixture = new AggregateTestFixture<>(UserAggregate.class);
        userId = "USER-" + UUID.randomUUID();
    }

    @Test
    void testCreateUser() {
        fixture.givenNoPriorActivity()
                .when(new CreateUserCommand(userId, "john_doe", "john@example.com", "John Doe"))
                .expectEvents(new UserCreatedEvent(userId, "john_doe", "john@example.com", "John Doe"))
                .expectSuccessfulHandlerExecution();
    }

    @Test
    void testChangeUserEmail() {
        fixture.given(new UserCreatedEvent(userId, "john_doe", "john@example.com", "John Doe"))
                .when(new ChangeUserEmailCommand(userId, "john.doe@example.com"))
                .expectEvents(new UserEmailChangedEvent(userId, "john@example.com", "john.doe@example.com"))
                .expectSuccessfulHandlerExecution();
    }

    @Test
    void testChangeEmailForInactiveUser() {
        fixture.given(
                        new UserCreatedEvent(userId, "john_doe", "john@example.com", "John Doe"),
                        new UserDeletedEvent(userId, "User requested deletion"))
                .when(new ChangeUserEmailCommand(userId, "john.doe@example.com"))
                .expectException(IllegalStateException.class)
                .expectNoEvents();
    }

    @Test
    void testDeleteUser() {
        fixture.given(new UserCreatedEvent(userId, "john_doe", "john@example.com", "John Doe"))
                .when(new DeleteUserCommand(userId, "User requested deletion"))
                .expectEvents(new UserDeletedEvent(userId, "User requested deletion"))
                .expectSuccessfulHandlerExecution();
    }
}

Saga Testing

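package com.example.axon.test;

import com.example.axon.coreapi.*;
import com.example.axon.saga.OrderProcessingSaga;
import org.axonframework.test.saga.SagaTestFixture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.axonframework.test.matchers.Matchers.exactSequenceOf;
import static org.axonframework.test.matchers.Matchers.payloadsMatching;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;

// Each test has a single given/when/expect pass; earlier steps of the flow go into the "given" phase.
class OrderProcessingSagaTest {

    private SagaTestFixture<OrderProcessingSaga> fixture;
    private String orderId;

    @BeforeEach
    void setUp() {
        fixture = new SagaTestFixture<>(OrderProcessingSaga.class);
        orderId = "ORDER-123";
    }

    @Test
    void testSagaStartRequestsPaymentAndInventory() {
        fixture.givenNoPriorActivity()
                .whenAggregate(orderId).publishes(new OrderConfirmedEvent(orderId))
                .expectActiveSagas(1)
                // The payment ID is random, so the first command is matched by type only
                .expectDispatchedCommandsMatching(payloadsMatching(exactSequenceOf(
                        instanceOf(ProcessPaymentCommand.class),
                        equalTo(new ReserveInventoryCommand(orderId)))));
    }

    @Test
    void testOrderShippedAfterPaymentAndInventory() {
        fixture.givenAggregate(orderId).published(
                        new OrderConfirmedEvent(orderId),
                        new PaymentProcessedEvent("PAY-789", orderId))
                .whenAggregate(orderId).publishes(new InventoryReservedEvent(orderId))
                .expectActiveSagas(1)
                .expectDispatchedCommands(new ShipOrderCommand(orderId));
    }

    @Test
    void testSagaEndsWhenOrderShipped() {
        fixture.givenAggregate(orderId).published(
                        new OrderConfirmedEvent(orderId),
                        new PaymentProcessedEvent("PAY-789", orderId),
                        new InventoryReservedEvent(orderId))
                .whenAggregate(orderId).publishes(new OrderShippedEvent(orderId))
                .expectNoDispatchedCommands()
                .expectActiveSagas(0);
    }

    @Test
    void testOrderProcessingSagaPaymentFailed() {
        fixture.givenAggregate(orderId).published(new OrderConfirmedEvent(orderId))
                .whenAggregate(orderId).publishes(new PaymentFailedEvent("PAY-789", orderId, "Insufficient funds"))
                .expectDispatchedCommands(new CancelOrderCommand(orderId, "Payment failed"))
                .expectActiveSagas(0);
    }
}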

Integration Testing

package com.example.axon.test;
import com.example.axon.coreapi.*;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.queryhandling.QueryGateway;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.jdbc.Sql;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
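// Delete child rows first so the foreign keys in schema.sql are not violated
@Sql(statements = {"DELETE FROM order_items", "DELETE FROM orders", "DELETE FROM users"})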
class AxonIntegrationTest {
@Autowired
private CommandGateway commandGateway;
@Autowired
private QueryGateway queryGateway;
@Test
void testCompleteUserWorkflow() throws Exception {
// Create user
String userId = "USER-TEST-1";
CreateUserCommand createCommand = new CreateUserCommand(
userId, "testuser", "testuser@example.com", "Test User"
);
commandGateway.send(createCommand).get(5, TimeUnit.SECONDS);
// Query user; the projection is updated asynchronously, so poll until it catches up
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<UserResponse> userFuture = queryGateway.query(
new FindUserQuery(userId), UserResponse.class
);
UserResponse user = userFuture.get(5, TimeUnit.SECONDS);
assertNotNull(user);
assertEquals("testuser", user.username());
assertEquals("testuser@example.com", user.email());
});
// Change email
ChangeUserEmailCommand changeEmailCommand = new ChangeUserEmailCommand(
userId, "testuser.new@example.com"
);
commandGateway.send(changeEmailCommand).get(5, TimeUnit.SECONDS);
// Verify email change
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<UserResponse> userFuture = queryGateway.query(
new FindUserQuery(userId), UserResponse.class
);
UserResponse user = userFuture.get(5, TimeUnit.SECONDS);
assertEquals("testuser.new@example.com", user.email());
});
}
@Test
void testCompleteOrderWorkflow() throws Exception {
String orderId = "ORDER-TEST-1";
String userId = "USER-TEST-2";
// Create user first
commandGateway.send(new CreateUserCommand(userId, "orderuser", "orderuser@example.com", "Order User"))
.get(5, TimeUnit.SECONDS);
// Create order
commandGateway.send(new CreateOrderCommand(orderId, userId, BigDecimal.valueOf(100.0)))
.get(5, TimeUnit.SECONDS);
// Add order item
commandGateway.send(new AddOrderItemCommand(orderId, "PROD-1", "Test Product", 2, BigDecimal.valueOf(25.0)))
.get(5, TimeUnit.SECONDS);
// Confirm order
commandGateway.send(new ConfirmOrderCommand(orderId))
.get(5, TimeUnit.SECONDS);
// Verify order
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<OrderResponse> orderFuture = queryGateway.query(
new FindOrderQuery(orderId), OrderResponse.class
);
OrderResponse order = orderFuture.get(5, TimeUnit.SECONDS);
assertNotNull(order);
assertEquals("CONFIRMED", order.status());
assertEquals(1, order.items().size());
});
}
}

8. Performance Optimization

Snapshot Configuration

package com.example.axon.config;
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.Snapshotter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SnapshotConfig {
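// A trigger only takes effect once an aggregate references it by bean name,
// e.g. @Aggregate(snapshotTriggerDefinition = "userSnapshotTrigger")
@Bean
public SnapshotTriggerDefinition userSnapshotTrigger(Snapshotter snapshotter) {
// Take a snapshot every 50 events
return new EventCountSnapshotTriggerDefinition(snapshotter, 50);
}
// Referenced via @Aggregate(snapshotTriggerDefinition = "orderSnapshotTrigger")
@Bean
public SnapshotTriggerDefinition orderSnapshotTrigger(Snapshotter snapshotter) {
// Take a snapshot every 30 events (orders change more frequently)
return new EventCountSnapshotTriggerDefinition(snapshotter, 30);
}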
}
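
To make the event-count threshold concrete, here is a minimal plain-Java sketch of what a snapshot saves when an aggregate is loaded. It does not use Axon's API: `SnapshotSketch`, `CounterState`, and the integer "events" are hypothetical stand-ins chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of event-count snapshotting; not Axon code. */
final class SnapshotSketch {

    /** Hypothetical aggregate state: a running total plus the number of events applied. */
    record CounterState(long total, int version) {
        CounterState apply(int delta) {
            return new CounterState(total + delta, version + 1);
        }
    }

    private final int threshold;
    private final List<Integer> events = new ArrayList<>();
    private CounterState snapshot = new CounterState(0, 0);

    SnapshotSketch(int threshold) {
        this.threshold = threshold;
    }

    /** Appends an event; stores a new snapshot once threshold events have accumulated since the last one. */
    void append(int delta) {
        events.add(delta);
        if (events.size() - snapshot.version() >= threshold) {
            snapshot = load();
        }
    }

    /** Rebuilds the current state from the latest snapshot plus the events recorded after it. */
    CounterState load() {
        CounterState state = snapshot;
        for (int delta : events.subList(snapshot.version(), events.size())) {
            state = state.apply(delta);
        }
        return state;
    }

    /** Number of events that load() has to replay on top of the snapshot. */
    int eventsToReplay() {
        return events.size() - snapshot.version();
    }

    public static void main(String[] args) {
        SnapshotSketch store = new SnapshotSketch(50);
        for (int i = 0; i < 120; i++) {
            store.append(1);
        }
        System.out.println(store.load().total());   // prints 120
        System.out.println(store.eventsToReplay()); // prints 20
    }
}
```

With a threshold of 50, loading after 120 events replays only the 20 events recorded since the snapshot at event 100. A lower threshold shortens replay at the cost of writing snapshots more often.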

Event Store Configuration

package com.example.axon.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.springboot.util.jpa.ContainerManagedEntityManagerProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import jakarta.persistence.EntityManagerFactory; // Spring Boot 3 uses the jakarta.* namespace
@Configuration
public class EventStoreConfig {
@Bean
public JacksonSerializer axonJsonSerializer(ObjectMapper objectMapper) {
return JacksonSerializer.builder()
.objectMapper(objectMapper)
.build();
}
@Bean
public ContainerManagedEntityManagerProvider containerManagedEntityManagerProvider() {
return new ContainerManagedEntityManagerProvider();
}
@Bean
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}
}

9. Real-World Examples

Spring Boot Application

package com.example.axon;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class AxonCqrsApplication {
public static void main(String[] args) {
SpringApplication.run(AxonCqrsApplication.class, args);
}
}

Database Migration (Optional)

-- src/main/resources/schema.sql
CREATE TABLE IF NOT EXISTS users (
user_id VARCHAR(255) PRIMARY KEY,
username VARCHAR(100) NOT NULL UNIQUE,
email VARCHAR(255) NOT NULL UNIQUE,
full_name VARCHAR(255) NOT NULL,
active BOOLEAN NOT NULL DEFAULT true
);
CREATE TABLE IF NOT EXISTS orders (
order_id VARCHAR(255) PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(50) NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(user_id)
);
CREATE TABLE IF NOT EXISTS order_items (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(255) NOT NULL,
product_id VARCHAR(255) NOT NULL,
product_name VARCHAR(255) NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(order_id)
);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);

Summary

The Axon Framework implementation in this guide provides:

Key Benefits:

  • CQRS Separation: Clear separation between command and query responsibilities
  • Event Sourcing: Full audit trail and temporal querying capabilities
  • Scalability: Independent scaling of read and write sides
  • Maintainability: Clean architecture with well-defined boundaries
  • Testability: Comprehensive testing support at all levels
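
The audit-trail and temporal-querying benefit follows from keeping every event: a past state is the result of replaying only the events recorded up to that moment. A minimal plain-Java sketch of the idea follows. The `EmailChanged` record and `emailAsOf` method are hypothetical, and the sketch does not touch Axon's event store API.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

/** Plain-Java illustration of a temporal query over an event history; not Axon code. */
final class TemporalQuerySketch {

    /** Hypothetical stored event: the new email and the time the change was recorded. */
    record EmailChanged(String newEmail, Instant recordedAt) {}

    /** Returns the email that was current at pointInTime, or empty if none had been set yet. */
    static Optional<String> emailAsOf(List<EmailChanged> history, Instant pointInTime) {
        String current = null;
        for (EmailChanged event : history) { // history is assumed to be ordered oldest first
            if (event.recordedAt().isAfter(pointInTime)) {
                break;
            }
            current = event.newEmail();
        }
        return Optional.ofNullable(current);
    }

    public static void main(String[] args) {
        List<EmailChanged> history = List.of(
            new EmailChanged("first@example.com", Instant.parse("2024-01-01T00:00:00Z")),
            new EmailChanged("second@example.com", Instant.parse("2024-06-01T00:00:00Z")));
        // Asking for the state in March sees only the January change
        System.out.println(emailAsOf(history, Instant.parse("2024-03-01T00:00:00Z")));
    }
}
```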

Core Components:

  1. Aggregates: Command-handling entities with business logic
  2. Projections: Event-handling components that build read models
  3. Sagas: Long-running processes for complex workflows
  4. Controllers: REST endpoints for commands and queries
  5. Repositories: Data access for read models

Production Considerations:

  • Use Axon Server for production event storage and dispatching
  • Implement proper error handling and compensation in sagas
  • Configure appropriate snapshot strategies
  • Monitor performance and optimize projections
  • Implement security and validation at appropriate layers

This implementation provides a solid foundation for building scalable, maintainable CQRS applications using Axon Framework.
