CQRS Pattern Implementation in Java

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates read and write operations into different models. Because each side has its own model, the write path can focus on validating and applying changes while the read path serves data in the shape queries need, and the two can be optimized and scaled independently.

Core Concepts

  • Command: A request to change state (write operation)
  • Query: A request that reads state without modifying it (read operation)
  • Command Handler: Processes a command and updates the write model
  • Query Handler: Processes a query and returns data from the read model
  • Event Sourcing (optional): Stores each state change as an event, so current state can be rebuilt by replaying the events
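
To make the optional event-sourcing idea concrete before the larger examples, here is a minimal sketch that stands apart from them. The names BalanceEvent, Deposited, Withdrawn, and EventLog are hypothetical and invented for this illustration only; the point is simply that current state is not stored but derived by replaying the recorded events in order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type for this illustration: each event knows how it changes a balance
interface BalanceEvent {
    long apply(long balance);
}

class Deposited implements BalanceEvent {
    private final long amount;
    Deposited(long amount) { this.amount = amount; }
    @Override public long apply(long balance) { return balance + amount; }
}

class Withdrawn implements BalanceEvent {
    private final long amount;
    Withdrawn(long amount) { this.amount = amount; }
    @Override public long apply(long balance) { return balance - amount; }
}

class EventLog {
    // Append-only history: events are added, never edited or removed
    private final List<BalanceEvent> events = new ArrayList<>();

    void append(BalanceEvent event) {
        events.add(event);
    }

    // The balance is recomputed from scratch by replaying every event in order
    long currentBalance() {
        long balance = 0;
        for (BalanceEvent event : events) {
            balance = event.apply(balance);
        }
        return balance;
    }
}
```

Appending Deposited(100) followed by Withdrawn(30) and then calling currentBalance() replays both events and yields 70.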

Basic CQRS Implementation

Example 1: Basic CQRS Structure

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
// Base interfaces
interface Command {
String getAggregateId();
}
interface Query {
}
interface CommandHandler<C extends Command> {
CompletableFuture<Void> handle(C command);
}
interface QueryHandler<Q extends Query, R> {
CompletableFuture<R> handle(Q query);
}
// Command and Query buses
class CommandBus {
// ConcurrentHashMap keeps registration and dispatch safe if they happen on different threads
private final Map<Class<? extends Command>, CommandHandler<?>> handlers = new ConcurrentHashMap<>();
public <C extends Command> void registerHandler(Class<C> commandType, CommandHandler<C> handler) {
handlers.put(commandType, handler);
}
@SuppressWarnings("unchecked")
public <C extends Command> CompletableFuture<Void> dispatch(C command) {
CommandHandler<C> handler = (CommandHandler<C>) handlers.get(command.getClass());
if (handler == null) {
return CompletableFuture.failedFuture(
new RuntimeException("No handler registered for command: " + command.getClass())
);
}
return handler.handle(command);
}
}
class QueryBus {
// ConcurrentHashMap keeps registration and dispatch safe if they happen on different threads
private final Map<Class<? extends Query>, QueryHandler<?, ?>> handlers = new ConcurrentHashMap<>();
public <Q extends Query, R> void registerHandler(Class<Q> queryType, QueryHandler<Q, R> handler) {
handlers.put(queryType, handler);
}
@SuppressWarnings("unchecked")
public <Q extends Query, R> CompletableFuture<R> dispatch(Q query) {
QueryHandler<Q, R> handler = (QueryHandler<Q, R>) handlers.get(query.getClass());
if (handler == null) {
return CompletableFuture.failedFuture(
new RuntimeException("No handler registered for query: " + query.getClass())
);
}
return handler.handle(query);
}
}

Example 2: User Management with CQRS

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
// Commands
class CreateUserCommand implements Command {
private final String userId;
private final String username;
private final String email;
private final String password;
public CreateUserCommand(String userId, String username, String email, String password) {
this.userId = userId;
this.username = username;
this.email = email;
this.password = password;
}
@Override public String getAggregateId() { return userId; }
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getPassword() { return password; }
}
class UpdateUserEmailCommand implements Command {
private final String userId;
private final String newEmail;
public UpdateUserEmailCommand(String userId, String newEmail) {
this.userId = userId;
this.newEmail = newEmail;
}
@Override public String getAggregateId() { return userId; }
public String getUserId() { return userId; }
public String getNewEmail() { return newEmail; }
}
class DeactivateUserCommand implements Command {
private final String userId;
public DeactivateUserCommand(String userId) {
this.userId = userId;
}
@Override public String getAggregateId() { return userId; }
public String getUserId() { return userId; }
}
// Queries
class GetUserByIdQuery implements Query {
private final String userId;
public GetUserByIdQuery(String userId) {
this.userId = userId;
}
public String getUserId() { return userId; }
}
class GetUserByEmailQuery implements Query {
private final String email;
public GetUserByEmailQuery(String email) {
this.email = email;
}
public String getEmail() { return email; }
}
class GetAllUsersQuery implements Query {
private final boolean includeInactive;
public GetAllUsersQuery(boolean includeInactive) {
this.includeInactive = includeInactive;
}
public boolean isIncludeInactive() { return includeInactive; }
}
// Read Model
class UserView {
private final String userId;
private final String username;
private final String email;
private final boolean active;
private final LocalDateTime createdAt;
private final LocalDateTime updatedAt;
public UserView(String userId, String username, String email, boolean active, 
LocalDateTime createdAt, LocalDateTime updatedAt) {
this.userId = userId;
this.username = username;
this.email = email;
this.active = active;
this.createdAt = createdAt;
this.updatedAt = updatedAt;
}
// Getters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public boolean isActive() { return active; }
public LocalDateTime getCreatedAt() { return createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
@Override
public String toString() {
return String.format("UserView{id=%s, username=%s, email=%s, active=%s}", 
userId, username, email, active);
}
}
// Write Model (Aggregate)
class UserAggregate {
private String userId;
private String username;
private String email;
private String passwordHash;
private boolean active;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
public UserAggregate(String userId, String username, String email, String passwordHash) {
this.userId = userId;
this.username = username;
this.email = email;
this.passwordHash = passwordHash;
this.active = true;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
public void updateEmail(String newEmail) {
if (!this.email.equals(newEmail)) {
this.email = newEmail;
this.updatedAt = LocalDateTime.now();
}
}
public void deactivate() {
if (this.active) {
this.active = false;
this.updatedAt = LocalDateTime.now();
}
}
// Getters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getPasswordHash() { return passwordHash; }
public boolean isActive() { return active; }
public LocalDateTime getCreatedAt() { return createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
public UserView toView() {
return new UserView(userId, username, email, active, createdAt, updatedAt);
}
}
// Command Handlers
class CreateUserCommandHandler implements CommandHandler<CreateUserCommand> {
private final UserWriteRepository writeRepository;
public CreateUserCommandHandler(UserWriteRepository writeRepository) {
this.writeRepository = writeRepository;
}
@Override
public CompletableFuture<Void> handle(CreateUserCommand command) {
return CompletableFuture.supplyAsync(() -> {
// Validate business rules
if (writeRepository.existsByUsername(command.getUsername())) {
throw new RuntimeException("Username already exists: " + command.getUsername());
}
if (writeRepository.existsByEmail(command.getEmail())) {
throw new RuntimeException("Email already exists: " + command.getEmail());
}
// Create aggregate
UserAggregate user = new UserAggregate(
command.getUserId(),
command.getUsername(),
command.getEmail(),
hashPassword(command.getPassword())
);
// Save to write store
writeRepository.save(user);
return null;
});
}
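private String hashPassword(String password) {
// Demo only: String.hashCode() is not a secure hash. A real application should use
// a dedicated password-hashing algorithm such as bcrypt, scrypt, or Argon2.
return Integer.toHexString(password.hashCode());
}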
}
class UpdateUserEmailCommandHandler implements CommandHandler<UpdateUserEmailCommand> {
private final UserWriteRepository writeRepository;
public UpdateUserEmailCommandHandler(UserWriteRepository writeRepository) {
this.writeRepository = writeRepository;
}
@Override
public CompletableFuture<Void> handle(UpdateUserEmailCommand command) {
return CompletableFuture.supplyAsync(() -> {
UserAggregate user = writeRepository.findById(command.getUserId())
.orElseThrow(() -> new RuntimeException("User not found: " + command.getUserId()));
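// Reject the new address only if it differs from the user's current one and is already taken
if (!user.getEmail().equals(command.getNewEmail())
&& writeRepository.existsByEmail(command.getNewEmail())) {
throw new RuntimeException("Email already exists: " + command.getNewEmail());
}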
user.updateEmail(command.getNewEmail());
writeRepository.save(user);
return null;
});
}
}
class DeactivateUserCommandHandler implements CommandHandler<DeactivateUserCommand> {
private final UserWriteRepository writeRepository;
public DeactivateUserCommandHandler(UserWriteRepository writeRepository) {
this.writeRepository = writeRepository;
}
@Override
public CompletableFuture<Void> handle(DeactivateUserCommand command) {
return CompletableFuture.supplyAsync(() -> {
UserAggregate user = writeRepository.findById(command.getUserId())
.orElseThrow(() -> new RuntimeException("User not found: " + command.getUserId()));
user.deactivate();
writeRepository.save(user);
return null;
});
}
}
// Query Handlers
class GetUserByIdQueryHandler implements QueryHandler<GetUserByIdQuery, UserView> {
private final UserReadRepository readRepository;
public GetUserByIdQueryHandler(UserReadRepository readRepository) {
this.readRepository = readRepository;
}
@Override
public CompletableFuture<UserView> handle(GetUserByIdQuery query) {
return CompletableFuture.supplyAsync(() -> 
readRepository.findById(query.getUserId())
.orElseThrow(() -> new RuntimeException("User not found: " + query.getUserId()))
);
}
}
class GetUserByEmailQueryHandler implements QueryHandler<GetUserByEmailQuery, UserView> {
private final UserReadRepository readRepository;
public GetUserByEmailQueryHandler(UserReadRepository readRepository) {
this.readRepository = readRepository;
}
@Override
public CompletableFuture<UserView> handle(GetUserByEmailQuery query) {
return CompletableFuture.supplyAsync(() -> 
readRepository.findByEmail(query.getEmail())
.orElseThrow(() -> new RuntimeException("User not found with email: " + query.getEmail()))
);
}
}
class GetAllUsersQueryHandler implements QueryHandler<GetAllUsersQuery, List<UserView>> {
private final UserReadRepository readRepository;
public GetAllUsersQueryHandler(UserReadRepository readRepository) {
this.readRepository = readRepository;
}
@Override
public CompletableFuture<List<UserView>> handle(GetAllUsersQuery query) {
return CompletableFuture.supplyAsync(() -> {
if (query.isIncludeInactive()) {
return readRepository.findAll();
} else {
return readRepository.findActiveUsers();
}
});
}
}
// Repositories
class UserWriteRepository {
private Map<String, UserAggregate> store = new ConcurrentHashMap<>();
public void save(UserAggregate user) {
store.put(user.getUserId(), user);
}
public Optional<UserAggregate> findById(String userId) {
return Optional.ofNullable(store.get(userId));
}
public boolean existsByUsername(String username) {
return store.values().stream()
.anyMatch(user -> user.getUsername().equals(username) && user.isActive());
}
public boolean existsByEmail(String email) {
return store.values().stream()
.anyMatch(user -> user.getEmail().equals(email) && user.isActive());
}
}
class UserReadRepository {
private Map<String, UserView> store = new ConcurrentHashMap<>();
private Map<String, String> emailToUserId = new ConcurrentHashMap<>();
public void save(UserView userView) {
store.put(userView.getUserId(), userView);
emailToUserId.put(userView.getEmail(), userView.getUserId());
}
public Optional<UserView> findById(String userId) {
return Optional.ofNullable(store.get(userId));
}
public Optional<UserView> findByEmail(String email) {
return Optional.ofNullable(emailToUserId.get(email))
.flatMap(this::findById);
}
public List<UserView> findAll() {
return new ArrayList<>(store.values());
}
public List<UserView> findActiveUsers() {
return store.values().stream()
.filter(UserView::isActive)
.collect(Collectors.toList());
}
public void updateEmail(String userId, String newEmail) {
UserView user = store.get(userId);
if (user != null) {
// Remove old email mapping
emailToUserId.remove(user.getEmail());
// Create updated user view
UserView updatedUser = new UserView(
user.getUserId(),
user.getUsername(),
newEmail,
user.isActive(),
user.getCreatedAt(),
LocalDateTime.now()
);
// Update stores
store.put(userId, updatedUser);
emailToUserId.put(newEmail, userId);
}
}
public void deactivateUser(String userId) {
UserView user = store.get(userId);
if (user != null) {
UserView updatedUser = new UserView(
user.getUserId(),
user.getUsername(),
user.getEmail(),
false,
user.getCreatedAt(),
LocalDateTime.now()
);
store.put(userId, updatedUser);
}
}
}
// Projector (Synchronizes Write and Read models)
class UserProjector {
private final UserReadRepository readRepository;
public UserProjector(UserReadRepository readRepository) {
this.readRepository = readRepository;
}
public void projectUserCreated(UserAggregate user) {
readRepository.save(user.toView());
}
public void projectUserEmailUpdated(String userId, String newEmail) {
readRepository.updateEmail(userId, newEmail);
}
public void projectUserDeactivated(String userId) {
readRepository.deactivateUser(userId);
}
}
// Demo
public class BasicCQRSDemo {
public static void main(String[] args) throws Exception {
// Setup infrastructure
UserWriteRepository writeRepository = new UserWriteRepository();
UserReadRepository readRepository = new UserReadRepository();
UserProjector projector = new UserProjector(readRepository);
CommandBus commandBus = new CommandBus();
QueryBus queryBus = new QueryBus();
// Register command handlers
commandBus.registerHandler(CreateUserCommand.class, new CreateUserCommandHandler(writeRepository));
commandBus.registerHandler(UpdateUserEmailCommand.class, new UpdateUserEmailCommandHandler(writeRepository));
commandBus.registerHandler(DeactivateUserCommand.class, new DeactivateUserCommandHandler(writeRepository));
// Register query handlers
queryBus.registerHandler(GetUserByIdQuery.class, new GetUserByIdQueryHandler(readRepository));
queryBus.registerHandler(GetUserByEmailQuery.class, new GetUserByEmailQueryHandler(readRepository));
queryBus.registerHandler(GetAllUsersQuery.class, new GetAllUsersQueryHandler(readRepository));
// Simple projection: each command below is followed by a projector call that copies
// the change into the read model (a real application would publish events instead)
System.out.println("=== Basic CQRS Demo ===\n");
// Create users
String userId1 = "user-1";
String userId2 = "user-2";
commandBus.dispatch(new CreateUserCommand(userId1, "john_doe", "john.doe@example.com", "password123"))
.thenRun(() -> writeRepository.findById(userId1).ifPresent(projector::projectUserCreated))
.thenRun(() -> System.out.println("User 1 created"))
.get();
commandBus.dispatch(new CreateUserCommand(userId2, "jane_smith", "jane.smith@example.com", "password456"))
.thenRun(() -> writeRepository.findById(userId2).ifPresent(projector::projectUserCreated))
.thenRun(() -> System.out.println("User 2 created"))
.get();
// Query users (explicit type arguments tell the compiler the result type used in the lambdas)
queryBus.<GetUserByIdQuery, UserView>dispatch(new GetUserByIdQuery(userId1))
.thenAccept(user -> System.out.println("Found by ID: " + user))
.get();
queryBus.<GetUserByEmailQuery, UserView>dispatch(new GetUserByEmailQuery("jane.smith@example.com"))
.thenAccept(user -> System.out.println("Found by email: " + user))
.get();
// Update user email
String newEmail = "john.new@example.com";
commandBus.dispatch(new UpdateUserEmailCommand(userId1, newEmail))
.thenRun(() -> projector.projectUserEmailUpdated(userId1, newEmail))
.thenRun(() -> System.out.println("User email updated"))
.get();
// Query all active users
queryBus.<GetAllUsersQuery, List<UserView>>dispatch(new GetAllUsersQuery(false))
.thenAccept(users -> {
System.out.println("\nActive users:");
users.forEach(System.out::println);
})
.get();
// Deactivate user
commandBus.dispatch(new DeactivateUserCommand(userId2))
.thenRun(() -> projector.projectUserDeactivated(userId2))
.thenRun(() -> System.out.println("User 2 deactivated"))
.get();
// Query all users including inactive
queryBus.<GetAllUsersQuery, List<UserView>>dispatch(new GetAllUsersQuery(true))
.thenAccept(users -> {
System.out.println("\nAll users (including inactive):");
users.forEach(System.out::println);
})
.get();
}
}
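
The demo comments note that a real application would not sync the read model by hand. One possible way to automate that step, short of full event publishing, is to wrap each command handler in a decorator that runs a projection callback once the write succeeds. The sketch below is only an illustration under that assumption: ProjectingCommandHandler is a hypothetical name, and Command and CommandHandler are re-declared from Example 1 so the snippet compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Re-declared from Example 1 so this sketch is self-contained
interface Command {
    String getAggregateId();
}

interface CommandHandler<C extends Command> {
    CompletableFuture<Void> handle(C command);
}

// Hypothetical decorator: delegates to the real handler, then updates the read side
class ProjectingCommandHandler<C extends Command> implements CommandHandler<C> {
    private final CommandHandler<C> delegate;
    private final Consumer<C> projection;

    ProjectingCommandHandler(CommandHandler<C> delegate, Consumer<C> projection) {
        this.delegate = delegate;
        this.projection = projection;
    }

    @Override
    public CompletableFuture<Void> handle(C command) {
        // thenRun fires only when the delegate completes normally, so a failed
        // command never reaches the read model
        return delegate.handle(command).thenRun(() -> projection.accept(command));
    }
}
```

With the classes from Example 2, a registration could then wrap DeactivateUserCommandHandler with a callback that calls projector.projectUserDeactivated(command.getUserId()), so the demo code no longer has to trigger the projection itself.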

Advanced CQRS with Event Sourcing

Example 3: Event-Sourced CQRS

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
// Events
interface DomainEvent {
String getAggregateId();
LocalDateTime getOccurredAt();
}
class UserCreatedEvent implements DomainEvent {
private final String userId;
private final String username;
private final String email;
private final LocalDateTime occurredAt;
public UserCreatedEvent(String userId, String username, String email) {
this.userId = userId;
this.username = username;
this.email = email;
this.occurredAt = LocalDateTime.now();
}
@Override public String getAggregateId() { return userId; }
@Override public LocalDateTime getOccurredAt() { return occurredAt; }
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
}
class UserEmailUpdatedEvent implements DomainEvent {
private final String userId;
private final String newEmail;
private final LocalDateTime occurredAt;
public UserEmailUpdatedEvent(String userId, String newEmail) {
this.userId = userId;
this.newEmail = newEmail;
this.occurredAt = LocalDateTime.now();
}
@Override public String getAggregateId() { return userId; }
@Override public LocalDateTime getOccurredAt() { return occurredAt; }
public String getUserId() { return userId; }
public String getNewEmail() { return newEmail; }
}
class UserDeactivatedEvent implements DomainEvent {
private final String userId;
private final LocalDateTime occurredAt;
public UserDeactivatedEvent(String userId) {
this.userId = userId;
this.occurredAt = LocalDateTime.now();
}
@Override public String getAggregateId() { return userId; }
@Override public LocalDateTime getOccurredAt() { return occurredAt; }
public String getUserId() { return userId; }
}
// Event Store
class EventStore {
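private final Map<String, List<DomainEvent>> eventStreams = new ConcurrentHashMap<>();
public void appendEvents(String aggregateId, List<DomainEvent> events) {
// Synchronized lists keep concurrent appends to the same stream from corrupting it
eventStreams.computeIfAbsent(aggregateId, k -> Collections.synchronizedList(new ArrayList<>())).addAll(events);
}
public List<DomainEvent> getEvents(String aggregateId) {
// Return a copy so callers cannot modify the stored stream
return new ArrayList<>(eventStreams.getOrDefault(aggregateId, Collections.emptyList()));
}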
public List<DomainEvent> getAllEvents() {
return eventStreams.values().stream()
.flatMap(List::stream)
.sorted(Comparator.comparing(DomainEvent::getOccurredAt))
.collect(Collectors.toList());
}
}
// Event-Sourced Aggregate
class EventSourcedUserAggregate {
private String userId;
private String username;
private String email;
private boolean active;
private int version;
public EventSourcedUserAggregate() {
// For reconstruction
}
// Command methods
public List<DomainEvent> createUser(String userId, String username, String email) {
if (this.userId != null) {
throw new IllegalStateException("User already created");
}
List<DomainEvent> events = new ArrayList<>();
events.add(new UserCreatedEvent(userId, username, email));
return events;
}
public List<DomainEvent> updateEmail(String newEmail) {
if (!this.active) {
throw new IllegalStateException("Cannot update deactivated user");
}
if (this.email.equals(newEmail)) {
return new ArrayList<>();
}
List<DomainEvent> events = new ArrayList<>();
events.add(new UserEmailUpdatedEvent(userId, newEmail));
return events;
}
public List<DomainEvent> deactivate() {
if (!this.active) {
return new ArrayList<>();
}
List<DomainEvent> events = new ArrayList<>();
events.add(new UserDeactivatedEvent(userId));
return events;
}
// Event application methods
public void apply(UserCreatedEvent event) {
this.userId = event.getUserId();
this.username = event.getUsername();
this.email = event.getEmail();
this.active = true;
this.version = 0;
}
public void apply(UserEmailUpdatedEvent event) {
this.email = event.getNewEmail();
this.version++;
}
public void apply(UserDeactivatedEvent event) {
this.active = false;
this.version++;
}
// Reconstruct from events
public static EventSourcedUserAggregate reconstructFromEvents(List<DomainEvent> events) {
EventSourcedUserAggregate aggregate = new EventSourcedUserAggregate();
for (DomainEvent event : events) {
aggregate.applyEvent(event);
}
return aggregate;
}
private void applyEvent(DomainEvent event) {
if (event instanceof UserCreatedEvent) {
apply((UserCreatedEvent) event);
} else if (event instanceof UserEmailUpdatedEvent) {
apply((UserEmailUpdatedEvent) event);
} else if (event instanceof UserDeactivatedEvent) {
apply((UserDeactivatedEvent) event);
}
}
// Getters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public boolean isActive() { return active; }
public int getVersion() { return version; }
public UserView toView() {
// This aggregate does not track timestamps, so now() stands in for createdAt and updatedAt
return new UserView(userId, username, email, active, 
LocalDateTime.now(), LocalDateTime.now());
}
}
// Event-Sourced Command Handlers
class EventSourcedCreateUserCommandHandler implements CommandHandler<CreateUserCommand> {
private final EventStore eventStore;
public EventSourcedCreateUserCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public CompletableFuture<Void> handle(CreateUserCommand command) {
return CompletableFuture.supplyAsync(() -> {
// Check if user already exists
List<DomainEvent> existingEvents = eventStore.getEvents(command.getUserId());
if (!existingEvents.isEmpty()) {
throw new RuntimeException("User already exists: " + command.getUserId());
}
EventSourcedUserAggregate aggregate = new EventSourcedUserAggregate();
List<DomainEvent> events = aggregate.createUser(
command.getUserId(),
command.getUsername(),
command.getEmail()
);
eventStore.appendEvents(command.getUserId(), events);
return null;
});
}
}
class EventSourcedUpdateUserEmailCommandHandler implements CommandHandler<UpdateUserEmailCommand> {
private final EventStore eventStore;
public EventSourcedUpdateUserEmailCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public CompletableFuture<Void> handle(UpdateUserEmailCommand command) {
return CompletableFuture.supplyAsync(() -> {
List<DomainEvent> existingEvents = eventStore.getEvents(command.getUserId());
if (existingEvents.isEmpty()) {
throw new RuntimeException("User not found: " + command.getUserId());
}
EventSourcedUserAggregate aggregate = 
EventSourcedUserAggregate.reconstructFromEvents(existingEvents);
List<DomainEvent> events = aggregate.updateEmail(command.getNewEmail());
if (!events.isEmpty()) {
eventStore.appendEvents(command.getUserId(), events);
}
return null;
});
}
}
// Event Handler for Projections
interface EventHandler<T extends DomainEvent> {
void handle(T event);
}
class UserProjectionEventHandler {
private final UserReadRepository readRepository;
public UserProjectionEventHandler(UserReadRepository readRepository) {
this.readRepository = readRepository;
}
// Handles UserCreatedEvent (registered with the EventBus through a method reference)
public void handle(UserCreatedEvent event) {
UserView userView = new UserView(
event.getUserId(),
event.getUsername(),
event.getEmail(),
true,
event.getOccurredAt(),
event.getOccurredAt()
);
readRepository.save(userView);
System.out.println("Projection updated: User created - " + event.getUsername());
}
// Handles UserEmailUpdatedEvent
public void handle(UserEmailUpdatedEvent event) {
readRepository.updateEmail(event.getUserId(), event.getNewEmail());
System.out.println("Projection updated: Email updated for user - " + event.getUserId());
}
// Handles UserDeactivatedEvent
public void handle(UserDeactivatedEvent event) {
readRepository.deactivateUser(event.getUserId());
System.out.println("Projection updated: User deactivated - " + event.getUserId());
}
}
// Event Bus
class EventBus {
private Map<Class<? extends DomainEvent>, List<EventHandler<?>>> handlers = new HashMap<>();
@SuppressWarnings("unchecked")
public <T extends DomainEvent> void registerHandler(Class<T> eventType, EventHandler<T> handler) {
handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
}
@SuppressWarnings("unchecked")
public void publish(DomainEvent event) {
List<EventHandler<?>> eventHandlers = handlers.get(event.getClass());
if (eventHandlers != null) {
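for (EventHandler<?> handler : eventHandlers) {
try {
// Unchecked but safe: registerHandler stores each handler under its own event type
((EventHandler<DomainEvent>) handler).handle(event);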
} catch (Exception e) {
System.err.println("Error handling event: " + e.getMessage());
}
}
}
}
public void publishAll(List<DomainEvent> events) {
events.forEach(this::publish);
}
}
// Event-Sourced CQRS Demo
public class EventSourcedCQRSDemo {
public static void main(String[] args) throws Exception {
// Setup infrastructure
EventStore eventStore = new EventStore();
UserReadRepository readRepository = new UserReadRepository();
CommandBus commandBus = new CommandBus();
QueryBus queryBus = new QueryBus();
EventBus eventBus = new EventBus();
// Register event handlers
UserProjectionEventHandler projectionHandler = new UserProjectionEventHandler(readRepository);
eventBus.registerHandler(UserCreatedEvent.class, projectionHandler::handle);
eventBus.registerHandler(UserEmailUpdatedEvent.class, projectionHandler::handle);
eventBus.registerHandler(UserDeactivatedEvent.class, projectionHandler::handle);
// Register command handlers
commandBus.registerHandler(CreateUserCommand.class, new EventSourcedCreateUserCommandHandler(eventStore));
commandBus.registerHandler(UpdateUserEmailCommand.class, new EventSourcedUpdateUserEmailCommandHandler(eventStore));
// Register query handlers
queryBus.registerHandler(GetUserByIdQuery.class, new GetUserByIdQueryHandler(readRepository));
queryBus.registerHandler(GetAllUsersQuery.class, new GetAllUsersQueryHandler(readRepository));
System.out.println("=== Event-Sourced CQRS Demo ===\n");
// Create users
String userId1 = "event-user-1";
String userId2 = "event-user-2";
commandBus.dispatch(new CreateUserCommand(userId1, "alice", "alice@example.com", "pass123"))
.thenRun(() -> {
// Publish events after command processing
List<DomainEvent> events = eventStore.getEvents(userId1);
eventBus.publishAll(events);
})
.get();
commandBus.dispatch(new CreateUserCommand(userId2, "bob", "bob@example.com", "pass456"))
.thenRun(() -> {
List<DomainEvent> events = eventStore.getEvents(userId2);
eventBus.publishAll(events);
})
.get();
// Query users
queryBus.<GetAllUsersQuery, List<UserView>>dispatch(new GetAllUsersQuery(true))
.thenAccept(users -> {
System.out.println("\nAll users:");
users.forEach(System.out::println);
})
.get();
// Update user email
commandBus.dispatch(new UpdateUserEmailCommand(userId1, "alice.new@example.com"))
.thenRun(() -> {
List<DomainEvent> events = eventStore.getEvents(userId1);
// Only publish the latest events
if (!events.isEmpty()) {
eventBus.publish(events.get(events.size() - 1));
}
})
.get();
// Query again to see updated data
// No sleep is needed: the event is published synchronously before get() returns above
queryBus.<GetAllUsersQuery, List<UserView>>dispatch(new GetAllUsersQuery(true))
.thenAccept(users -> {
System.out.println("\nUsers after email update:");
users.forEach(System.out::println);
})
.get();
// Show event stream
System.out.println("\nEvent Stream for " + userId1 + ":");
eventStore.getEvents(userId1).forEach(event -> 
System.out.println(" - " + event.getClass().getSimpleName() + " at " + event.getOccurredAt())
);
}
}
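
The aggregate in Example 3 tracks a version, but the EventStore never checks it, so two handlers that load the same stream at the same time could both append. A common remedy is an optimistic concurrency check on append. The sketch below shows the idea under simplifying assumptions: VersionedEventStore is a hypothetical class, not part of the examples above, events are plain strings to keep it short, and the expected version is expressed as the number of events the caller saw when it loaded the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store for illustration; it does not replace the EventStore from Example 3
class VersionedEventStore {
    private final Map<String, List<String>> streams = new ConcurrentHashMap<>();

    // Appends only if the stream still has the length the caller observed when loading
    // the aggregate; otherwise another writer appended first and the call is rejected
    void append(String aggregateId, int expectedEventCount, List<String> newEvents) {
        // compute() runs atomically per key, so the check and the append cannot interleave
        streams.compute(aggregateId, (id, existing) -> {
            List<String> current = existing == null ? new ArrayList<>() : existing;
            if (current.size() != expectedEventCount) {
                throw new IllegalStateException("Concurrent modification of " + id
                        + ": expected " + expectedEventCount
                        + " events, found " + current.size());
            }
            List<String> updated = new ArrayList<>(current);
            updated.addAll(newEvents);
            return updated;
        });
    }

    // Returns a copy so callers cannot alter the stored stream
    List<String> getEvents(String aggregateId) {
        return new ArrayList<>(streams.getOrDefault(aggregateId, new ArrayList<>()));
    }
}
```

A command handler using such a store would load the events, remember how many there were, and pass that count to append; on an IllegalStateException it could reload the stream and retry the command.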

CQRS with Different Read Models

Example 4: Multiple Read Models

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
// Specialized Read Models
class UserListView {
private final String userId;
private final String username;
private final String email;
private final boolean active;
public UserListView(String userId, String username, String email, boolean active) {
this.userId = userId;
this.username = username;
this.email = email;
this.active = active;
}
// Getters
public String getUserId() { return userId; }
public String getUsername() { return username; }
public String getEmail() { return email; }
public boolean isActive() { return active; }
@Override
public String toString() {
return String.format("UserListView{username=%s, email=%s, active=%s}", 
username, email, active);
}
}
class UserStatisticsView {
private final int totalUsers;
private final int activeUsers;
private final int inactiveUsers;
private final LocalDateTime lastUpdated;
public UserStatisticsView(int totalUsers, int activeUsers, int inactiveUsers) {
this.totalUsers = totalUsers;
this.activeUsers = activeUsers;
this.inactiveUsers = inactiveUsers;
this.lastUpdated = LocalDateTime.now();
}
// Getters
public int getTotalUsers() { return totalUsers; }
public int getActiveUsers() { return activeUsers; }
public int getInactiveUsers() { return inactiveUsers; }
public LocalDateTime getLastUpdated() { return lastUpdated; }
@Override
public String toString() {
return String.format("UserStats{total=%d, active=%d, inactive=%d}", 
totalUsers, activeUsers, inactiveUsers);
}
}
// Specialized Queries
class GetUserListQuery implements Query {
private final int page;
private final int size;
public GetUserListQuery(int page, int size) {
this.page = page;
this.size = size;
}
public int getPage() { return page; }
public int getSize() { return size; }
}
class GetUserStatisticsQuery implements Query {
}
// Specialized Read Repositories
class UserListReadRepository {
private Map<String, UserListView> store = new ConcurrentHashMap<>();
private List<String> activeUserIds = new ArrayList<>();
public void save(UserListView user) {
store.put(user.getUserId(), user);
if (user.isActive() && !activeUserIds.contains(user.getUserId())) {
activeUserIds.add(user.getUserId());
}
}
public List<UserListView> getUsers(int page, int size) {
int start = page * size;
int end = Math.min(start + size, activeUserIds.size());
if (start >= activeUserIds.size()) {
return new ArrayList<>();
}
return activeUserIds.subList(start, end).stream()
.map(store::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
public void updateEmail(String userId, String newEmail) {
UserListView user = store.get(userId);
if (user != null) {
UserListView updatedUser = new UserListView(
user.getUserId(),
user.getUsername(),
newEmail,
user.isActive()
);
store.put(userId, updatedUser);
}
}
public void deactivateUser(String userId) {
UserListView user = store.get(userId);
if (user != null) {
UserListView updatedUser = new UserListView(
user.getUserId(),
user.getUsername(),
user.getEmail(),
false
);
store.put(userId, updatedUser);
activeUserIds.remove(userId);
}
}
}
class UserStatisticsReadRepository {
// Counters are guarded by the instance lock because projections and queries
// may run on different threads
private int totalUsers = 0;
private int activeUsers = 0;
private int inactiveUsers = 0;
public synchronized void userCreated(boolean active) {
totalUsers++;
if (active) {
activeUsers++;
} else {
inactiveUsers++;
}
}
public synchronized void userActivated() {
activeUsers++;
inactiveUsers--;
}
public synchronized void userDeactivated() {
activeUsers--;
inactiveUsers++;
}
public synchronized UserStatisticsView getStatistics() {
return new UserStatisticsView(totalUsers, activeUsers, inactiveUsers);
}
}
// Specialized Query Handlers
class GetUserListQueryHandler implements QueryHandler<GetUserListQuery, List<UserListView>> {
private final UserListReadRepository listRepository;
public GetUserListQueryHandler(UserListReadRepository listRepository) {
this.listRepository = listRepository;
}
@Override
public CompletableFuture<List<UserListView>> handle(GetUserListQuery query) {
return CompletableFuture.supplyAsync(() -> 
listRepository.getUsers(query.getPage(), query.getSize())
);
}
}
class GetUserStatisticsQueryHandler implements QueryHandler<GetUserStatisticsQuery, UserStatisticsView> {
private final UserStatisticsReadRepository statsRepository;
public GetUserStatisticsQueryHandler(UserStatisticsReadRepository statsRepository) {
this.statsRepository = statsRepository;
}
@Override
public CompletableFuture<UserStatisticsView> handle(GetUserStatisticsQuery query) {
return CompletableFuture.supplyAsync(() -> 
statsRepository.getStatistics()
);
}
}
// Multiple Projection Handlers
class MultipleProjectionHandler {
private final UserListReadRepository listRepository;
private final UserStatisticsReadRepository statsRepository;
private final UserReadRepository detailRepository;
public MultipleProjectionHandler(UserListReadRepository listRepository,
UserStatisticsReadRepository statsRepository,
UserReadRepository detailRepository) {
this.listRepository = listRepository;
this.statsRepository = statsRepository;
this.detailRepository = detailRepository;
}
@EventHandler
public void handle(UserCreatedEvent event) {
// Update detail view
UserView detailView = new UserView(
event.getUserId(),
event.getUsername(),
event.getEmail(),
true,
event.getOccurredAt(),
event.getOccurredAt()
);
detailRepository.save(detailView);
// Update list view
UserListView listView = new UserListView(
event.getUserId(),
event.getUsername(),
event.getEmail(),
true
);
listRepository.save(listView);
// Update statistics
statsRepository.userCreated(true);
System.out.println("Multiple projections updated for user: " + event.getUsername());
}
@EventHandler
public void handle(UserDeactivatedEvent event) {
detailRepository.deactivateUser(event.getUserId());
listRepository.deactivateUser(event.getUserId());
statsRepository.userDeactivated();
System.out.println("Multiple projections updated for deactivated user: " + event.getUserId());
}
}
// Demo with Multiple Read Models
public class MultipleReadModelsDemo {
public static void main(String[] args) throws Exception {
// Setup infrastructure
EventStore eventStore = new EventStore();
UserReadRepository detailRepository = new UserReadRepository();
UserListReadRepository listRepository = new UserListReadRepository();
UserStatisticsReadRepository statsRepository = new UserStatisticsReadRepository();
CommandBus commandBus = new CommandBus();
QueryBus queryBus = new QueryBus();
EventBus eventBus = new EventBus();
// Register multiple projection handlers
MultipleProjectionHandler multiProjection = new MultipleProjectionHandler(
listRepository, statsRepository, detailRepository
);
eventBus.registerHandler(UserCreatedEvent.class, multiProjection::handle);
eventBus.registerHandler(UserDeactivatedEvent.class, multiProjection::handle);
// Register command handlers
commandBus.registerHandler(CreateUserCommand.class, new EventSourcedCreateUserCommandHandler(eventStore));
// Register multiple query handlers
queryBus.registerHandler(GetUserByIdQuery.class, new GetUserByIdQueryHandler(detailRepository));
queryBus.registerHandler(GetUserListQuery.class, new GetUserListQueryHandler(listRepository));
queryBus.registerHandler(GetUserStatisticsQuery.class, new GetUserStatisticsQueryHandler(statsRepository));
System.out.println("=== Multiple Read Models Demo ===\n");
// Create multiple users
for (int i = 1; i <= 5; i++) {
String userId = "multi-user-" + i;
commandBus.dispatch(new CreateUserCommand(userId, "user" + i, "user" + i + "@example.com", "pass"))
.thenRun(() -> {
List<DomainEvent> events = eventStore.getEvents(userId);
eventBus.publishAll(events);
})
.get();
}
// Query different read models
System.out.println("\n--- Detail View (Get by ID) ---");
queryBus.dispatch(new GetUserByIdQuery("multi-user-1"))
.thenAccept(user -> System.out.println("Detail: " + user))
.get();
System.out.println("\n--- List View (Paged) ---");
queryBus.dispatch(new GetUserListQuery(0, 3))
.thenAccept(users -> {
System.out.println("Page 1:");
users.forEach(System.out::println);
})
.get();
queryBus.dispatch(new GetUserListQuery(1, 3))
.thenAccept(users -> {
System.out.println("Page 2:");
users.forEach(System.out::println);
})
.get();
System.out.println("\n--- Statistics View ---");
queryBus.dispatch(new GetUserStatisticsQuery())
.thenAccept(stats -> System.out.println("Statistics: " + stats))
.get();
// Summarize what each read model is optimized for
System.out.println("\nEach read model serves a different purpose:");
System.out.println("- Detail view: Full user information for detail pages");
System.out.println("- List view: Lightweight data for listing pages");
System.out.println("- Statistics: Aggregated data for dashboards");
}
}

Best Practices and Considerations

When to Use CQRS:

  • Complex domains where reads and writes follow different access patterns
  • Read-heavy applications whose queries benefit from purpose-built, denormalized views
  • Systems where the read and write sides need to scale independently
  • Teams that can own the read and write sides separately

Benefits:

  • Optimized queries: Read models can be denormalized for performance
  • Scalability: Read and write sides can scale independently
  • Flexibility: Different read models for different use cases
  • Maintainability: Separation of concerns

Challenges:

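  • Complexity: More moving parts (buses, handlers, projections) to build and operate
  • Eventual consistency: When projections are updated asynchronously, read models can lag behind the write model
  • Debugging: A single request spans commands, events, and projections, which makes it harder to trace
  • Learning curve: The team needs to understand the pattern before it pays off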
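
One way to soften the eventual-consistency challenge is to let a caller wait until a projection has caught up to the version it just wrote. The following is a minimal sketch under stated assumptions: ProjectionVersionTracker, markApplied, and awaitVersion are hypothetical names that do not appear in the examples above, and it assumes each event carries a monotonically increasing version number.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper (not part of the examples above): tracks the highest
// event version a projection has applied so readers can wait for it.
class ProjectionVersionTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();
    private long appliedVersion = 0;

    // Called by the projection after it has applied the event with this version.
    void markApplied(long version) {
        lock.lock();
        try {
            if (version > appliedVersion) {
                appliedVersion = version;
                advanced.signalAll(); // wake up readers waiting for this version
            }
        } finally {
            lock.unlock();
        }
    }

    // Waits until at least `version` has been applied.
    // Returns false if the timeout elapses or the thread is interrupted.
    boolean awaitVersion(long version, long timeout, TimeUnit unit) {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (appliedVersion < version) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = advanced.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProjectionVersionTracker tracker = new ProjectionVersionTracker();
        // Simulate a projection applying event version 1 on another thread.
        new Thread(() -> tracker.markApplied(1)).start();
        boolean caughtUp = tracker.awaitVersion(1, 1, TimeUnit.SECONDS);
        System.out.println("Read model caught up: " + caughtUp);
    }
}
```

A command handler could return the version it wrote, and a query handler could call awaitVersion before reading. When the wait times out, the caller can fall back to the possibly stale view rather than fail the request.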

Implementation Tips:

  1. Start simple: separate the read and write models first, and add event sourcing only if you need it
  2. Accept eventual consistency where the use case tolerates stale reads
  3. Handle errors in command and projection handlers explicitly, and retry transient failures
  4. Monitor how far the read models lag behind the write model
  5. Consider a framework such as Axon Framework for complex implementations
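
The retry tip can be sketched as a small wrapper around an event handler. This is an illustration under stated assumptions, not the article's implementation: RetryingEventHandler is a hypothetical class, it treats every RuntimeException as transient, and it assumes the wrapped handler is idempotent. A counter-based projection such as the statistics repository above would need deduplication first, because a retried event would otherwise be counted twice.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper (not part of the examples above): retries a handler a
// bounded number of times, doubling the pause between attempts.
class RetryingEventHandler<E> implements Consumer<E> {
    private final Consumer<E> delegate;
    private final int maxAttempts;
    private final long initialBackoffMillis;

    RetryingEventHandler(Consumer<E> delegate, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        if (initialBackoffMillis < 0) {
            throw new IllegalArgumentException("initialBackoffMillis must not be negative");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    @Override
    public void accept(E event) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                delegate.accept(event);
                return; // success, stop retrying
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the caller can log, alert, or dead-letter
                }
                pause(backoff);
                backoff *= 2; // exponential backoff
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated projection update that fails twice before succeeding.
        Consumer<String> flaky = event -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
        };
        new RetryingEventHandler<>(flaky, 5, 10).accept("user-created");
        System.out.println("Succeeded after " + attempts.get() + " attempts");
    }
}
```

Because the wrapper is itself a Consumer, it can stand in wherever a plain handler is accepted. When all attempts fail, the last exception propagates, so the failure stays visible instead of silently leaving a read model out of date.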

CQRS is particularly useful in microservices architectures, high-traffic web applications, and systems where read and write workloads have significantly different characteristics.
