Projection Rebuilding in Java

Overview

Projection rebuilding is the process of recreating read models (projections) from event streams or source data. This is crucial in CQRS and Event Sourcing architectures for maintaining consistent read models after schema changes, bug fixes, or system migrations.

Architecture

Projection Rebuilding Strategies

  1. Full Rebuild: Complete recreation from all events
  2. Incremental Rebuild: Update from specific point
  3. Parallel Rebuild: Build new projection while serving old
  4. Live Migration: Rebuild while system is operational

Dependencies

<dependencies>
<!-- Event Store -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>4.8.0</version>
</dependency>
<!-- Spring Data -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MongoDB for projections -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- Redis for cache -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Core Implementation

1. Projection Interface and Base Class

public interface Projection {
String getName();
long getVersion();
ProjectionStatus getStatus();
Instant getLastRebuildTime();
void rebuild(RebuildContext context);
boolean supportsIncremental();
}
public abstract class AbstractProjection implements Projection {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final String name;
protected final long version;
protected volatile ProjectionStatus status = ProjectionStatus.ACTIVE;
protected volatile Instant lastRebuildTime;
protected final AtomicLong processedEvents = new AtomicLong();
protected final AtomicLong rebuildDuration = new AtomicLong();
protected AbstractProjection(String name, long version) {
this.name = name;
this.version = version;
}
@Override
public String getName() { return name; }
@Override
public long getVersion() { return version; }
@Override
public ProjectionStatus getStatus() { return status; }
@Override
public Instant getLastRebuildTime() { return lastRebuildTime; }
@Override
public boolean supportsIncremental() { return false; }
protected void updateStatus(ProjectionStatus newStatus) {
this.status = newStatus;
logger.info("Projection {} status changed to: {}", name, newStatus);
}
protected void recordMetrics(long startTime, long eventCount) {
long duration = System.currentTimeMillis() - startTime;
rebuildDuration.set(duration);
processedEvents.addAndGet(eventCount);
logger.info("Projection {} rebuild completed in {}ms, processed {} events", 
name, duration, eventCount);
}
}

2. Projection Rebuilder Service

@Service
public class ProjectionRebuilder {
private final Map<String, Projection> projections;
private final EventStore eventStore;
private final ProjectionRepository projectionRepository;
private final MeterRegistry meterRegistry;
private final ExecutorService rebuildExecutor;
private final Map<String, RebuildContext> activeRebuilds;
public ProjectionRebuilder(EventStore eventStore,
ProjectionRepository projectionRepository,
MeterRegistry meterRegistry,
List<Projection> projectionBeans) {
this.eventStore = eventStore;
this.projectionRepository = projectionRepository;
this.meterRegistry = meterRegistry;
this.projections = projectionBeans.stream()
.collect(Collectors.toMap(Projection::getName, Function.identity()));
this.rebuildExecutor = Executors.newFixedThreadPool(5);
this.activeRebuilds = new ConcurrentHashMap<>();
}
public RebuildResult rebuildProjection(String projectionName, RebuildOptions options) {
Projection projection = projections.get(projectionName);
if (projection == null) {
throw new ProjectionNotFoundException("Projection not found: " + projectionName);
}
if (activeRebuilds.containsKey(projectionName)) {
throw new RebuildInProgressException("Rebuild already in progress for: " + projectionName);
}
return startRebuild(projection, options);
}
public RebuildResult rebuildAll(RebuildOptions options) {
List<RebuildResult> results = projections.values().stream()
.map(projection -> startRebuild(projection, options))
.collect(Collectors.toList());
return RebuildResult.aggregate(results);
}
private RebuildResult startRebuild(Projection projection, RebuildOptions options) {
RebuildContext context = createRebuildContext(projection, options);
activeRebuilds.put(projection.getName(), context);
CompletableFuture<RebuildResult> future = CompletableFuture.supplyAsync(() -> {
try {
return executeRebuild(projection, context);
} finally {
activeRebuilds.remove(projection.getName());
}
}, rebuildExecutor);
context.setFuture(future);
return RebuildResult.started(projection.getName(), context.getRebuildId());
}
private RebuildResult executeRebuild(Projection projection, RebuildContext context) {
long startTime = System.currentTimeMillis();
projection.updateStatus(ProjectionStatus.REBUILDING);
try {
// Create backup if needed
if (context.getOptions().isBackupEnabled()) {
createBackup(projection);
}
// Execute rebuild
projection.rebuild(context);
// Update metadata
updateProjectionMetadata(projection, context);
// Record success
recordRebuildMetrics(projection, startTime, context.getProcessedEvents(), true);
return RebuildResult.completed(
projection.getName(), 
context.getRebuildId(),
context.getProcessedEvents(),
System.currentTimeMillis() - startTime
);
} catch (Exception e) {
// Handle failure
handleRebuildFailure(projection, context, e);
recordRebuildMetrics(projection, startTime, context.getProcessedEvents(), false);
return RebuildResult.failed(
projection.getName(),
context.getRebuildId(),
e.getMessage(),
System.currentTimeMillis() - startTime
);
} finally {
projection.updateStatus(ProjectionStatus.ACTIVE);
}
}
private RebuildContext createRebuildContext(Projection projection, RebuildOptions options) {
return RebuildContext.builder()
.rebuildId(UUID.randomUUID().toString())
.projectionName(projection.getName())
.options(options)
.startTime(Instant.now())
.build();
}
private void createBackup(Projection projection) {
try {
projectionRepository.createBackup(projection.getName());
logger.info("Created backup for projection: {}", projection.getName());
} catch (Exception e) {
logger.warn("Failed to create backup for projection: {}", projection.getName(), e);
}
}
private void updateProjectionMetadata(Projection projection, RebuildContext context) {
ProjectionMetadata metadata = new ProjectionMetadata(
projection.getName(),
projection.getVersion(),
Instant.now(),
context.getProcessedEvents(),
context.getOptions().getRebuildType()
);
projectionRepository.saveMetadata(metadata);
}
private void recordRebuildMetrics(Projection projection, long startTime, 
long eventCount, boolean success) {
long duration = System.currentTimeMillis() - startTime;
meterRegistry.timer("projection.rebuild.duration", 
"projection", projection.getName())
.record(Duration.ofMillis(duration));
meterRegistry.counter("projection.rebuild.events",
"projection", projection.getName())
.increment(eventCount);
meterRegistry.counter("projection.rebuild.outcome",
"projection", projection.getName(),
"success", String.valueOf(success))
.increment();
}
private void handleRebuildFailure(Projection projection, RebuildContext context, Exception e) {
logger.error("Rebuild failed for projection: {}", projection.getName(), e);
// Restore from backup if available
if (context.getOptions().isBackupEnabled()) {
try {
projectionRepository.restoreBackup(projection.getName());
logger.info("Restored projection from backup: {}", projection.getName());
} catch (Exception restoreException) {
logger.error("Failed to restore projection from backup: {}", 
projection.getName(), restoreException);
}
}
}
public RebuildStatus getRebuildStatus(String projectionName) {
RebuildContext context = activeRebuilds.get(projectionName);
if (context != null) {
return RebuildStatus.inProgress(
context.getRebuildId(),
context.getProcessedEvents(),
context.getStartTime()
);
}
ProjectionMetadata metadata = projectionRepository.getMetadata(projectionName);
return RebuildStatus.completed(metadata);
}
public void cancelRebuild(String projectionName) {
RebuildContext context = activeRebuilds.get(projectionName);
if (context != null && context.getFuture() != null) {
context.getFuture().cancel(true);
activeRebuilds.remove(projectionName);
}
}
}

3. Event-Based Projection Implementation

@Component
public class OrderSummaryProjection extends AbstractProjection {
private final OrderSummaryRepository repository;
private final EventStore eventStore;
public OrderSummaryProjection(OrderSummaryRepository repository,
EventStore eventStore) {
super("order-summary", 2L);
this.repository = repository;
this.eventStore = eventStore;
}
@Override
public void rebuild(RebuildContext context) {
long startTime = System.currentTimeMillis();
long eventCount = 0;
try {
// Clear existing data
if (context.getOptions().getRebuildType() == RebuildType.FULL) {
repository.deleteAll();
}
// Rebuild from events
eventCount = processEvents(context);
// Update any derived data
updateDerivedData();
} catch (Exception e) {
throw new ProjectionRebuildException("Failed to rebuild order summary projection", e);
} finally {
recordMetrics(startTime, eventCount);
}
}
@Override
public boolean supportsIncremental() {
return true;
}
private long processEvents(RebuildContext context) {
AtomicLong eventCount = new AtomicLong();
eventStore.readAllEvents(context.getOptions().getFromSequence())
.takeWhile(event -> shouldContinueProcessing(context))
.forEach(event -> {
try {
handleEvent(event, context);
eventCount.incrementAndGet();
context.incrementProcessedEvents();
// Progress reporting
if (eventCount.get() % 1000 == 0) {
logger.info("Processed {} events for projection {}", 
eventCount.get(), getName());
}
} catch (Exception e) {
if (context.getOptions().isFailOnError()) {
throw new ProjectionRebuildException(
"Error processing event: " + event.getSequence(), e);
} else {
logger.warn("Skipped event {} due to error: {}", 
event.getSequence(), e.getMessage());
}
}
});
return eventCount.get();
}
private void handleEvent(DomainEvent event, RebuildContext context) {
switch (event.getType()) {
case "OrderCreated":
handleOrderCreated((OrderCreated) event);
break;
case "OrderCancelled":
handleOrderCancelled((OrderCancelled) event);
break;
case "OrderShipped":
handleOrderShipped((OrderShipped) event);
break;
case "OrderDelivered":
handleOrderDelivered((OrderDelivered) event);
break;
default:
// Ignore unknown event types
break;
}
}
private void handleOrderCreated(OrderCreated event) {
OrderSummary summary = new OrderSummary();
summary.setOrderId(event.getOrderId());
summary.setCustomerId(event.getCustomerId());
summary.setAmount(event.getAmount());
summary.setStatus("CREATED");
summary.setCreatedAt(event.getTimestamp());
summary.setUpdatedAt(event.getTimestamp());
repository.save(summary);
}
private void handleOrderCancelled(OrderCancelled event) {
repository.findById(event.getOrderId()).ifPresent(summary -> {
summary.setStatus("CANCELLED");
summary.setUpdatedAt(event.getTimestamp());
repository.save(summary);
});
}
private void handleOrderShipped(OrderShipped event) {
repository.findById(event.getOrderId()).ifPresent(summary -> {
summary.setStatus("SHIPPED");
summary.setShippingDate(event.getTimestamp());
summary.setUpdatedAt(event.getTimestamp());
repository.save(summary);
});
}
private void handleOrderDelivered(OrderDelivered event) {
repository.findById(event.getOrderId()).ifPresent(summary -> {
summary.setStatus("DELIVERED");
summary.setDeliveryDate(event.getTimestamp());
summary.setUpdatedAt(event.getTimestamp());
repository.save(summary);
});
}
private void updateDerivedData() {
// Update any aggregated data
updateCustomerStatistics();
updateProductStatistics();
}
private void updateCustomerStatistics() {
// Implementation for updating customer stats
}
private void updateProductStatistics() {
// Implementation for updating product stats
}
private boolean shouldContinueProcessing(RebuildContext context) {
return context.getOptions().getMaxEvents() <= 0 || 
context.getProcessedEvents() < context.getOptions().getMaxEvents();
}
}

Advanced Rebuilding Strategies

1. Parallel Rebuild with Switchover

@Service
public class ParallelProjectionRebuilder {
private final ProjectionRebuilder rebuilder;
private final ProjectionRouter projectionRouter;
private final Map<String, String> activeVersions;
public ParallelProjectionRebuilder(ProjectionRebuilder rebuilder,
ProjectionRouter projectionRouter) {
this.rebuilder = rebuilder;
this.projectionRouter = projectionRouter;
this.activeVersions = new ConcurrentHashMap<>();
}
public ParallelRebuildResult rebuildInParallel(String projectionName, 
RebuildOptions options) {
String newVersion = generateVersionId();
String tempProjectionName = projectionName + "-" + newVersion;
// Create temporary projection instance
Projection tempProjection = createTemporaryProjection(projectionName, tempProjectionName);
// Register temporary projection for routing
projectionRouter.registerVersion(projectionName, newVersion, tempProjectionName);
// Start rebuild to temporary projection
RebuildResult result = rebuilder.rebuildProjection(tempProjectionName, options);
return new ParallelRebuildResult(
projectionName,
newVersion,
tempProjectionName,
result
);
}
public void switchToVersion(String projectionName, String version) {
projectionRouter.setActiveVersion(projectionName, version);
activeVersions.put(projectionName, version);
// Clean up old versions
cleanupOldVersions(projectionName, version);
}
public void cancelParallelRebuild(String projectionName, String version) {
projectionRouter.unregisterVersion(projectionName, version);
rebuilder.cancelRebuild(projectionName + "-" + version);
}
private Projection createTemporaryProjection(String originalName, String tempName) {
// Clone or create temporary projection with different storage
// This would typically use a different database/schema
return new TemporaryProjection(originalName, tempName);
}
private String generateVersionId() {
return Instant.now().toString().replaceAll("[:.-]", "").toLowerCase();
}
private void cleanupOldVersions(String projectionName, String currentVersion) {
projectionRouter.getAllVersions(projectionName).stream()
.filter(version -> !version.equals(currentVersion))
.forEach(version -> {
projectionRouter.unregisterVersion(projectionName, version);
// Schedule deletion of old projection data
scheduleProjectionCleanup(projectionName + "-" + version);
});
}
private void scheduleProjectionCleanup(String projectionName) {
// Schedule cleanup of old projection data
}
}

2. Incremental Rebuild Service

@Service
public class IncrementalRebuildService {
private final EventStore eventStore;
private final ProjectionMetadataRepository metadataRepository;
private final Map<String, Projection> projections;
public IncrementalRebuildService(EventStore eventStore,
ProjectionMetadataRepository metadataRepository,
List<Projection> projectionBeans) {
this.eventStore = eventStore;
this.metadataRepository = metadataRepository;
this.projections = projectionBeans.stream()
.collect(Collectors.toMap(Projection::getName, Function.identity()));
}
@Scheduled(fixedDelay = 30000) // Every 30 seconds
public void performIncrementalRebuilds() {
projections.values().stream()
.filter(Projection::supportsIncremental)
.forEach(this::rebuildIncrementally);
}
private void rebuildIncrementally(Projection projection) {
ProjectionMetadata metadata = metadataRepository.findByName(projection.getName());
long lastProcessedSequence = metadata.getLastProcessedSequence();
try {
long newSequence = processNewEvents(projection, lastProcessedSequence);
if (newSequence > lastProcessedSequence) {
metadata.setLastProcessedSequence(newSequence);
metadata.setLastUpdated(Instant.now());
metadataRepository.save(metadata);
logger.debug("Incremental rebuild completed for {} up to sequence {}", 
projection.getName(), newSequence);
}
} catch (Exception e) {
logger.error("Incremental rebuild failed for {}", projection.getName(), e);
}
}
private long processNewEvents(Projection projection, long lastProcessedSequence) {
AtomicLong latestSequence = new AtomicLong(lastProcessedSequence);
eventStore.readEventsAfter(lastProcessedSequence)
.forEach(event -> {
try {
handleEventForProjection(projection, event);
latestSequence.set(event.getSequence());
} catch (Exception e) {
logger.warn("Failed to process event {} for projection {}", 
event.getSequence(), projection.getName(), e);
}
});
return latestSequence.get();
}
private void handleEventForProjection(Projection projection, DomainEvent event) {
// This would need reflection or a registry of event handlers
if (projection instanceof EventHandler) {
((EventHandler) projection).handleEvent(event);
}
}
public void triggerIncrementalRebuild(String projectionName) {
Projection projection = projections.get(projectionName);
if (projection != null && projection.supportsIncremental()) {
rebuildIncrementally(projection);
}
}
}

Monitoring and Management

1. Projection Rebuild Controller

@RestController
@RequestMapping("/api/projections")
public class ProjectionRebuildController {
private final ProjectionRebuilder rebuilder;
private final ParallelProjectionRebuilder parallelRebuilder;
private final IncrementalRebuildService incrementalService;
public ProjectionRebuildController(ProjectionRebuilder rebuilder,
ParallelProjectionRebuilder parallelRebuilder,
IncrementalRebuildService incrementalService) {
this.rebuilder = rebuilder;
this.parallelRebuilder = parallelRebuilder;
this.incrementalService = incrementalService;
}
@PostMapping("/{projectionName}/rebuild")
public ResponseEntity<RebuildResult> rebuildProjection(
@PathVariable String projectionName,
@RequestBody RebuildRequest request) {
RebuildOptions options = RebuildOptions.builder()
.rebuildType(request.getRebuildType())
.fromSequence(request.getFromSequence())
.maxEvents(request.getMaxEvents())
.backupEnabled(request.isBackupEnabled())
.failOnError(request.isFailOnError())
.build();
RebuildResult result = rebuilder.rebuildProjection(projectionName, options);
return ResponseEntity.accepted().body(result);
}
@PostMapping("/{projectionName}/rebuild-parallel")
public ResponseEntity<ParallelRebuildResult> rebuildParallel(
@PathVariable String projectionName,
@RequestBody RebuildRequest request) {
RebuildOptions options = RebuildOptions.builder()
.rebuildType(request.getRebuildType())
.fromSequence(request.getFromSequence())
.maxEvents(request.getMaxEvents())
.backupEnabled(request.isBackupEnabled())
.failOnError(request.isFailOnError())
.build();
ParallelRebuildResult result = parallelRebuilder.rebuildInParallel(projectionName, options);
return ResponseEntity.accepted().body(result);
}
@PostMapping("/{projectionName}/switch-version/{version}")
public ResponseEntity<Void> switchVersion(
@PathVariable String projectionName,
@PathVariable String version) {
parallelRebuilder.switchToVersion(projectionName, version);
return ResponseEntity.ok().build();
}
@GetMapping("/{projectionName}/status")
public ResponseEntity<RebuildStatus> getRebuildStatus(
@PathVariable String projectionName) {
RebuildStatus status = rebuilder.getRebuildStatus(projectionName);
return ResponseEntity.ok(status);
}
@PostMapping("/{projectionName}/cancel")
public ResponseEntity<Void> cancelRebuild(@PathVariable String projectionName) {
rebuilder.cancelRebuild(projectionName);
return ResponseEntity.ok().build();
}
@PostMapping("/{projectionName}/incremental")
public ResponseEntity<Void> triggerIncremental(@PathVariable String projectionName) {
incrementalService.triggerIncrementalRebuild(projectionName);
return ResponseEntity.accepted().build();
}
@PostMapping("/rebuild-all")
public ResponseEntity<RebuildResult> rebuildAll(@RequestBody RebuildRequest request) {
RebuildOptions options = RebuildOptions.builder()
.rebuildType(request.getRebuildType())
.fromSequence(request.getFromSequence())
.maxEvents(request.getMaxEvents())
.backupEnabled(request.isBackupEnabled())
.failOnError(request.isFailOnError())
.build();
RebuildResult result = rebuilder.rebuildAll(options);
return ResponseEntity.accepted().body(result);
}
}

2. Projection Health Check

@Component
public class ProjectionHealthIndicator implements HealthIndicator {
private final ProjectionRebuilder rebuilder;
private final ProjectionMetadataRepository metadataRepository;
public ProjectionHealthIndicator(ProjectionRebuilder rebuilder,
ProjectionMetadataRepository metadataRepository) {
this.rebuilder = rebuilder;
this.metadataRepository = metadataRepository;
}
@Override
public Health health() {
Map<String, Object> details = new HashMap<>();
boolean healthy = true;
try {
List<ProjectionMetadata> allMetadata = metadataRepository.findAll();
for (ProjectionMetadata metadata : allMetadata) {
RebuildStatus status = rebuilder.getRebuildStatus(metadata.getName());
details.put(metadata.getName(), Map.of(
"version", metadata.getVersion(),
"lastRebuild", metadata.getLastRebuildTime(),
"status", status.getStatus(),
"eventsProcessed", metadata.getEventsProcessed()
));
if (status.getStatus() == RebuildStatus.Status.FAILED) {
healthy = false;
}
// Check if projection is stale (no updates in 24 hours)
if (isProjectionStale(metadata)) {
details.put(metadata.getName() + ".stale", true);
healthy = false;
}
}
} catch (Exception e) {
return Health.down(e).build();
}
return healthy ? Health.up().withDetails(details).build() :
Health.down().withDetails(details).build();
}
private boolean isProjectionStale(ProjectionMetadata metadata) {
return metadata.getLastRebuildTime()
.isBefore(Instant.now().minus(24, ChronoUnit.HOURS));
}
}

Testing

1. Projection Rebuild Test

@SpringBootTest
class ProjectionRebuildTest {
@Autowired
private ProjectionRebuilder rebuilder;
@Autowired
private OrderSummaryRepository repository;
@Autowired
private EventStore eventStore;
@Test
void testFullRebuild() {
// Given
RebuildOptions options = RebuildOptions.builder()
.rebuildType(RebuildType.FULL)
.backupEnabled(true)
.build();
// When
RebuildResult result = rebuilder.rebuildProjection("order-summary", options);
// Then
assertEquals(RebuildResult.Status.COMPLETED, result.getStatus());
assertTrue(result.getProcessedEvents() > 0);
// Verify projection data
List<OrderSummary> summaries = repository.findAll();
assertFalse(summaries.isEmpty());
}
@Test
void testRebuildWithEventFiltering() {
// Given
long specificSequence = 1000L;
RebuildOptions options = RebuildOptions.builder()
.rebuildType(RebuildType.FULL)
.fromSequence(specificSequence)
.maxEvents(500)
.build();
// When
RebuildResult result = rebuilder.rebuildProjection("order-summary", options);
// Then
assertEquals(RebuildResult.Status.COMPLETED, result.getStatus());
assertTrue(result.getProcessedEvents() <= 500);
}
@Test
void testRebuildFailure() {
// Given - corrupt event data
RebuildOptions options = RebuildOptions.builder()
.rebuildType(RebuildType.FULL)
.failOnError(true)
.build();
// When/Then
assertThrows(ProjectionRebuildException.class, () -> {
rebuilder.rebuildProjection("corrupted-projection", options);
});
}
}

Configuration

application.yml

projection:
rebuild:
default-options:
backup-enabled: true
fail-on-error: false
max-events: 0 # 0 means no limit
parallel:
enabled: true
max-concurrent: 3
incremental:
enabled: true
interval: 30000 # 30 seconds
monitoring:
health-check-enabled: true
metrics-enabled: true
events:
store:
type: jpa
batch-size: 1000
read-timeout: 30000
spring:
data:
mongodb:
database: projections
jpa:
show-sql: false

Best Practices

  1. Always Backup: Enable backups before major rebuilds
  2. Monitor Progress: Track rebuild progress and metrics
  3. Handle Failures Gracefully: Implement proper error handling and recovery
  4. Test Thoroughly: Test rebuild scenarios with various data sizes
  5. Use Parallel Rebuilds: For large datasets, use parallel rebuilds to minimize downtime
  6. Implement Incremental Updates: For frequently changing data
  7. Version Projections: Maintain version history for rollback capability
  8. Monitor Performance: Track rebuild duration and resource usage

This implementation provides a comprehensive foundation for projection rebuilding in Java, supporting various rebuild strategies, monitoring, and management capabilities suitable for production systems.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper