Outbox Pattern: Building Reliable Distributed Systems in Java

The Outbox Pattern is a critical architectural pattern for ensuring reliable message delivery and data consistency in distributed systems. It solves the dual-write problem by treating message publishing as part of the database transaction.

Understanding the Problem: Dual-Write Issue

The Problem Scenario

@Service
@Transactional
public class OrderService {
public void createOrder(Order order) {
// 1. Save order to database
orderRepository.save(order);
// 2. Publish order created event
messageBroker.publish(new OrderCreatedEvent(order));
// What if this fails? Database committed but message lost!
}
}

Core Concepts

How the Outbox Pattern Works

  1. Atomic Write: Save business data AND outbox message in the same transaction
  2. Background Processing: Separate process reads outbox and publishes messages
  3. Reliable Delivery: Guaranteed at-least-once delivery semantics
  4. Idempotency: Handle duplicate messages safely

Implementation Approaches

Approach 1: Database Outbox Table

Database Schema

CREATE TABLE outbox_events (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP NULL,
status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT NULL
);
CREATE INDEX idx_outbox_events_pending ON outbox_events (status, created_at) 
WHERE status = 'PENDING';
CREATE INDEX idx_outbox_events_aggregate ON outbox_events (aggregate_type, aggregate_id);

Entity Classes

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "aggregate_type", nullable = false)
private String aggregateType;
@Column(name = "aggregate_id", nullable = false)
private String aggregateId;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "event_data", columnDefinition = "JSONB", nullable = false)
private String eventData;
@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt;
@Column(name = "processed_at")
private LocalDateTime processedAt;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private OutboxStatus status = OutboxStatus.PENDING;
@Column(name = "retry_count", nullable = false)
private Integer retryCount = 0;
@Column(name = "last_error")
private String lastError;
// constructors, getters, setters
}
enum OutboxStatus {
PENDING, PROCESSING, COMPLETED, FAILED
}
@Entity
@Table(name = "orders")
public class Order {
@Id
private String id;
private String customerId;
private BigDecimal amount;
private OrderStatus status;
private LocalDateTime createdAt;
// constructors, getters, setters
}

Example 1: Basic Outbox Implementation

@Service
@Transactional
public class OrderServiceWithOutbox {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
public OrderServiceWithOutbox(OrderRepository orderRepository,
OutboxEventRepository outboxEventRepository,
ObjectMapper objectMapper) {
this.orderRepository = orderRepository;
this.outboxEventRepository = outboxEventRepository;
this.objectMapper = objectMapper;
}
public void createOrder(CreateOrderCommand command) {
// Create order entity
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setCustomerId(command.getCustomerId());
order.setAmount(command.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setCreatedAt(LocalDateTime.now());
// Save order
orderRepository.save(order);
// Create outbox event
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getAmount(),
order.getCreatedAt()
);
OutboxEvent outboxEvent = createOutboxEvent(
"Order",
order.getId(),
"OrderCreated",
event
);
// Save outbox event in the same transaction
outboxEventRepository.save(outboxEvent);
// Transaction commits both order and outbox event atomically
}
private OutboxEvent createOutboxEvent(String aggregateType, 
String aggregateId,
String eventType,
Object event) {
try {
String eventData = objectMapper.writeValueAsString(event);
OutboxEvent outboxEvent = new OutboxEvent();
outboxEvent.setAggregateType(aggregateType);
outboxEvent.setAggregateId(aggregateId);
outboxEvent.setEventType(eventType);
outboxEvent.setEventData(eventData);
outboxEvent.setCreatedAt(LocalDateTime.now());
outboxEvent.setStatus(OutboxStatus.PENDING);
return outboxEvent;
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize event", e);
}
}
}
// Event classes
public class OrderCreatedEvent {
private final String orderId;
private final String customerId;
private final BigDecimal amount;
private final LocalDateTime createdAt;
// constructor, getters
}
public class OrderCancelledEvent {
private final String orderId;
private final String reason;
private final LocalDateTime cancelledAt;
// constructor, getters
}

Example 2: Outbox Event Processor

@Component
@Slf4j
public class OutboxEventProcessor {
private final OutboxEventRepository outboxEventRepository;
private final MessageBroker messageBroker;
private final ObjectMapper objectMapper;
private final TransactionTemplate transactionTemplate;
private final int batchSize = 100;
private final Duration pollInterval = Duration.ofSeconds(5);
private final int maxRetries = 3;
public OutboxEventProcessor(OutboxEventRepository outboxEventRepository,
MessageBroker messageBroker,
ObjectMapper objectMapper,
PlatformTransactionManager transactionManager) {
this.outboxEventRepository = outboxEventRepository;
this.messageBroker = messageBroker;
this.objectMapper = objectMapper;
this.transactionTemplate = new TransactionTemplate(transactionManager);
}
@Scheduled(fixedDelayString = "${outbox.processor.interval:5000}")
public void processOutboxEvents() {
log.info("Starting outbox event processing");
List<OutboxEvent> pendingEvents = fetchPendingEvents();
for (OutboxEvent event : pendingEvents) {
processEventWithRetry(event);
}
log.info("Completed processing {} events", pendingEvents.size());
}
private List<OutboxEvent> fetchPendingEvents() {
return transactionTemplate.execute(status -> {
// Lock and fetch pending events
List<OutboxEvent> events = outboxEventRepository
.findTop100ByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
// Mark as processing
events.forEach(event -> {
event.setStatus(OutboxStatus.PROCESSING);
event.setProcessedAt(LocalDateTime.now());
});
outboxEventRepository.saveAll(events);
return events;
});
}
private void processEventWithRetry(OutboxEvent event) {
try {
processSingleEvent(event);
markEventAsCompleted(event);
} catch (Exception e) {
handleEventFailure(event, e);
}
}
private void processSingleEvent(OutboxEvent event) {
Object eventPayload = deserializeEvent(event);
// Publish to message broker
messageBroker.publish(
event.getEventType(),
event.getAggregateId(),
eventPayload
);
log.info("Successfully published event: {} for aggregate: {}", 
event.getEventType(), event.getAggregateId());
}
private Object deserializeEvent(OutboxEvent event) {
try {
Class<?> eventClass = getEventClass(event.getEventType());
return objectMapper.readValue(event.getEventData(), eventClass);
} catch (Exception e) {
throw new EventDeserializationException(
"Failed to deserialize event: " + event.getEventType(), e);
}
}
private Class<?> getEventClass(String eventType) {
// Map event type to class - could use a registry
return switch (eventType) {
case "OrderCreated" -> OrderCreatedEvent.class;
case "OrderCancelled" -> OrderCancelledEvent.class;
case "OrderShipped" -> OrderShippedEvent.class;
default -> throw new IllegalArgumentException("Unknown event type: " + eventType);
};
}
private void markEventAsCompleted(OutboxEvent event) {
transactionTemplate.execute(status -> {
event.setStatus(OutboxStatus.COMPLETED);
event.setProcessedAt(LocalDateTime.now());
return outboxEventRepository.save(event);
});
}
private void handleEventFailure(OutboxEvent event, Exception error) {
log.error("Failed to process outbox event: {}", event.getId(), error);
transactionTemplate.execute(status -> {
event.setRetryCount(event.getRetryCount() + 1);
event.setLastError(error.getMessage());
if (event.getRetryCount() >= maxRetries) {
event.setStatus(OutboxStatus.FAILED);
log.error("Event {} failed after {} retries", event.getId(), maxRetries);
} else {
event.setStatus(OutboxStatus.PENDING); // Retry later
}
return outboxEventRepository.save(event);
});
}
}
// Custom exceptions
class EventDeserializationException extends RuntimeException {
public EventDeserializationException(String message, Throwable cause) {
super(message, cause);
}
}

Example 3: Advanced Outbox with Domain Events

// Domain Event Interface
public interface DomainEvent {
String getAggregateId();
String getEventType();
LocalDateTime getOccurredAt();
}
// Abstract Aggregate Root
public abstract class AggregateRoot {
private final List<DomainEvent> domainEvents = new ArrayList<>();
protected void registerEvent(DomainEvent event) {
domainEvents.add(event);
}
public List<DomainEvent> getDomainEvents() {
return new ArrayList<>(domainEvents);
}
public void clearEvents() {
domainEvents.clear();
}
}
// Order Aggregate
public class Order extends AggregateRoot {
private String id;
private String customerId;
private BigDecimal amount;
private OrderStatus status;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
public static Order create(String customerId, BigDecimal amount) {
Order order = new Order();
order.id = UUID.randomUUID().toString();
order.customerId = customerId;
order.amount = amount;
order.status = OrderStatus.CREATED;
order.createdAt = LocalDateTime.now();
order.updatedAt = LocalDateTime.now();
order.registerEvent(new OrderCreatedEvent(order.id, customerId, amount));
return order;
}
public void cancel(String reason) {
if (this.status != OrderStatus.CREATED) {
throw new IllegalStateException("Only CREATED orders can be cancelled");
}
this.status = OrderStatus.CANCELLED;
this.updatedAt = LocalDateTime.now();
registerEvent(new OrderCancelledEvent(this.id, reason));
}
public void ship() {
if (this.status != OrderStatus.CREATED) {
throw new IllegalStateException("Only CREATED orders can be shipped");
}
this.status = OrderStatus.SHIPPED;
this.updatedAt = LocalDateTime.now();
registerEvent(new OrderShippedEvent(this.id, LocalDateTime.now()));
}
// getters
}
// Domain Service using Outbox
@Service
@Transactional
public class OrderDomainService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
private final DomainEventRegistry eventRegistry;
public OrderDomainService(OrderRepository orderRepository,
OutboxEventRepository outboxEventRepository,
ObjectMapper objectMapper,
DomainEventRegistry eventRegistry) {
this.orderRepository = orderRepository;
this.outboxEventRepository = outboxEventRepository;
this.objectMapper = objectMapper;
this.eventRegistry = eventRegistry;
}
public String createOrder(CreateOrderCommand command) {
Order order = Order.create(command.getCustomerId(), command.getAmount());
// Save order
orderRepository.save(order);
// Save domain events to outbox
saveDomainEventsToOutbox(order);
return order.getId();
}
public void cancelOrder(String orderId, String reason) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.cancel(reason);
// Update order
orderRepository.save(order);
// Save domain events to outbox
saveDomainEventsToOutbox(order);
}
private void saveDomainEventsToOutbox(Order order) {
List<OutboxEvent> outboxEvents = order.getDomainEvents().stream()
.map(this::convertToOutboxEvent)
.collect(Collectors.toList());
outboxEventRepository.saveAll(outboxEvents);
order.clearEvents();
}
private OutboxEvent convertToOutboxEvent(DomainEvent domainEvent) {
try {
String eventData = objectMapper.writeValueAsString(domainEvent);
OutboxEvent outboxEvent = new OutboxEvent();
outboxEvent.setAggregateType("Order");
outboxEvent.setAggregateId(domainEvent.getAggregateId());
outboxEvent.setEventType(domainEvent.getEventType());
outboxEvent.setEventData(eventData);
outboxEvent.setCreatedAt(LocalDateTime.now());
outboxEvent.setStatus(OutboxStatus.PENDING);
return outboxEvent;
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize domain event", e);
}
}
}
// Event Registry for type mapping
@Component
public class DomainEventRegistry {
private final Map<String, Class<? extends DomainEvent>> eventTypes = new HashMap<>();
public DomainEventRegistry() {
registerEventType("OrderCreated", OrderCreatedEvent.class);
registerEventType("OrderCancelled", OrderCancelledEvent.class);
registerEventType("OrderShipped", OrderShippedEvent.class);
}
public void registerEventType(String eventType, Class<? extends DomainEvent> eventClass) {
eventTypes.put(eventType, eventClass);
}
public Class<? extends DomainEvent> getEventClass(String eventType) {
return eventTypes.get(eventType);
}
}

Example 4: Transactional Outbox with Spring Integration

// Configuration
@Configuration
@EnableScheduling
@EnableTransactionManagement
public class OutboxConfiguration {
@Bean
public TaskScheduler outboxTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(2);
scheduler.setThreadNamePrefix("outbox-processor-");
scheduler.initialize();
return scheduler;
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
}
// Repository with custom queries
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT e FROM OutboxEvent e WHERE e.status = 'PENDING' ORDER BY e.createdAt ASC")
List<OutboxEvent> findTop100ByStatusOrderByCreatedAtAsc(@Param("status") OutboxStatus status);
@Query("SELECT e FROM OutboxEvent e WHERE e.aggregateType = :aggregateType AND e.aggregateId = :aggregateId")
List<OutboxEvent> findByAggregate(@Param("aggregateType") String aggregateType, 
@Param("aggregateId") String aggregateId);
@Modifying
@Query("DELETE FROM OutboxEvent e WHERE e.status = 'COMPLETED' AND e.processedAt < :cutoff")
int deleteCompletedEventsOlderThan(@Param("cutoff") LocalDateTime cutoff);
}
// Message Broker Abstraction
public interface MessageBroker {
void publish(String eventType, String aggregateId, Object payload);
}
@Component
public class KafkaMessageBroker implements MessageBroker {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final String topic;
public KafkaMessageBroker(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.topic = "domain-events";
}
@Override
public void publish(String eventType, String aggregateId, Object payload) {
MessageHeaders headers = new MessageHeaders(Map.of(
"eventType", eventType,
"aggregateId", aggregateId,
"timestamp", System.currentTimeMillis()
));
Message<Object> message = MessageBuilder.createMessage(payload, headers);
kafkaTemplate.send(topic, aggregateId, message)
.addCallback(
result -> log.debug("Successfully published event: {}", eventType),
failure -> { throw new MessagePublishException("Failed to publish event", failure); }
);
}
}
// Health Check for Outbox
@Component
public class OutboxHealthIndicator implements HealthIndicator {
private final OutboxEventRepository outboxEventRepository;
public OutboxHealthIndicator(OutboxEventRepository outboxEventRepository) {
this.outboxEventRepository = outboxEventRepository;
}
@Override
public Health health() {
try {
long pendingCount = outboxEventRepository.countByStatus(OutboxStatus.PENDING);
long failedCount = outboxEventRepository.countByStatus(OutboxStatus.FAILED);
Health.Builder status = pendingCount > 1000 || failedCount > 0 ? 
Health.down() : Health.up();
return status
.withDetail("pending_events", pendingCount)
.withDetail("failed_events", failedCount)
.build();
} catch (Exception e) {
return Health.down(e).build();
}
}
}

Example 5: Testing the Outbox Pattern

@ExtendWith(MockitoExtension.class)
class OrderServiceWithOutboxTest {
@Mock
private OrderRepository orderRepository;
@Mock
private OutboxEventRepository outboxEventRepository;
@InjectMocks
private OrderServiceWithOutbox orderService;
@Test
@DisplayName("Should save order and outbox event atomically")
void shouldSaveOrderAndOutboxEvent() {
// Given
CreateOrderCommand command = new CreateOrderCommand("customer-123", new BigDecimal("99.99"));
// When
orderService.createOrder(command);
// Then
verify(orderRepository).save(any(Order.class));
verify(outboxEventRepository).save(any(OutboxEvent.class));
// Both saves happen in the same transaction
}
}
@DataJpaTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class OutboxEventRepositoryTest {
@Autowired
private TestEntityManager entityManager;
@Autowired
private OutboxEventRepository outboxEventRepository;
@Test
@DisplayName("Should find pending events with pessimistic lock")
void shouldFindPendingEventsWithLock() {
// Given
OutboxEvent event1 = createPendingEvent("Order", "order-1", "OrderCreated");
OutboxEvent event2 = createPendingEvent("Order", "order-2", "OrderCreated");
entityManager.persist(event1);
entityManager.persist(event2);
entityManager.flush();
// When
List<OutboxEvent> pendingEvents = outboxEventRepository
.findTop100ByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
// Then
assertThat(pendingEvents).hasSize(2);
}
private OutboxEvent createPendingEvent(String aggregateType, String aggregateId, String eventType) {
OutboxEvent event = new OutboxEvent();
event.setAggregateType(aggregateType);
event.setAggregateId(aggregateId);
event.setEventType(eventType);
event.setEventData("{}");
event.setCreatedAt(LocalDateTime.now());
event.setStatus(OutboxStatus.PENDING);
return event;
}
}
@SpringBootTest
@Testcontainers
class OutboxIntegrationTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:13");
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
registry.add("spring.datasource.username", postgres::getUsername);
registry.add("spring.datasource.password", postgres::getPassword);
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired
private OrderDomainService orderService;
@Autowired
private OutboxEventRepository outboxEventRepository;
@Test
@DisplayName("Should ensure exactly-once processing in integration test")
void shouldProcessOutboxEventsReliably() {
// Given
CreateOrderCommand command = new CreateOrderCommand("customer-123", new BigDecimal("150.00"));
// When
String orderId = orderService.createOrder(command);
// Then
List<OutboxEvent> outboxEvents = outboxEventRepository.findByAggregate("Order", orderId);
assertThat(outboxEvents).hasSize(1);
OutboxEvent event = outboxEvents.get(0);
assertThat(event.getStatus()).isEqualTo(OutboxStatus.PENDING);
assertThat(event.getEventType()).isEqualTo("OrderCreated");
}
}

Best Practices and Considerations

1. Idempotent Message Processing

@Component
public class OrderEventConsumer {
private final OrderRepository orderRepository;
@KafkaListener(topics = "domain-events")
public void handleOrderCreated(OrderCreatedEvent event, @Header String messageId) {
// Check if already processed
if (orderRepository.existsById(event.getOrderId())) {
log.info("Order {} already exists, skipping", event.getOrderId());
return;
}
// Process event
Order order = new Order();
order.setId(event.getOrderId());
order.setCustomerId(event.getCustomerId());
order.setAmount(event.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setCreatedAt(event.getCreatedAt());
orderRepository.save(order);
}
}

2. Monitoring and Alerting

@Component
public class OutboxMetrics {
private final MeterRegistry meterRegistry;
private final Counter processedEvents;
private final Counter failedEvents;
private final Gauge pendingEvents;
public OutboxMetrics(MeterRegistry meterRegistry, 
OutboxEventRepository outboxEventRepository) {
this.meterRegistry = meterRegistry;
this.processedEvents = Counter.builder("outbox.events.processed")
.description("Number of successfully processed outbox events")
.register(meterRegistry);
this.failedEvents = Counter.builder("outbox.events.failed")
.description("Number of failed outbox events")
.register(meterRegistry);
this.pendingEvents = Gauge.builder("outbox.events.pending")
.description("Number of pending outbox events")
.register(meterRegistry, this, 
metrics -> outboxEventRepository.countByStatus(OutboxStatus.PENDING));
}
public void incrementProcessed() {
processedEvents.increment();
}
public void incrementFailed() {
failedEvents.increment();
}
}

3. Cleanup Strategy

@Component
public class OutboxCleanupService {
private final OutboxEventRepository outboxEventRepository;
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
public void cleanupOldEvents() {
LocalDateTime cutoff = LocalDateTime.now().minusDays(30);
int deletedCount = outboxEventRepository
.deleteCompletedEventsOlderThan(cutoff);
log.info("Cleaned up {} completed outbox events older than {}", deletedCount, cutoff);
}
}

Conclusion

The Outbox Pattern provides a robust solution for reliable message delivery in distributed systems:

Key Benefits:

  • Atomicity: Database and message publishing in single transaction
  • Reliability: Guaranteed message delivery
  • Performance: Non-blocking background processing
  • Consistency: Avoids dual-write problems

When to Use:

  • Microservices architectures
  • Event-driven systems
  • Systems requiring reliable messaging
  • Distributed transactions

Considerations:

  • Requires additional database storage
  • Adds complexity with background processors
  • Needs monitoring and cleanup procedures
  • Requires idempotent message consumers

The Outbox Pattern is essential for building resilient, reliable distributed systems that can maintain data consistency across service boundaries.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper