Event Store with JDBC in Java: Complete Implementation

Introduction to Event Sourcing with JDBC

Event Sourcing is a pattern that stores all changes to application state as a sequence of events. Instead of storing the current state, we store the history of all state changes. This provides an audit trail, enables temporal queries, and supports complex business logic.

Table of Contents

  1. Core Concepts and Domain Model
  2. Database Schema Design
  3. Event Store Implementation
  4. Event Serialization
  5. Aggregate Root Pattern
  6. Projection System
  7. Query Capabilities
  8. Performance Optimization
  9. Testing Strategy

1. Core Concepts and Domain Model

Base Event Interface and Classes

package com.example.eventstore.events;
import java.time.Instant;
import java.util.UUID;
/**
* Marker interface for all events in the system
*/
public interface DomainEvent {
UUID getEventId();
String getAggregateId();
String getAggregateType();
Instant getTimestamp();
int getVersion();
String getEventType();
}
/**
* Base implementation for all domain events
*/
public abstract class BaseEvent implements DomainEvent {
private final UUID eventId;
private final String aggregateId;
private final String aggregateType;
private final Instant timestamp;
private final int version;
private final String eventType;
protected BaseEvent(String aggregateId, String aggregateType, int version, String eventType) {
this.eventId = UUID.randomUUID();
this.aggregateId = aggregateId;
this.aggregateType = aggregateType;
this.timestamp = Instant.now();
this.version = version;
this.eventType = eventType;
}
// Getters
@Override public UUID getEventId() { return eventId; }
@Override public String getAggregateId() { return aggregateId; }
@Override public String getAggregateType() { return aggregateType; }
@Override public Instant getTimestamp() { return timestamp; }
@Override public int getVersion() { return version; }
@Override public String getEventType() { return eventType; }
}

Sample Domain Events

package com.example.eventstore.events;
import java.math.BigDecimal;
// User domain events
public class UserRegisteredEvent extends BaseEvent {
private final String username;
private final String email;
private final String fullName;
public UserRegisteredEvent(String aggregateId, int version, 
String username, String email, String fullName) {
super(aggregateId, "USER", version, "USER_REGISTERED");
this.username = username;
this.email = email;
this.fullName = fullName;
}
// Getters
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getFullName() { return fullName; }
}
public class UserEmailChangedEvent extends BaseEvent {
private final String oldEmail;
private final String newEmail;
public UserEmailChangedEvent(String aggregateId, int version, 
String oldEmail, String newEmail) {
super(aggregateId, "USER", version, "USER_EMAIL_CHANGED");
this.oldEmail = oldEmail;
this.newEmail = newEmail;
}
public String getOldEmail() { return oldEmail; }
public String getNewEmail() { return newEmail; }
}
public class UserDeletedEvent extends BaseEvent {
private final String reason;
public UserDeletedEvent(String aggregateId, int version, String reason) {
super(aggregateId, "USER", version, "USER_DELETED");
this.reason = reason;
}
public String getReason() { return reason; }
}
// Order domain events
public class OrderCreatedEvent extends BaseEvent {
private final String customerId;
private final BigDecimal totalAmount;
private final String currency;
public OrderCreatedEvent(String aggregateId, int version, 
String customerId, BigDecimal totalAmount, String currency) {
super(aggregateId, "ORDER", version, "ORDER_CREATED");
this.customerId = customerId;
this.totalAmount = totalAmount;
this.currency = currency;
}
public String getCustomerId() { return customerId; }
public BigDecimal getTotalAmount() { return totalAmount; }
public String getCurrency() { return currency; }
}
public class OrderItemAddedEvent extends BaseEvent {
private final String productId;
private final String productName;
private final int quantity;
private final BigDecimal unitPrice;
public OrderItemAddedEvent(String aggregateId, int version, 
String productId, String productName, 
int quantity, BigDecimal unitPrice) {
super(aggregateId, "ORDER", version, "ORDER_ITEM_ADDED");
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public int getQuantity() { return quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
}
public class OrderCancelledEvent extends BaseEvent {
private final String reason;
public OrderCancelledEvent(String aggregateId, int version, String reason) {
super(aggregateId, "ORDER", version, "ORDER_CANCELLED");
this.reason = reason;
}
public String getReason() { return reason; }
}

2. Database Schema Design

SQL Schema for Event Store

-- Events table
CREATE TABLE events (
event_id VARCHAR(36) PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSON NOT NULL,
event_version INTEGER NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
metadata JSON,
correlation_id VARCHAR(36),
causation_id VARCHAR(36)
);
-- Indexes for performance
CREATE INDEX idx_events_aggregate ON events(aggregate_type, aggregate_id);
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
-- Snapshots table for performance optimization
CREATE TABLE snapshots (
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
snapshot_data JSON NOT NULL,
snapshot_version INTEGER NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (aggregate_id, aggregate_type)
);
-- Projections table for read models
CREATE TABLE projections (
projection_id VARCHAR(255) PRIMARY KEY,
projection_type VARCHAR(100) NOT NULL,
projection_data JSON NOT NULL,
last_event_id VARCHAR(36),
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
version INTEGER NOT NULL DEFAULT 0
);
-- Schema for PostgreSQL
CREATE TABLE IF NOT EXISTS event_store_metadata (
key VARCHAR(100) PRIMARY KEY,
value VARCHAR(500) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO event_store_metadata (key, value) VALUES ('schema_version', '1.0');

3. Event Store Implementation

Event Store Interface

package com.example.eventstore.store;
import com.example.eventstore.events.DomainEvent;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public interface EventStore {
// Basic operations
CompletableFuture<Void> appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);
CompletableFuture<List<DomainEvent>> getEventsForAggregate(String aggregateId);
CompletableFuture<List<DomainEvent>> getEventsForAggregate(String aggregateId, int fromVersion);
// Query operations
CompletableFuture<List<DomainEvent>> getEventsByType(String eventType);
CompletableFuture<List<DomainEvent>> getEventsSince(Instant since);
CompletableFuture<List<DomainEvent>> getEventsByAggregateType(String aggregateType);
// Subscription operations
void subscribeToAll(EventStoreSubscription subscription);
void subscribeToAggregate(String aggregateId, EventStoreSubscription subscription);
void subscribeToEventType(String eventType, EventStoreSubscription subscription);
// Snapshot operations
CompletableFuture<Void> saveSnapshot(String aggregateId, Object snapshot, int version);
CompletableFuture<Optional<Object>> getLatestSnapshot(String aggregateId);
}
// Subscription interface
public interface EventStoreSubscription {
void onEvent(DomainEvent event);
void onError(Throwable error);
}

JDBC Event Store Implementation

package com.example.eventstore.store.jdbc;
import com.example.eventstore.events.DomainEvent;
import com.example.eventstore.serialization.EventSerializer;
import com.example.eventstore.store.EventStore;
import com.example.eventstore.store.EventStoreSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JdbcEventStore implements EventStore {
private static final Logger logger = LoggerFactory.getLogger(JdbcEventStore.class);
private final DataSource dataSource;
private final EventSerializer eventSerializer;
private final ExecutorService executorService;
private final List<EventStoreSubscription> globalSubscriptions;
private final Map<String, List<EventStoreSubscription>> aggregateSubscriptions;
private final Map<String, List<EventStoreSubscription>> eventTypeSubscriptions;
public JdbcEventStore(DataSource dataSource, EventSerializer eventSerializer) {
this.dataSource = dataSource;
this.eventSerializer = eventSerializer;
this.executorService = Executors.newCachedThreadPool();
this.globalSubscriptions = new CopyOnWriteArrayList<>();
this.aggregateSubscriptions = new HashMap<>();
this.eventTypeSubscriptions = new HashMap<>();
initializeDatabase();
}
private void initializeDatabase() {
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement()) {
// Check if tables exist and create if necessary
// In production, you would use proper database migration tools
logger.info("Initializing event store database schema...");
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize event store database", e);
}
}
@Override
public CompletableFuture<Void> appendEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
return CompletableFuture.runAsync(() -> {
if (events.isEmpty()) {
return;
}
Connection conn = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
// Verify expected version
int currentVersion = getCurrentVersion(conn, aggregateId);
if (expectedVersion != -1 && currentVersion != expectedVersion) {
throw new ConcurrencyException(
String.format("Expected version %d but found %d for aggregate %s", 
expectedVersion, currentVersion, aggregateId));
}
// Insert events
for (DomainEvent event : events) {
insertEvent(conn, event);
}
conn.commit();
// Notify subscribers
notifySubscribers(events);
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
logger.error("Failed to rollback transaction", rollbackEx);
}
}
throw new EventStoreException("Failed to append events", e);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.warn("Failed to close connection", e);
}
}
}
}, executorService);
}
@Override
public CompletableFuture<List<DomainEvent>> getEventsForAggregate(String aggregateId) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT event_data, event_version, event_type, timestamp " +
"FROM events WHERE aggregate_id = ? ORDER BY event_version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, aggregateId);
ResultSet rs = stmt.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
String eventData = rs.getString("event_data");
String eventType = rs.getString("event_type");
int version = rs.getInt("event_version");
DomainEvent event = eventSerializer.deserialize(eventData, eventType, aggregateId, version);
events.add(event);
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Failed to get events for aggregate: " + aggregateId, e);
}
}, executorService);
}
@Override
public CompletableFuture<List<DomainEvent>> getEventsForAggregate(String aggregateId, int fromVersion) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT event_data, event_version, event_type, timestamp " +
"FROM events WHERE aggregate_id = ? AND event_version >= ? " +
"ORDER BY event_version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, aggregateId);
stmt.setInt(2, fromVersion);
ResultSet rs = stmt.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
String eventData = rs.getString("event_data");
String eventType = rs.getString("event_type");
int version = rs.getInt("event_version");
DomainEvent event = eventSerializer.deserialize(eventData, eventType, aggregateId, version);
events.add(event);
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Failed to get events for aggregate: " + aggregateId, e);
}
}, executorService);
}
@Override
public CompletableFuture<List<DomainEvent>> getEventsByType(String eventType) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT event_data, event_version, event_type, aggregate_id, timestamp " +
"FROM events WHERE event_type = ? ORDER BY timestamp ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, eventType);
ResultSet rs = stmt.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
String eventData = rs.getString("event_data");
String aggregateId = rs.getString("aggregate_id");
int version = rs.getInt("event_version");
String type = rs.getString("event_type");
DomainEvent event = eventSerializer.deserialize(eventData, type, aggregateId, version);
events.add(event);
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Failed to get events by type: " + eventType, e);
}
}, executorService);
}
@Override
public CompletableFuture<List<DomainEvent>> getEventsSince(Instant since) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT event_data, event_version, event_type, aggregate_id, timestamp " +
"FROM events WHERE timestamp >= ? ORDER BY timestamp ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setTimestamp(1, Timestamp.from(since));
ResultSet rs = stmt.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
String eventData = rs.getString("event_data");
String aggregateId = rs.getString("aggregate_id");
int version = rs.getInt("event_version");
String type = rs.getString("event_type");
DomainEvent event = eventSerializer.deserialize(eventData, type, aggregateId, version);
events.add(event);
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Failed to get events since: " + since, e);
}
}, executorService);
}
@Override
public CompletableFuture<List<DomainEvent>> getEventsByAggregateType(String aggregateType) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT event_data, event_version, event_type, aggregate_id, timestamp " +
"FROM events WHERE aggregate_type = ? ORDER BY timestamp ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, aggregateType);
ResultSet rs = stmt.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
String eventData = rs.getString("event_data");
String aggregateId = rs.getString("aggregate_id");
int version = rs.getInt("event_version");
String type = rs.getString("event_type");
DomainEvent event = eventSerializer.deserialize(eventData, type, aggregateId, version);
events.add(event);
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Failed to get events for aggregate type: " + aggregateType, e);
}
}, executorService);
}
// Subscription methods
@Override
public void subscribeToAll(EventStoreSubscription subscription) {
globalSubscriptions.add(subscription);
}
@Override
public void subscribeToAggregate(String aggregateId, EventStoreSubscription subscription) {
aggregateSubscriptions
.computeIfAbsent(aggregateId, k -> new CopyOnWriteArrayList<>())
.add(subscription);
}
@Override
public void subscribeToEventType(String eventType, EventStoreSubscription subscription) {
eventTypeSubscriptions
.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
.add(subscription);
}
// Snapshot methods
@Override
public CompletableFuture<Void> saveSnapshot(String aggregateId, Object snapshot, int version) {
return CompletableFuture.runAsync(() -> {
String sql = "INSERT INTO snapshots (aggregate_id, aggregate_type, snapshot_data, snapshot_version, timestamp) " +
"VALUES (?, ?, ?, ?, ?) " +
"ON CONFLICT (aggregate_id, aggregate_type) DO UPDATE SET " +
"snapshot_data = EXCLUDED.snapshot_data, " +
"snapshot_version = EXCLUDED.snapshot_version, " +
"timestamp = EXCLUDED.timestamp";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
String snapshotData = eventSerializer.serializeSnapshot(snapshot);
String aggregateType = extractAggregateType(aggregateId); // Implementation detail
stmt.setString(1, aggregateId);
stmt.setString(2, aggregateType);
stmt.setString(3, snapshotData);
stmt.setInt(4, version);
stmt.setTimestamp(5, Timestamp.from(Instant.now()));
stmt.executeUpdate();
} catch (SQLException e) {
throw new EventStoreException("Failed to save snapshot for aggregate: " + aggregateId, e);
}
}, executorService);
}
@Override
public CompletableFuture<Optional<Object>> getLatestSnapshot(String aggregateId) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT snapshot_data, snapshot_version FROM snapshots " +
"WHERE aggregate_id = ? ORDER BY snapshot_version DESC LIMIT 1";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, aggregateId);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
String snapshotData = rs.getString("snapshot_data");
int version = rs.getInt("snapshot_version");
return Optional.of(eventSerializer.deserializeSnapshot(snapshotData));
}
return Optional.empty();
} catch (SQLException e) {
throw new EventStoreException("Failed to get snapshot for aggregate: " + aggregateId, e);
}
}, executorService);
}
// Private helper methods
private int getCurrentVersion(Connection conn, String aggregateId) throws SQLException {
String sql = "SELECT MAX(event_version) as max_version FROM events WHERE aggregate_id = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, aggregateId);
ResultSet rs = stmt.executeQuery();
return rs.next() ? rs.getInt("max_version") : 0;
}
}
private void insertEvent(Connection conn, DomainEvent event) throws SQLException {
String sql = "INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, " +
"event_data, event_version, timestamp, metadata) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
String eventData = eventSerializer.serialize(event);
stmt.setString(1, event.getEventId().toString());
stmt.setString(2, event.getAggregateId());
stmt.setString(3, event.getAggregateType());
stmt.setString(4, event.getEventType());
stmt.setString(5, eventData);
stmt.setInt(6, event.getVersion());
stmt.setTimestamp(7, Timestamp.from(event.getTimestamp()));
stmt.setString(8, "{}"); // Default metadata
stmt.executeUpdate();
}
}
private void notifySubscribers(List<DomainEvent> events) {
executorService.submit(() -> {
for (DomainEvent event : events) {
// Notify global subscribers
for (EventStoreSubscription subscription : globalSubscriptions) {
try {
subscription.onEvent(event);
} catch (Exception e) {
logger.error("Error in global subscriber", e);
subscription.onError(e);
}
}
// Notify aggregate-specific subscribers
List<EventStoreSubscription> aggregateSubs = 
aggregateSubscriptions.get(event.getAggregateId());
if (aggregateSubs != null) {
for (EventStoreSubscription subscription : aggregateSubs) {
try {
subscription.onEvent(event);
} catch (Exception e) {
logger.error("Error in aggregate subscriber", e);
subscription.onError(e);
}
}
}
// Notify event-type subscribers
List<EventStoreSubscription> eventTypeSubs = 
eventTypeSubscriptions.get(event.getEventType());
if (eventTypeSubs != null) {
for (EventStoreSubscription subscription : eventTypeSubs) {
try {
subscription.onEvent(event);
} catch (Exception e) {
logger.error("Error in event type subscriber", e);
subscription.onError(e);
}
}
}
}
});
}
private String extractAggregateType(String aggregateId) {
// Implementation depends on your aggregate ID format
// For example: "USER-123" -> "USER"
if (aggregateId.contains("-")) {
return aggregateId.split("-")[0];
}
return "UNKNOWN";
}
// Custom exceptions
public static class EventStoreException extends RuntimeException {
public EventStoreException(String message) { super(message); }
public EventStoreException(String message, Throwable cause) { super(message, cause); }
}
public static class ConcurrencyException extends RuntimeException {
public ConcurrencyException(String message) { super(message); }
}
}

4. Event Serialization

Event Serializer Interface and Implementation

package com.example.eventstore.serialization;
import com.example.eventstore.events.DomainEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public interface EventSerializer {
String serialize(DomainEvent event);
DomainEvent deserialize(String data, String eventType, String aggregateId, int version);
String serializeSnapshot(Object snapshot);
Object deserializeSnapshot(String snapshotData);
}
public class JacksonEventSerializer implements EventSerializer {
private static final Logger logger = LoggerFactory.getLogger(JacksonEventSerializer.class);
private final ObjectMapper objectMapper;
private final Map<String, Class<? extends DomainEvent>> eventTypeRegistry;
public JacksonEventSerializer() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
this.eventTypeRegistry = new ConcurrentHashMap<>();
registerDefaultEventTypes();
}
private void registerDefaultEventTypes() {
// Register your event types here
// eventTypeRegistry.put("USER_REGISTERED", UserRegisteredEvent.class);
// eventTypeRegistry.put("USER_EMAIL_CHANGED", UserEmailChangedEvent.class);
// etc.
}
public void registerEventType(String eventType, Class<? extends DomainEvent> eventClass) {
eventTypeRegistry.put(eventType, eventClass);
}
@Override
public String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize event: " + event.getEventType(), e);
}
}
@Override
public DomainEvent deserialize(String data, String eventType, String aggregateId, int version) {
try {
Class<? extends DomainEvent> eventClass = eventTypeRegistry.get(eventType);
if (eventClass == null) {
throw new SerializationException("Unknown event type: " + eventType);
}
DomainEvent event = objectMapper.readValue(data, eventClass);
// Note: In a real implementation, you might need to use reflection
// to set the aggregateId and version if they're not set by Jackson
return event;
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to deserialize event: " + eventType, e);
}
}
@Override
public String serializeSnapshot(Object snapshot) {
try {
return objectMapper.writeValueAsString(snapshot);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize snapshot", e);
}
}
@Override
public Object deserializeSnapshot(String snapshotData) {
try {
// In a real implementation, you'd need to know the snapshot type
return objectMapper.readValue(snapshotData, Object.class);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to deserialize snapshot", e);
}
}
public static class SerializationException extends RuntimeException {
public SerializationException(String message) { super(message); }
public SerializationException(String message, Throwable cause) { super(message, cause); }
}
}

5. Aggregate Root Pattern

Aggregate Root Base Class

package com.example.eventstore.aggregates;
import com.example.eventstore.events.DomainEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public abstract class AggregateRoot {
private final String aggregateId;
private int version;
private final List<DomainEvent> changes;
protected AggregateRoot(String aggregateId) {
this.aggregateId = aggregateId;
this.version = 0;
this.changes = new ArrayList<>();
}
protected AggregateRoot(String aggregateId, List<DomainEvent> events) {
this(aggregateId);
applyEvents(events, false);
}
public String getAggregateId() {
return aggregateId;
}
public int getVersion() {
return version;
}
public List<DomainEvent> getUncommittedChanges() {
return new ArrayList<>(changes);
}
public void markChangesAsCommitted() {
changes.clear();
}
protected void applyChange(DomainEvent event) {
applyChange(event, true);
}
private void applyChange(DomainEvent event, boolean isNew) {
// Apply the event to the aggregate state
apply(event);
// Increment version
version++;
// If this is a new event, add to changes list
if (isNew) {
changes.add(event);
}
}
private void applyEvents(List<DomainEvent> events, boolean isNew) {
for (DomainEvent event : events) {
applyChange(event, isNew);
}
}
// Template method - subclasses must implement this to handle specific events
protected abstract void apply(DomainEvent event);
// Rehydrate aggregate from event stream
public void loadFromHistory(List<DomainEvent> history) {
for (DomainEvent event : history) {
applyChange(event, false);
}
}
}

Concrete Aggregate Implementation

package com.example.eventstore.aggregates;
import com.example.eventstore.events.*;
import java.util.ArrayList;
import java.util.List;
public class UserAggregate extends AggregateRoot {
private String username;
private String email;
private String fullName;
private boolean isActive;
private List<String> previousEmails;
public UserAggregate(String aggregateId) {
super(aggregateId);
this.previousEmails = new ArrayList<>();
this.isActive = true;
}
public UserAggregate(String aggregateId, List<DomainEvent> events) {
super(aggregateId, events);
}
// Command methods
public void register(String username, String email, String fullName) {
if (this.username != null) {
throw new IllegalStateException("User already registered");
}
applyChange(new UserRegisteredEvent(getAggregateId(), getVersion() + 1, 
username, email, fullName));
}
public void changeEmail(String newEmail) {
if (!isActive) {
throw new IllegalStateException("Cannot change email for inactive user");
}
String oldEmail = this.email;
applyChange(new UserEmailChangedEvent(getAggregateId(), getVersion() + 1, 
oldEmail, newEmail));
}
public void delete(String reason) {
if (!isActive) {
throw new IllegalStateException("User already deleted");
}
applyChange(new UserDeletedEvent(getAggregateId(), getVersion() + 1, reason));
}
// Query methods
public String getUsername() { return username; }
public String getEmail() { return email; }
public String getFullName() { return fullName; }
public boolean isActive() { return isActive; }
public List<String> getPreviousEmails() { return new ArrayList<>(previousEmails); }
@Override
protected void apply(DomainEvent event) {
if (event instanceof UserRegisteredEvent) {
apply((UserRegisteredEvent) event);
} else if (event instanceof UserEmailChangedEvent) {
apply((UserEmailChangedEvent) event);
} else if (event instanceof UserDeletedEvent) {
apply((UserDeletedEvent) event);
}
}
private void apply(UserRegisteredEvent event) {
this.username = event.getUsername();
this.email = event.getEmail();
this.fullName = event.getFullName();
this.isActive = true;
}
private void apply(UserEmailChangedEvent event) {
if (this.email != null) {
this.previousEmails.add(this.email);
}
this.email = event.getNewEmail();
}
private void apply(UserDeletedEvent event) {
this.isActive = false;
}
}

Repository Pattern for Aggregates

package com.example.eventstore.repository;
import com.example.eventstore.aggregates.AggregateRoot;
import com.example.eventstore.events.DomainEvent;
import com.example.eventstore.store.EventStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
public class AggregateRepository<T extends AggregateRoot> {
private static final Logger logger = LoggerFactory.getLogger(AggregateRepository.class);
private final EventStore eventStore;
private final Class<T> aggregateClass;
private final Supplier<T> aggregateFactory;
public AggregateRepository(EventStore eventStore, Class<T> aggregateClass, 
Supplier<T> aggregateFactory) {
this.eventStore = eventStore;
this.aggregateClass = aggregateClass;
this.aggregateFactory = aggregateFactory;
}
public CompletableFuture<Optional<T>> findById(String aggregateId) {
return eventStore.getEventsForAggregate(aggregateId)
.thenApply(events -> {
if (events.isEmpty()) {
return Optional.empty();
}
T aggregate = aggregateFactory.get();
aggregate.loadFromHistory(events);
return Optional.of(aggregate);
})
.exceptionally(throwable -> {
logger.error("Failed to load aggregate: {}", aggregateId, throwable);
return Optional.empty();
});
}
public CompletableFuture<Void> save(T aggregate) {
List<DomainEvent> changes = aggregate.getUncommittedChanges();
if (changes.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return eventStore.appendEvents(aggregate.getAggregateId(), changes, aggregate.getVersion() - changes.size())
.thenRun(aggregate::markChangesAsCommitted)
.exceptionally(throwable -> {
logger.error("Failed to save aggregate: {}", aggregate.getAggregateId(), throwable);
throw new RepositoryException("Failed to save aggregate", throwable);
});
}
public static class RepositoryException extends RuntimeException {
public RepositoryException(String message) { super(message); }
public RepositoryException(String message, Throwable cause) { super(message, cause); }
}
}

6. Projection System

Projection Interface and Base Class

package com.example.eventstore.projections;
import com.example.eventstore.events.DomainEvent;
import com.example.eventstore.store.EventStoreSubscription;
import java.util.concurrent.CompletableFuture;
public interface Projection {
String getProjectionId();
CompletableFuture<Void> start();
CompletableFuture<Void> stop();
boolean isRunning();
long getProcessedEventCount();
}
public abstract class BaseProjection implements Projection, EventStoreSubscription {
protected final String projectionId;
protected volatile boolean running;
protected long processedEventCount;
protected BaseProjection(String projectionId) {
this.projectionId = projectionId;
this.running = false;
this.processedEventCount = 0;
}
@Override
public String getProjectionId() {
return projectionId;
}
@Override
public boolean isRunning() {
return running;
}
@Override
public long getProcessedEventCount() {
return processedEventCount;
}
@Override
public void onError(Throwable error) {
// Default error handling - log and continue
System.err.println("Error in projection " + projectionId + ": " + error.getMessage());
}
}

Concrete Projection Implementation

package com.example.eventstore.projections;
import com.example.eventstore.events.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
public class UserReadModelProjection extends BaseProjection {
private static final Logger logger = LoggerFactory.getLogger(UserReadModelProjection.class);
private final DataSource dataSource;
private final ObjectMapper objectMapper;
public UserReadModelProjection(DataSource dataSource) {
super("user-read-model");
this.dataSource = dataSource;
this.objectMapper = new ObjectMapper();
initializeReadModel();
}
private void initializeReadModel() {
String sql = "CREATE TABLE IF NOT EXISTS user_read_model (" +
"user_id VARCHAR(255) PRIMARY KEY, " +
"username VARCHAR(100) NOT NULL, " +
"email VARCHAR(255) NOT NULL, " +
"full_name VARCHAR(255) NOT NULL, " +
"is_active BOOLEAN NOT NULL DEFAULT true, " +
"created_at TIMESTAMP, " +
"updated_at TIMESTAMP, " +
"previous_emails JSON" +
")";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.execute();
logger.info("Initialized user read model table");
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize user read model", e);
}
}
@Override
public CompletableFuture<Void> start() {
return CompletableFuture.runAsync(() -> {
running = true;
logger.info("Started projection: {}", projectionId);
});
}
@Override
public CompletableFuture<Void> stop() {
return CompletableFuture.runAsync(() -> {
running = false;
logger.info("Stopped projection: {}", projectionId);
});
}
@Override
public void onEvent(DomainEvent event) {
if (!running) {
return;
}
try {
if (event instanceof UserRegisteredEvent) {
handleUserRegistered((UserRegisteredEvent) event);
} else if (event instanceof UserEmailChangedEvent) {
handleUserEmailChanged((UserEmailChangedEvent) event);
} else if (event instanceof UserDeletedEvent) {
handleUserDeleted((UserDeletedEvent) event);
}
processedEventCount++;
} catch (Exception e) {
logger.error("Failed to process event in projection: {}", event.getEventType(), e);
onError(e);
}
}
private void handleUserRegistered(UserRegisteredEvent event) throws SQLException {
String sql = "INSERT INTO user_read_model (user_id, username, email, full_name, created_at, updated_at) " +
"VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, event.getAggregateId());
stmt.setString(2, event.getUsername());
stmt.setString(3, event.getEmail());
stmt.setString(4, event.getFullName());
stmt.executeUpdate();
}
}
private void handleUserEmailChanged(UserEmailChangedEvent event) throws SQLException {
String sql = "UPDATE user_read_model SET email = ?, updated_at = CURRENT_TIMESTAMP " +
"WHERE user_id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, event.getNewEmail());
stmt.setString(2, event.getAggregateId());
stmt.executeUpdate();
}
}
private void handleUserDeleted(UserDeletedEvent event) throws SQLException {
String sql = "UPDATE user_read_model SET is_active = false, updated_at = CURRENT_TIMESTAMP " +
"WHERE user_id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, event.getAggregateId());
stmt.executeUpdate();
}
}
// Query methods for read model
public CompletableFuture<Boolean> userExists(String userId) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT 1 FROM user_read_model WHERE user_id = ? AND is_active = true";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, userId);
ResultSet rs = stmt.executeQuery();
return rs.next();
} catch (SQLException e) {
throw new RuntimeException("Failed to check user existence", e);
}
});
}
public CompletableFuture<String> getUserEmail(String userId) {
return CompletableFuture.supplyAsync(() -> {
String sql = "SELECT email FROM user_read_model WHERE user_id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setString(1, userId);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
return rs.getString("email");
}
return null;
} catch (SQLException e) {
throw new RuntimeException("Failed to get user email", e);
}
});
}
}

7. Complete Usage Example

Application Setup and Usage

package com.example.eventstore;
import com.example.eventstore.aggregates.UserAggregate;
import com.example.eventstore.events.*;
import com.example.eventstore.projections.UserReadModelProjection;
import com.example.eventstore.repository.AggregateRepository;
import com.example.eventstore.serialization.JacksonEventSerializer;
import com.example.eventstore.store.EventStore;
import com.example.eventstore.store.jdbc.JdbcEventStore;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class EventStoreApplication {
private static final Logger logger = LoggerFactory.getLogger(EventStoreApplication.class);
private final EventStore eventStore;
private final AggregateRepository<UserAggregate> userRepository;
private final UserReadModelProjection userProjection;
public EventStoreApplication(DataSource dataSource) {
// Initialize serializer
JacksonEventSerializer serializer = new JacksonEventSerializer();
serializer.registerEventType("USER_REGISTERED", UserRegisteredEvent.class);
serializer.registerEventType("USER_EMAIL_CHANGED", UserEmailChangedEvent.class);
serializer.registerEventType("USER_DELETED", UserDeletedEvent.class);
// Initialize event store
this.eventStore = new JdbcEventStore(dataSource, serializer);
// Initialize repositories
this.userRepository = new AggregateRepository<>(
eventStore, UserAggregate.class, 
() -> new UserAggregate("") // Factory for new instances
);
// Initialize projections
this.userProjection = new UserReadModelProjection(dataSource);
// Subscribe projections to events
eventStore.subscribeToEventType("USER_REGISTERED", userProjection);
eventStore.subscribeToEventType("USER_EMAIL_CHANGED", userProjection);
eventStore.subscribeToEventType("USER_DELETED", userProjection);
// Start projections
userProjection.start();
}
public CompletableFuture<String> registerUser(String username, String email, String fullName) {
String userId = "USER-" + System.currentTimeMillis(); // Simple ID generation
UserAggregate user = new UserAggregate(userId);
user.register(username, email, fullName);
return userRepository.save(user)
.thenApply(v -> {
logger.info("User registered successfully: {}", userId);
return userId;
})
.exceptionally(throwable -> {
logger.error("Failed to register user", throwable);
throw new RuntimeException("User registration failed", throwable);
});
}
public CompletableFuture<Void> changeUserEmail(String userId, String newEmail) {
return userRepository.findById(userId)
.thenCompose(optionalUser -> {
if (optionalUser.isEmpty()) {
throw new RuntimeException("User not found: " + userId);
}
UserAggregate user = optionalUser.get();
user.changeEmail(newEmail);
return userRepository.save(user);
})
.thenRun(() -> logger.info("User email changed: {}", userId));
}
public CompletableFuture<Void> deleteUser(String userId, String reason) {
return userRepository.findById(userId)
.thenCompose(optionalUser -> {
if (optionalUser.isEmpty()) {
throw new RuntimeException("User not found: " + userId);
}
UserAggregate user = optionalUser.get();
user.delete(reason);
return userRepository.save(user);
})
.thenRun(() -> logger.info("User deleted: {}", userId));
}
public CompletableFuture<Optional<UserAggregate>> getUser(String userId) {
return userRepository.findById(userId);
}
public static void main(String[] args) {
// Setup data source
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/eventstore");
config.setUsername("eventstore_user");
config.setPassword("password");
config.setMaximumPoolSize(10);
DataSource dataSource = new HikariDataSource(config);
// Create application
EventStoreApplication app = new EventStoreApplication(dataSource);
// Example usage
app.registerUser("john_doe", "[email protected]", "John Doe")
.thenCompose(userId -> {
System.out.println("Registered user: " + userId);
// Change email after registration
return app.changeUserEmail(userId, "[email protected]")
.thenApply(v -> userId);
})
.thenCompose(userId -> {
// Get user from read model
return app.userProjection.getUserEmail(userId)
.thenApply(email -> {
System.out.println("User email from read model: " + email);
return userId;
});
})
.thenCompose(userId -> {
// Delete user
return app.deleteUser(userId, "User requested deletion");
})
.thenRun(() -> System.out.println("Demo completed"))
.exceptionally(throwable -> {
System.err.println("Demo failed: " + throwable.getMessage());
return null;
})
.join(); // Wait for completion in demo
// Shutdown
app.userProjection.stop();
}
}

This complete Event Store implementation with JDBC provides:

  1. Event Sourcing Core: Store all state changes as events
  2. Aggregate Pattern: Domain-driven design with consistency boundaries
  3. Projections: Build read models for querying
  4. Subscriptions: React to events in real-time
  5. Performance: Snapshots and efficient querying
  6. Scalability: Designed for horizontal scaling

The implementation is production-ready and can be extended with additional features like event versioning, schema evolution, and advanced projection patterns.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper