Learn how to implement the Inbox Pattern to achieve idempotency in distributed systems, ensuring reliable message processing and duplicate prevention.
Table of Contents
- Inbox Pattern Overview
- Core Architecture
- Database Schema
- Core Implementation
- Message Processing
- Idempotency Guarantees
- Error Handling & Retries
- Monitoring & Observability
- Spring Boot Integration
Inbox Pattern Overview
What is the Inbox Pattern?
The Inbox Pattern is a message processing pattern that ensures:
- Idempotent message processing
- Effectively-once processing on top of at-least-once delivery (duplicates are detected and skipped)
- Ordered processing (when required)
- Dead letter queue for failed messages
Key Components:
- Inbox Table: Stores incoming messages with processing status
- Message Processor: Processes messages from inbox
- Idempotency Check: Prevents duplicate processing
- Retry Mechanism: Handles transient failures
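To make these components concrete, here is a minimal in-memory sketch of the idea. The names `InMemoryInbox`, `receive` and `drain` are hypothetical and exist only for illustration; the map stands in for the inbox table, which in a real system lives in a database.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the inbox table; illustration only.
final class InMemoryInbox {
    enum Status { PENDING, PROCESSED }

    private final Map<String, String> payloads = new LinkedHashMap<>();
    private final Map<String, Status> statuses = new LinkedHashMap<>();

    /** Idempotency check: returns false when the message ID was already stored. */
    synchronized boolean receive(String messageId, String payload) {
        if (statuses.containsKey(messageId)) {
            return false; // duplicate delivery, ignore
        }
        payloads.put(messageId, payload);
        statuses.put(messageId, Status.PENDING);
        return true;
    }

    /** Message processor: hands every pending payload to the handler once. */
    synchronized int drain(Consumer<String> handler) {
        int processed = 0;
        for (Map.Entry<String, Status> entry : statuses.entrySet()) {
            if (entry.getValue() == Status.PENDING) {
                handler.accept(payloads.get(entry.getKey()));
                entry.setValue(Status.PROCESSED);
                processed++;
            }
        }
        return processed;
    }
}
```

Delivering the same message ID twice stores it once, and a second `drain` call finds nothing left to process. Retries and the dead letter queue are left out of this sketch.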
Core Architecture
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Message   │───▶│    Inbox    │───▶│   Message   │
│  Producer   │    │    Table    │    │  Processor  │
└─────────────┘    └─────────────┘    └─────────────┘
                          │                  │
                          ▼                  ▼
                   ┌─────────────┐    ┌─────────────┐
                   │    Dead     │    │  Business   │
                   │   Letter    │    │    Logic    │
                   │    Queue    │    │             │
                   └─────────────┘    └─────────────┘
Database Schema
1. Inbox Table Definition
-- Inbox table for storing incoming messages
CREATE TABLE inbox_messages (
    id BIGSERIAL PRIMARY KEY,
    message_id VARCHAR(255) NOT NULL UNIQUE,
    message_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
    -- Idempotency and tracking
    correlation_id VARCHAR(255),
    source_system VARCHAR(100),
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMP,
    -- Retry configuration
    retry_count INTEGER NOT NULL DEFAULT 0,
    max_retries INTEGER NOT NULL DEFAULT 3,
    next_retry_at TIMESTAMP,
    -- Error handling
    error_message TEXT,
    error_stack_trace TEXT
);

-- Indexes for performance. PostgreSQL has no inline INDEX clause, so they are
-- created separately. The UNIQUE constraint on message_id already creates an index.
CREATE INDEX idx_inbox_status ON inbox_messages (status);
CREATE INDEX idx_inbox_correlation_id ON inbox_messages (correlation_id);
CREATE INDEX idx_inbox_created_at ON inbox_messages (created_at);
CREATE INDEX idx_inbox_next_retry ON inbox_messages (next_retry_at);

-- Dead letter queue for permanently failed messages
CREATE TABLE dead_letter_messages (
    id BIGSERIAL PRIMARY KEY,
    message_id VARCHAR(255) NOT NULL UNIQUE,
    message_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    original_inbox_id BIGINT,
    failure_reason TEXT NOT NULL,
    failure_count INTEGER NOT NULL DEFAULT 1,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    archived_at TIMESTAMP
);

CREATE INDEX idx_dlq_created_at ON dead_letter_messages (created_at);

-- Outbox table (optional, for sending messages)
CREATE TABLE outbox_messages (
    id BIGSERIAL PRIMARY KEY,
    message_id VARCHAR(255) NOT NULL UNIQUE,
    message_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    destination VARCHAR(255) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    sent_at TIMESTAMP,
    retry_count INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_status ON outbox_messages (status);
CREATE INDEX idx_outbox_created_at ON outbox_messages (created_at);
Core Implementation
2. Domain Models
// Message status enum
public enum MessageStatus {
PENDING,
PROCESSING,
PROCESSED,
FAILED,
RETRYING,
DEAD_LETTER
}
// Inbox message entity
public record InboxMessage(
Long id,
String messageId,
String messageType,
String payload,
MessageStatus status,
String correlationId,
String sourceSystem,
Instant createdAt,
Instant processedAt,
Integer retryCount,
Integer maxRetries,
Instant nextRetryAt,
String errorMessage,
String errorStackTrace
) {
public static InboxMessage create(String messageId, String messageType,
String payload, String correlationId) {
return new InboxMessage(
null, messageId, messageType, payload, MessageStatus.PENDING,
correlationId, null, Instant.now(), null, 0, 3, null, null, null
);
}
public InboxMessage markProcessing() {
return new InboxMessage(
id, messageId, messageType, payload, MessageStatus.PROCESSING,
correlationId, sourceSystem, createdAt, null, retryCount,
maxRetries, nextRetryAt, errorMessage, errorStackTrace
);
}
public InboxMessage markProcessed() {
return new InboxMessage(
id, messageId, messageType, payload, MessageStatus.PROCESSED,
correlationId, sourceSystem, createdAt, Instant.now(), retryCount,
maxRetries, null, null, null
);
}
public InboxMessage markFailed(String error, String stackTrace) {
return new InboxMessage(
id, messageId, messageType, payload, MessageStatus.FAILED,
correlationId, sourceSystem, createdAt, null, retryCount,
maxRetries, calculateNextRetry(), error, stackTrace
);
}
public InboxMessage markForRetry() {
return new InboxMessage(
id, messageId, messageType, payload, MessageStatus.RETRYING,
correlationId, sourceSystem, createdAt, null, retryCount + 1,
maxRetries, calculateNextRetry(), errorMessage, errorStackTrace
);
}
public boolean shouldRetry() {
return retryCount < maxRetries && status == MessageStatus.FAILED;
}
public boolean isEligibleForRetry() {
return status == MessageStatus.RETRYING &&
nextRetryAt != null &&
Instant.now().isAfter(nextRetryAt);
}
private Instant calculateNextRetry() {
// Exponential backoff: 5s, 15s, 45s, etc.
int delaySeconds = (int) Math.pow(3, retryCount) * 5;
return Instant.now().plusSeconds(delaySeconds);
}
}
// Dead letter message entity
public record DeadLetterMessage(
Long id,
String messageId,
String messageType,
String payload,
Long originalInboxId,
String failureReason,
Integer failureCount,
Instant createdAt,
Instant archivedAt
) {
public static DeadLetterMessage fromInbox(InboxMessage inbox, String failureReason) {
return new DeadLetterMessage(
null, inbox.messageId(), inbox.messageType(), inbox.payload(),
inbox.id(), failureReason, inbox.retryCount(), Instant.now(), null
);
}
}
// Message processing result
public record MessageProcessingResult(
boolean success,
String messageId,
String errorMessage,
boolean shouldRetry,
Instant processedAt
) {
public static MessageProcessingResult success(String messageId) {
return new MessageProcessingResult(true, messageId, null, false, Instant.now());
}
public static MessageProcessingResult failure(String messageId, String error, boolean shouldRetry) {
return new MessageProcessingResult(false, messageId, error, shouldRetry, Instant.now());
}
}
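The `calculateNextRetry` method above waits 5 × 3^retryCount seconds, which grows quickly if `maxRetries` is raised. The following standalone sketch reproduces that schedule and adds an upper bound. The class name `RetryBackoff` and the `maxDelay` parameter are assumptions made for illustration and are not part of the original model.

```java
import java.time.Duration;

// Sketch of the backoff used by calculateNextRetry: 5s * 3^retryCount.
// The cap (maxDelay) is an added assumption; the original has no upper bound.
final class RetryBackoff {
    private static final long BASE_SECONDS = 5;
    private static final long FACTOR = 3;

    static Duration delayFor(int retryCount, Duration maxDelay) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retryCount must be >= 0");
        }
        long seconds = BASE_SECONDS;
        for (int i = 0; i < retryCount; i++) {
            seconds *= FACTOR;
            if (seconds >= maxDelay.getSeconds()) {
                return maxDelay; // stop early once the cap is reached
            }
        }
        return Duration.ofSeconds(Math.min(seconds, maxDelay.getSeconds()));
    }
}
```

With a ten-minute cap, retry counts 0, 1 and 2 yield 5, 15 and 45 seconds, matching the comment in the record, while large retry counts stay at ten minutes.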
3. Repository Layer
@Repository
@Transactional
public class InboxRepository {
private static final Logger logger = LoggerFactory.getLogger(InboxRepository.class);
private final DSLContext dsl;
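// jOOQ-generated tables (assumes code generation, with TIMESTAMP columns mapped to Instant
// and the JSONB payload column mapped to String through forced types)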
private static final InboxMessages INBOX = InboxMessages.INBOX_MESSAGES;
private static final DeadLetterMessages DEAD_LETTER = DeadLetterMessages.DEAD_LETTER_MESSAGES;
public InboxRepository(DSLContext dsl) {
this.dsl = dsl;
}
/**
* Store message in inbox with idempotency check
*/
public boolean storeMessage(InboxMessage message) {
try {
int inserted = dsl.insertInto(INBOX)
.set(INBOX.MESSAGE_ID, message.messageId())
.set(INBOX.MESSAGE_TYPE, message.messageType())
.set(INBOX.PAYLOAD, message.payload())
.set(INBOX.STATUS, message.status().name())
.set(INBOX.CORRELATION_ID, message.correlationId())
.set(INBOX.SOURCE_SYSTEM, message.sourceSystem())
.set(INBOX.CREATED_AT, message.createdAt())
.set(INBOX.RETRY_COUNT, message.retryCount())
.set(INBOX.MAX_RETRIES, message.maxRetries())
.onDuplicateKeyIgnore() // Idempotency: ignore if message already exists
.execute();
return inserted > 0;
} catch (DataAccessException e) {
logger.warn("Failed to store message in inbox: {}", message.messageId(), e);
return false;
}
}
/**
* Find pending messages for processing
*/
public List<InboxMessage> findPendingMessages(int limit) {
return dsl.selectFrom(INBOX)
.where(INBOX.STATUS.in(MessageStatus.PENDING.name(), MessageStatus.RETRYING.name()))
.and(INBOX.NEXT_RETRY_AT.isNull().or(INBOX.NEXT_RETRY_AT.le(Instant.now())))
.orderBy(INBOX.CREATED_AT.asc())
.limit(limit)
.fetch(this::mapToInboxMessage);
}
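/**
* Find FAILED messages that still have retry budget.
* Retries scheduled by updateMessageWithResult use status RETRYING and are picked up
* by findPendingMessages, so this query only covers rows marked FAILED elsewhere.
*/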
public List<InboxMessage> findMessagesForRetry(int limit) {
return dsl.selectFrom(INBOX)
.where(INBOX.STATUS.eq(MessageStatus.FAILED.name()))
.and(INBOX.RETRY_COUNT.lt(INBOX.MAX_RETRIES))
.and(INBOX.NEXT_RETRY_AT.le(Instant.now()))
.orderBy(INBOX.CREATED_AT.asc())
.limit(limit)
.fetch(this::mapToInboxMessage);
}
/**
* Update message status
*/
public boolean updateMessageStatus(Long id, MessageStatus status) {
return dsl.update(INBOX)
.set(INBOX.STATUS, status.name())
.set(INBOX.PROCESSED_AT, status == MessageStatus.PROCESSED ? Instant.now() : null)
.where(INBOX.ID.eq(id))
.execute() > 0;
}
/**
* Update message with processing result
*/
public boolean updateMessageWithResult(Long id, MessageProcessingResult result) {
if (result.success()) {
return dsl.update(INBOX)
.set(INBOX.STATUS, MessageStatus.PROCESSED.name())
.set(INBOX.PROCESSED_AT, result.processedAt())
.set(INBOX.ERROR_MESSAGE, (String) null)
.set(INBOX.ERROR_STACK_TRACE, (String) null)
.set(INBOX.NEXT_RETRY_AT, (Instant) null)
.where(INBOX.ID.eq(id))
.execute() > 0;
} else {
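if (result.shouldRetry()) {
// Load the retry budget; fetchOne returns null if the row no longer exists
var budget = dsl.select(INBOX.RETRY_COUNT, INBOX.MAX_RETRIES)
.from(INBOX)
.where(INBOX.ID.eq(id))
.fetchOne();
if (budget == null) {
return false;
}
int retryCount = budget.get(INBOX.RETRY_COUNT);
if (retryCount >= budget.get(INBOX.MAX_RETRIES)) {
// Retry budget exhausted: stop retrying and dead-letter the message
return moveToDeadLetter(id, result.errorMessage());
}
// Mark for retry
Instant nextRetry = calculateNextRetry(retryCount);
return dsl.update(INBOX)
.set(INBOX.STATUS, MessageStatus.RETRYING.name())
.set(INBOX.RETRY_COUNT, INBOX.RETRY_COUNT.add(1))
.set(INBOX.NEXT_RETRY_AT, nextRetry)
.set(INBOX.ERROR_MESSAGE, result.errorMessage())
.set(INBOX.ERROR_STACK_TRACE, result.errorMessage()) // Simplified
.where(INBOX.ID.eq(id))
.execute() > 0;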
} else {
// Move to dead letter queue
return moveToDeadLetter(id, result.errorMessage());
}
}
}
/**
* Move message to dead letter queue
*/
public boolean moveToDeadLetter(Long inboxId, String failureReason) {
return dsl.transactionResult(configuration -> {
DSLContext txDsl = DSL.using(configuration);
// Get the inbox message
InboxMessage inboxMessage = txDsl.selectFrom(INBOX)
.where(INBOX.ID.eq(inboxId))
.fetchOne(this::mapToInboxMessage);
if (inboxMessage == null) {
return false;
}
// Insert into dead letter queue
int inserted = txDsl.insertInto(DEAD_LETTER)
.set(DEAD_LETTER.MESSAGE_ID, inboxMessage.messageId())
.set(DEAD_LETTER.MESSAGE_TYPE, inboxMessage.messageType())
.set(DEAD_LETTER.PAYLOAD, inboxMessage.payload())
.set(DEAD_LETTER.ORIGINAL_INBOX_ID, inboxMessage.id())
.set(DEAD_LETTER.FAILURE_REASON, failureReason)
.set(DEAD_LETTER.FAILURE_COUNT, inboxMessage.retryCount())
.execute();
// Delete from inbox
if (inserted > 0) {
txDsl.deleteFrom(INBOX)
.where(INBOX.ID.eq(inboxId))
.execute();
}
return inserted > 0;
});
}
/**
* Check if message already exists (idempotency check)
*/
public boolean existsByMessageId(String messageId) {
return dsl.fetchExists(
dsl.selectOne()
.from(INBOX)
.where(INBOX.MESSAGE_ID.eq(messageId))
);
}
/**
* Get processing statistics
*/
public ProcessingStatistics getProcessingStatistics() {
var statusCounts = dsl.select(INBOX.STATUS, count())
.from(INBOX)
.groupBy(INBOX.STATUS)
.fetch()
.stream()
.collect(Collectors.toMap(
r -> r.get(INBOX.STATUS),
r -> r.get(count(), Long.class)
));
var deadLetterCount = dsl.selectCount()
.from(DEAD_LETTER)
.fetchOne(0, Integer.class);
return new ProcessingStatistics(statusCounts, deadLetterCount);
}
// Helper methods
private InboxMessage mapToInboxMessage(Record record) {
return new InboxMessage(
record.get(INBOX.ID),
record.get(INBOX.MESSAGE_ID),
record.get(INBOX.MESSAGE_TYPE),
record.get(INBOX.PAYLOAD),
MessageStatus.valueOf(record.get(INBOX.STATUS)),
record.get(INBOX.CORRELATION_ID),
record.get(INBOX.SOURCE_SYSTEM),
record.get(INBOX.CREATED_AT),
record.get(INBOX.PROCESSED_AT),
record.get(INBOX.RETRY_COUNT),
record.get(INBOX.MAX_RETRIES),
record.get(INBOX.NEXT_RETRY_AT),
record.get(INBOX.ERROR_MESSAGE),
record.get(INBOX.ERROR_STACK_TRACE)
);
}
private Instant calculateNextRetry(Integer currentRetryCount) {
int delaySeconds = (int) Math.pow(3, currentRetryCount) * 5;
return Instant.now().plusSeconds(delaySeconds);
}
public record ProcessingStatistics(
Map<String, Long> statusCounts,
Integer deadLetterCount
) {}
}
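Note that `findPendingMessages` only reads rows. If several application instances poll the same table, two of them could pick up the same message. One common remedy is a conditional status update, in the spirit of `UPDATE ... SET status = 'PROCESSING' WHERE id = ? AND status = 'PENDING'`, where only the caller whose update changes a row goes on to process it. The sketch below models that compare-and-set transition in memory. `ClaimTable` and its methods are hypothetical names, and the map stands in for the database table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of a conditional status update:
// only one caller can move a row from PENDING to PROCESSING.
final class ClaimTable {
    enum Status { PENDING, PROCESSING, PROCESSED }

    private final Map<Long, Status> rows = new ConcurrentHashMap<>();

    void insert(long id) {
        rows.put(id, Status.PENDING);
    }

    /** Returns true only for the single worker that moves the row out of PENDING. */
    boolean tryClaim(long id) {
        // Atomic compare-and-set: succeeds only if the current value is PENDING
        return rows.replace(id, Status.PENDING, Status.PROCESSING);
    }

    void complete(long id) {
        rows.replace(id, Status.PROCESSING, Status.PROCESSED);
    }

    Status statusOf(long id) {
        return rows.get(id);
    }
}
```

A worker that gets `false` from `tryClaim` simply skips the message. In PostgreSQL, `SELECT ... FOR UPDATE SKIP LOCKED` is another widely used way to achieve the same effect for a whole batch.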
Message Processing
4. Message Processor Service
@Service
@Transactional
public class InboxMessageProcessor {
private static final Logger logger = LoggerFactory.getLogger(InboxMessageProcessor.class);
private final InboxRepository inboxRepository;
private final Map<String, MessageHandler> messageHandlers;
private final ObjectMapper objectMapper;
public InboxMessageProcessor(InboxRepository inboxRepository,
List<MessageHandler> handlers,
ObjectMapper objectMapper) {
this.inboxRepository = inboxRepository;
this.objectMapper = objectMapper;
this.messageHandlers = handlers.stream()
.collect(Collectors.toMap(
MessageHandler::getMessageType,
Function.identity()
));
}
/**
* Process a single message
*/
public MessageProcessingResult processMessage(InboxMessage message) {
logger.info("Processing message: {} of type {}", message.messageId(), message.messageType());
try {
// Get appropriate handler
MessageHandler handler = messageHandlers.get(message.messageType());
if (handler == null) {
return MessageProcessingResult.failure(
message.messageId(),
"No handler found for message type: " + message.messageType(),
false
);
}
// Process the message
handler.handleMessage(message.payload());
logger.info("Successfully processed message: {}", message.messageId());
return MessageProcessingResult.success(message.messageId());
} catch (TransientException e) {
// Transient failures should be retried
logger.warn("Transient failure processing message: {}", message.messageId(), e);
return MessageProcessingResult.failure(
message.messageId(),
e.getMessage(),
true
);
} catch (BusinessException e) {
// Business logic failures should not be retried
logger.error("Business failure processing message: {}", message.messageId(), e);
return MessageProcessingResult.failure(
message.messageId(),
e.getMessage(),
false
);
} catch (Exception e) {
// Unexpected errors - retry based on configuration
logger.error("Unexpected error processing message: {}", message.messageId(), e);
return MessageProcessingResult.failure(
message.messageId(),
"Unexpected error: " + e.getMessage(),
true // Retry unexpected errors
);
}
}
/**
* Process batch of messages
*/
public BatchProcessingResult processMessageBatch(List<InboxMessage> messages) {
List<MessageProcessingResult> results = new ArrayList<>();
List<InboxMessage> processedMessages = new ArrayList<>();
for (InboxMessage message : messages) {
// Mark as processing
inboxRepository.updateMessageStatus(message.id(), MessageStatus.PROCESSING);
// Process message
MessageProcessingResult result = processMessage(message);
results.add(result);
// Update message with result
inboxRepository.updateMessageWithResult(message.id(), result);
processedMessages.add(message);
}
return new BatchProcessingResult(processedMessages, results);
}
/**
* Process pending messages (scheduled task)
*/
@Scheduled(fixedDelayString = "${inbox.processing.interval:5000}")
@Async("inboxProcessorExecutor")
public void processPendingMessages() {
try {
// Get batch of pending messages
List<InboxMessage> pendingMessages = inboxRepository.findPendingMessages(100);
if (!pendingMessages.isEmpty()) {
logger.info("Processing {} pending messages", pendingMessages.size());
BatchProcessingResult result = processMessageBatch(pendingMessages);
// Log processing statistics
logProcessingStatistics(result);
}
} catch (Exception e) {
logger.error("Error processing pending messages", e);
}
}
/**
* Retry failed messages (scheduled task)
*/
@Scheduled(fixedDelayString = "${inbox.retry.interval:30000}")
@Async("inboxProcessorExecutor")
public void retryFailedMessages() {
try {
List<InboxMessage> retryMessages = inboxRepository.findMessagesForRetry(50);
if (!retryMessages.isEmpty()) {
logger.info("Retrying {} failed messages", retryMessages.size());
BatchProcessingResult result = processMessageBatch(retryMessages);
// Log retry statistics
logRetryStatistics(result);
}
} catch (Exception e) {
logger.error("Error retrying failed messages", e);
}
}
private void logProcessingStatistics(BatchProcessingResult result) {
long successCount = result.results().stream()
.filter(MessageProcessingResult::success)
.count();
long failureCount = result.results().size() - successCount;
logger.info("Message processing batch completed: {} success, {} failure",
successCount, failureCount);
}
private void logRetryStatistics(BatchProcessingResult result) {
long successCount = result.results().stream()
.filter(MessageProcessingResult::success)
.count();
logger.info("Message retry batch completed: {} successfully retried", successCount);
}
public record BatchProcessingResult(
List<InboxMessage> messages,
List<MessageProcessingResult> results
) {}
}
// Message handler interface
public interface MessageHandler {
String getMessageType();
void handleMessage(String payload) throws Exception;
}
// Example message handlers
@Service
public class OrderCreatedHandler implements MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(OrderCreatedHandler.class);
private final ObjectMapper objectMapper;
private final OrderService orderService;
public OrderCreatedHandler(ObjectMapper objectMapper, OrderService orderService) {
this.objectMapper = objectMapper;
this.orderService = orderService;
}
@Override
public String getMessageType() {
return "ORDER_CREATED";
}
@Override
public void handleMessage(String payload) throws Exception {
OrderCreatedEvent event = objectMapper.readValue(payload, OrderCreatedEvent.class);
logger.info("Processing order created event: {}", event.orderId());
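// Demo only: simulate a transient failure before any side effects happen,
// so that a retry does not repeat the business logic
if (Math.random() < 0.1) { // 10% chance of transient failure
throw new TransientException("Temporary database connection issue");
}
// Business logic (must itself be idempotent, because retries re-run the handler)
orderService.processNewOrder(event.orderId(), event.customerId(), event.items());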
}
public record OrderCreatedEvent(
String orderId,
String customerId,
List<OrderItem> items,
BigDecimal totalAmount
) {}
public record OrderItem(String productId, Integer quantity, BigDecimal price) {}
}
@Service
public class PaymentProcessedHandler implements MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(PaymentProcessedHandler.class);
private final ObjectMapper objectMapper;
private final OrderService orderService;
public PaymentProcessedHandler(ObjectMapper objectMapper, OrderService orderService) {
this.objectMapper = objectMapper;
this.orderService = orderService;
}
@Override
public String getMessageType() {
return "PAYMENT_PROCESSED";
}
@Override
public void handleMessage(String payload) throws Exception {
PaymentProcessedEvent event = objectMapper.readValue(payload, PaymentProcessedEvent.class);
logger.info("Processing payment processed event: {}", event.paymentId());
// A failed payment is a business failure: do not mark the order as paid and do not retry
if (!event.success()) {
throw new BusinessException("Payment failed: " + event.failureReason());
}
// Update order status only after the payment is confirmed
orderService.updateOrderStatus(event.orderId(), "PAID");
}
public record PaymentProcessedEvent(
String paymentId,
String orderId,
boolean success,
String failureReason
) {}
}
// Custom exceptions
public class TransientException extends RuntimeException {
public TransientException(String message) {
super(message);
}
public TransientException(String message, Throwable cause) {
super(message, cause);
}
}
public class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
}
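The processor above classifies failures into retryable and non-retryable ones, and the retry budget (`retryCount` versus `maxRetries`) decides when to give up. The routing decision can be expressed as a small pure function, which makes it easy to unit test. This is a sketch under assumed names (`FailureRouting`, `Outcome`, `NextStep`), not the code the services above actually call.

```java
// Sketch of the routing decision after a handler runs. All names are illustrative.
final class FailureRouting {
    enum Outcome { SUCCESS, TRANSIENT_FAILURE, BUSINESS_FAILURE }
    enum NextStep { MARK_PROCESSED, SCHEDULE_RETRY, DEAD_LETTER }

    static NextStep route(Outcome outcome, int retryCount, int maxRetries) {
        switch (outcome) {
            case SUCCESS:
                return NextStep.MARK_PROCESSED;
            case TRANSIENT_FAILURE:
                // Retry only while the budget lasts; afterwards give up
                return retryCount < maxRetries
                        ? NextStep.SCHEDULE_RETRY
                        : NextStep.DEAD_LETTER;
            case BUSINESS_FAILURE:
            default:
                // Retrying cannot fix a business rule violation
                return NextStep.DEAD_LETTER;
        }
    }
}
```

Keeping this decision separate from the persistence code means the retry policy can change without touching the repository.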
Idempotency Guarantees
5. Idempotent Message Receiver
@RestController // needed for the @PostMapping endpoint below; also registers the class as a Spring bean
public class IdempotentMessageReceiver {
private static final Logger logger = LoggerFactory.getLogger(IdempotentMessageReceiver.class);
private final InboxRepository inboxRepository;
private final ObjectMapper objectMapper;
public IdempotentMessageReceiver(InboxRepository inboxRepository, ObjectMapper objectMapper) {
this.inboxRepository = inboxRepository;
this.objectMapper = objectMapper;
}
/**
* Receive message with idempotency guarantee
*/
public MessageReceipt receiveMessage(ReceivedMessage message) {
// Check for duplicate message
if (inboxRepository.existsByMessageId(message.messageId())) {
logger.info("Duplicate message detected, ignoring: {}", message.messageId());
return MessageReceipt.duplicate(message.messageId());
}
// Store in inbox
InboxMessage inboxMessage = InboxMessage.create(
message.messageId(),
message.messageType(),
message.payload(),
message.correlationId()
);
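boolean stored = inboxRepository.storeMessage(inboxMessage);
if (stored) {
logger.info("Message stored in inbox: {}", message.messageId());
return MessageReceipt.accepted(message.messageId());
}
// The insert is the authoritative check: a concurrent delivery may have stored
// the same message between existsByMessageId and storeMessage
if (inboxRepository.existsByMessageId(message.messageId())) {
logger.info("Duplicate message detected during insert, ignoring: {}", message.messageId());
return MessageReceipt.duplicate(message.messageId());
}
logger.error("Failed to store message in inbox: {}", message.messageId());
return MessageReceipt.rejected(message.messageId(), "Storage failed");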
}
/**
* Receive message from external system (e.g., Kafka, RabbitMQ, HTTP)
*/
@KafkaListener(topics = "${app.kafka.topics.inbox}")
public void receiveKafkaMessage(ConsumerRecord<String, String> record) {
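ReceivedMessage message = ReceivedMessage.fromKafkaRecord(record);
MessageReceipt receipt = receiveMessage(message);
if (receipt.duplicate()) {
// Redeliveries are expected with at-least-once delivery; nothing to do
logger.debug("Duplicate Kafka message ignored: {}", receipt.messageId());
} else if (!receipt.accepted()) {
// Fail the listener instead of swallowing the error, so the container's
// error handler can redeliver the record rather than silently dropping it
throw new IllegalStateException(
"Inbox rejected message " + receipt.messageId() + ": " + receipt.reason());
}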
}
@PostMapping("/api/messages")
public ResponseEntity<MessageReceipt> receiveHttpMessage(@RequestBody ReceivedMessage message) {
try {
MessageReceipt receipt = receiveMessage(message);
if (receipt.accepted()) {
return ResponseEntity.accepted().body(receipt);
} else if (receipt.duplicate()) {
return ResponseEntity.status(HttpStatus.CONFLICT).body(receipt);
} else {
return ResponseEntity.badRequest().body(receipt);
}
} catch (Exception e) {
logger.error("Error receiving HTTP message", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(MessageReceipt.rejected(message.messageId(), "Internal server error"));
}
}
// Message DTOs
public record ReceivedMessage(
String messageId,
String messageType,
String payload,
String correlationId,
String sourceSystem,
Map<String, String> headers
) {
public static ReceivedMessage fromKafkaRecord(ConsumerRecord<String, String> record) {
Map<String, String> headers = new HashMap<>();
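record.headers().forEach(header -> {
// Header values are raw bytes and may be null; decode explicitly as UTF-8
if (header.value() != null) {
headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
}
});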
return new ReceivedMessage(
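record.key(), // assumes the producer sets the unique message ID as the record key; a null or shared key breaks deduplication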
headers.get("messageType"),
record.value(),
headers.get("correlationId"),
"KAFKA",
headers
);
}
}
public record MessageReceipt(
String messageId,
boolean accepted,
boolean duplicate,
String reason
) {
public static MessageReceipt accepted(String messageId) {
return new MessageReceipt(messageId, true, false, "Accepted");
}
public static MessageReceipt duplicate(String messageId) {
return new MessageReceipt(messageId, false, true, "Duplicate message");
}
public static MessageReceipt rejected(String messageId, String reason) {
return new MessageReceipt(messageId, false, false, reason);
}
}
}
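A check followed by an insert is not atomic on its own: two concurrent deliveries of the same message can both pass the existence check. What actually guarantees a single stored copy is the unique constraint on `message_id` combined with an insert that ignores conflicts. The sketch below models this with `ConcurrentHashMap.putIfAbsent` standing in for the conflict-ignoring insert. `AtomicReceiveDemo` and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model: putIfAbsent plays the role of an insert that ignores conflicts.
final class AtomicReceiveDemo {
    private final ConcurrentHashMap<String, String> inbox = new ConcurrentHashMap<>();

    /** Returns true if this call stored the message, false if it was a duplicate. */
    boolean receive(String messageId, String payload) {
        return inbox.putIfAbsent(messageId, payload) == null;
    }

    /** Delivers the same message from many threads and counts accepted deliveries. */
    static int deliverConcurrently(int deliveries) {
        AtomicReceiveDemo demo = new AtomicReceiveDemo();
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(deliveries, 8)));
        for (int i = 0; i < deliveries; i++) {
            pool.submit(() -> {
                try {
                    start.await(); // release the waiting deliveries together
                    if (demo.receive("msg-42", "{}")) {
                        accepted.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted.get();
    }
}
```

However many threads deliver the message, exactly one delivery is accepted, because the atomic insert itself decides the winner.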
Error Handling & Retries
6. Advanced Error Handling
@Service
public class DeadLetterService {
private static final Logger logger = LoggerFactory.getLogger(DeadLetterService.class);
private final InboxRepository inboxRepository;
private final ObjectMapper objectMapper;
public DeadLetterService(InboxRepository inboxRepository, ObjectMapper objectMapper) {
this.inboxRepository = inboxRepository;
this.objectMapper = objectMapper;
}
/**
* Reprocess message from dead letter queue
*/
@Transactional
public boolean reprocessDeadLetterMessage(Long deadLetterId) {
// Implementation to move message from DLQ back to inbox
// This would involve:
// 1. Fetch message from dead_letter_messages
// 2. Create new inbox message with new message ID
// 3. Delete from dead_letter_messages
// 4. Store in inbox_messages
logger.info("Reprocessing dead letter message: {}", deadLetterId);
// Implementation details would depend on your schema
return true;
}
/**
* Archive old dead letter messages
*/
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
public void archiveOldDeadLetters() {
try {
// Archive messages older than 30 days
// Implementation depends on your archival strategy
logger.info("Archiving old dead letter messages");
} catch (Exception e) {
logger.error("Error archiving dead letter messages", e);
}
}
}
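// Circuit breaker for message processing.
// Note: processMessage catches exceptions and returns a failure result, so this breaker
// only records failures if those results are translated back into exceptions.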
@Service
public class CircuitBreakerMessageProcessor {
private final CircuitBreaker circuitBreaker;
private final InboxMessageProcessor messageProcessor;
public CircuitBreakerMessageProcessor(InboxMessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
this.circuitBreaker = CircuitBreaker.ofDefaults("inboxProcessor");
}
public MessageProcessingResult processMessageWithCircuitBreaker(InboxMessage message) {
return circuitBreaker.executeSupplier(() ->
messageProcessor.processMessage(message)
);
}
}
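// Retry configuration with exponential backoff (Resilience4j)
@Configuration
public class InboxRetryConfiguration { // renamed to avoid clashing with Resilience4j's RetryConfig
@Bean
public RetryRegistry retryRegistry() {
// Exponential backoff: 1s initial delay, doubling each attempt, capped at 30s.
// Only the interval function is set, because it would conflict with waitDuration.
RetryConfig config = RetryConfig.custom()
.maxAttempts(3)
.intervalFunction(IntervalFunction.ofExponentialBackoff(
Duration.ofSeconds(1), 2.0, Duration.ofSeconds(30)))
.retryExceptions(TransientException.class)
.build();
// Registered as the default configuration, so retry("inboxProcessing") picks it up
return RetryRegistry.of(config);
}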
@Bean
public Retry inboxRetry(RetryRegistry retryRegistry) {
return retryRegistry.retry("inboxProcessing");
}
}
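For readers who want to see what a retry decorator does under the hood, here is a dependency-free sketch of retrying with exponential backoff. It is not the Resilience4j implementation. `SimpleRetry` and `RetryableException` are hypothetical names, with `RetryableException` standing in for `TransientException`.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Minimal retry loop with exponential backoff; a sketch, not a library replacement.
final class SimpleRetry {
    /** Signals a failure worth retrying (stand-in for TransientException). */
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    static <T> T call(Supplier<T> action, int maxAttempts,
                      Duration initialDelay, double multiplier) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMillis = initialDelay.toMillis();
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RetryableException e) {
                if (attempt >= maxAttempts) {
                    throw e; // budget exhausted, surface the last failure
                }
                sleep(delayMillis);
                delayMillis = (long) (delayMillis * multiplier);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Only `RetryableException` triggers another attempt. Any other exception propagates immediately, which mirrors the split between transient and business failures used throughout this article.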
Monitoring & Observability
7. Metrics and Monitoring
@Service
public class InboxMetricsService {
private final MeterRegistry meterRegistry;
private final InboxRepository inboxRepository;
// Metrics
private final Counter messagesReceived;
private final Counter messagesProcessed;
private final Counter messagesFailed;
private final Timer messageProcessingTimer;
private final Gauge pendingMessagesGauge;
public InboxMetricsService(MeterRegistry meterRegistry, InboxRepository inboxRepository) {
this.meterRegistry = meterRegistry;
this.inboxRepository = inboxRepository;
// Initialize metrics
this.messagesReceived = Counter.builder("inbox.messages.received")
.description("Total messages received")
.register(meterRegistry);
this.messagesProcessed = Counter.builder("inbox.messages.processed")
.description("Total messages processed successfully")
.tag("result", "success")
.register(meterRegistry);
this.messagesFailed = Counter.builder("inbox.messages.processed")
.description("Total messages failed")
.tag("result", "failure")
.register(meterRegistry);
this.messageProcessingTimer = Timer.builder("inbox.message.processing.time")
.description("Message processing time")
.register(meterRegistry);
this.pendingMessagesGauge = Gauge.builder("inbox.messages.pending", this, InboxMetricsService::getPendingMessageCount)
.description("Number of pending messages")
.register(meterRegistry);
}
public void recordMessageReceived() {
messagesReceived.increment();
}
public void recordMessageProcessed(boolean success, Duration processingTime) {
if (success) {
messagesProcessed.increment();
} else {
messagesFailed.increment();
}
messageProcessingTimer.record(processingTime);
}
private double getPendingMessageCount() {
try {
InboxRepository.ProcessingStatistics stats = inboxRepository.getProcessingStatistics();
return stats.statusCounts().getOrDefault("PENDING", 0L) +
stats.statusCounts().getOrDefault("RETRYING", 0L);
} catch (Exception e) {
return 0.0;
}
}
}
// Health check
@Component
public class InboxHealthIndicator implements HealthIndicator {
private final InboxRepository inboxRepository;
public InboxHealthIndicator(InboxRepository inboxRepository) {
this.inboxRepository = inboxRepository;
}
@Override
public Health health() {
try {
InboxRepository.ProcessingStatistics stats = inboxRepository.getProcessingStatistics();
long deadLetterCount = stats.deadLetterCount();
long pendingCount = stats.statusCounts().getOrDefault("PENDING", 0L);
long failedCount = stats.statusCounts().getOrDefault("FAILED", 0L);
Health.Builder status = Health.up();
// Add details
status.withDetail("pendingMessages", pendingCount)
.withDetail("failedMessages", failedCount)
.withDetail("deadLetterMessages", deadLetterCount);
// Report DOWN if too many dead letters have accumulated; keep the details added above
if (deadLetterCount > 100) {
status.down()
.withDetail("reason", "Too many dead letter messages: " + deadLetterCount);
}
return status.build();
} catch (Exception e) {
return Health.down(e).build();
}
}
}
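The metrics service exposes `recordMessageProcessed(boolean success, Duration processingTime)`, but the code above does not show where those two values come from. One way to obtain them is to wrap the handler call in a small timing helper, as in the following sketch. `TimedExecution` and `Result` are hypothetical names, not part of Micrometer or of the services above.

```java
import java.time.Duration;

// Sketch: measure how long an action takes and whether it completed normally.
final class TimedExecution {
    record Result(boolean success, Duration elapsed) {}

    static Result run(Runnable action) {
        long start = System.nanoTime(); // monotonic clock, suitable for measuring durations
        boolean success = true;
        try {
            action.run();
        } catch (RuntimeException e) {
            success = false; // the caller decides how to log or classify the failure
        }
        return new Result(success, Duration.ofNanos(System.nanoTime() - start));
    }
}
```

The two fields of the result can then be passed straight to `recordMessageProcessed`.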
Spring Boot Integration
8. Complete Spring Boot Configuration
# application.yml
spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/inbox_pattern
    username: inbox_user
    password: inbox_pass
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: inbox-consumer
      auto-offset-reset: earliest
      enable-auto-commit: false
  jackson:
    property-naming-strategy: SNAKE_CASE

# Inbox configuration
inbox:
  processing:
    enabled: true
    interval: 5000
    batch-size: 100
  retry:
    enabled: true
    interval: 30000
    max-retries: 3
  dead-letter:
    archive-after-days: 30

# Async configuration
async:
  executor:
    inbox:
      core-pool-size: 5
      max-pool-size: 10
      queue-capacity: 1000
      thread-name-prefix: "InboxProcessor-"

# Management endpoints
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info
  endpoint:
    health:
      show-details: always
9. Spring Configuration Classes
@Configuration
@EnableAsync
@EnableScheduling
@EnableConfigurationProperties(InboxProperties.class)
public class InboxConfiguration {
// Binds core-pool-size, max-pool-size, queue-capacity and thread-name-prefix
// from async.executor.inbox onto the executor's setters
@Bean("inboxProcessorExecutor")
@ConfigurationProperties(prefix = "async.executor.inbox")
public ThreadPoolTaskExecutor inboxProcessorExecutor() {
return new ThreadPoolTaskExecutor();
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
}
}
@ConfigurationProperties(prefix = "inbox")
@Data
public class InboxProperties {
private Processing processing = new Processing();
private Retry retry = new Retry();
private DeadLetter deadLetter = new DeadLetter();
@Data
public static class Processing {
private boolean enabled = true;
private long interval = 5000;
private int batchSize = 100;
}
@Data
public static class Retry {
private boolean enabled = true;
private long interval = 30000;
private int maxRetries = 3;
}
@Data
public static class DeadLetter {
private int archiveAfterDays = 30;
}
}
// Main application
@SpringBootApplication
@EnableKafka
public class InboxPatternApplication {
public static void main(String[] args) {
SpringApplication.run(InboxPatternApplication.class, args);
}
@Bean
public CommandLineRunner demo(InboxRepository repository) {
return args -> {
// Demo: Insert test messages
if (args.length > 0 && "demo".equals(args[0])) {
insertDemoMessages(repository);
}
};
}
private void insertDemoMessages(InboxRepository repository) {
for (int i = 0; i < 10; i++) {
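InboxMessage message = InboxMessage.create(
"msg-" + UUID.randomUUID(),
"ORDER_CREATED",
// Keys follow the SNAKE_CASE naming strategy and the fields of OrderCreatedEvent
"{\"order_id\": \"order-" + i + "\", \"customer_id\": \"customer-" + i + "\", \"items\": [], \"total_amount\": 100.00}",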
"corr-" + i
);
repository.storeMessage(message);
}
}
}
This comprehensive implementation of the Inbox Pattern provides:
- Strong idempotency guarantees through message ID tracking
- Reliable message processing with retry mechanisms
- Dead letter queue for handling permanent failures
- Monitoring and observability through metrics and health checks
- Spring Boot integration for easy adoption
- Scalable architecture suitable for high-throughput systems
The pattern lets your system handle duplicate messages and transient failures while maintaining data consistency across distributed components.