Orchestrating Reliable Business Processes Across Microservices
The Saga Pattern is a design pattern for managing distributed transactions across multiple microservices. Unlike a traditional ACID transaction, a saga breaks a business transaction into a sequence of local transactions, each paired with a compensating action that semantically undoes it if a later step fails. This approach gives eventual consistency across services without holding long-lived locks.
Saga Pattern Architecture Overview
Key Concepts:
- Saga: A sequence of local transactions
- Compensating Actions: Operations to undo previous steps
- Orchestration: Central coordinator managing the saga flow
- Choreography: Distributed coordination through events
- Saga Log: Persistent record of saga execution
Saga Types:
- Orchestration-Based: Central coordinator controls the flow
- Choreography-Based: Services communicate via events
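Before the full framework, the core idea (run local transactions in order, and on the first failure undo the completed ones in reverse) can be sketched in a few lines. This is a minimal illustration only; `MiniStep` and `MiniSaga` are hypothetical names and are not part of the framework built in the following sections.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration types, not the article's framework classes.
final class MiniStep {
    final String name;
    final BooleanSupplier action;   // the local transaction; returns false on failure
    final Runnable compensation;    // undoes the effect of a successful action

    MiniStep(String name, BooleanSupplier action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }
}

final class MiniSaga {
    /** Runs steps in order; on the first failure, compensates completed steps newest first. */
    static boolean run(List<MiniStep> steps, List<String> log) {
        Deque<MiniStep> completed = new ArrayDeque<>();
        for (MiniStep step : steps) {
            if (step.action.getAsBoolean()) {
                log.add("done:" + step.name);
                completed.push(step);   // the most recent step ends up on top
            } else {
                log.add("failed:" + step.name);
                while (!completed.isEmpty()) {
                    MiniStep undo = completed.pop();
                    undo.compensation.run();
                    log.add("undone:" + undo.name);
                }
                return false;
            }
        }
        return true;
    }
}
```

The failed step itself is not compensated here, on the assumption that a failed local transaction left no effect behind; only steps that reported success are undone.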
1. Core Saga Framework
Let's start with the foundational saga framework:
package com.saga.core;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
// Saga execution status
public enum SagaStatus {
STARTED,
EXECUTING,
COMPENSATING,
COMPLETED,
FAILED,
COMPENSATED
}
// Saga step result
// Public because the orchestrator and the concrete steps live in other packages
public class StepResult {
private final boolean success;
private final String errorMessage;
private final Object result;
public StepResult(boolean success, String errorMessage, Object result) {
this.success = success;
this.errorMessage = errorMessage;
this.result = result;
}
public static StepResult success(Object result) {
return new StepResult(true, null, result);
}
public static StepResult failure(String errorMessage) {
return new StepResult(false, errorMessage, null);
}
// Getters
public boolean isSuccess() { return success; }
public String getErrorMessage() { return errorMessage; }
public Object getResult() { return result; }
}
// Saga step definition
public abstract class SagaStep<T> {
private final String name;
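// Per-step scratch data. This map lives on the step instance, so a step object that is
// shared between sagas (for example a singleton bean) also shares these entries.
private final Map<String, Object> context;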
public SagaStep(String name) {
this.name = name;
this.context = new ConcurrentHashMap<>();
}
public abstract StepResult execute(T sagaData);
public abstract StepResult compensate(T sagaData);
public String getName() { return name; }
public Map<String, Object> getContext() { return context; }
public void setContext(String key, Object value) {
context.put(key, value);
}
public Object getContext(String key) {
return context.get(key);
}
}
// Saga data interface
public interface SagaData {
String getSagaId();
void setSagaId(String sagaId);
LocalDateTime getCreatedAt();
void setCreatedAt(LocalDateTime createdAt);
}
// Saga execution context
public class SagaExecutionContext {
private final String sagaId;
private final List<SagaStep<?>> steps;
private final Map<String, Object> sharedData;
private int currentStepIndex;
private SagaStatus status;
private String failureReason;
private final LocalDateTime startedAt;
private LocalDateTime completedAt;
public SagaExecutionContext(String sagaId) {
this.sagaId = sagaId;
this.steps = new ArrayList<>();
this.sharedData = new ConcurrentHashMap<>();
this.currentStepIndex = 0;
this.status = SagaStatus.STARTED;
this.startedAt = LocalDateTime.now();
}
public void addStep(SagaStep<?> step) {
steps.add(step);
}
public SagaStep<?> getCurrentStep() {
if (currentStepIndex < steps.size()) {
return steps.get(currentStepIndex);
}
return null;
}
public void moveToNextStep() {
currentStepIndex++;
}
public boolean hasNextStep() {
return currentStepIndex < steps.size();
}
public void setSharedData(String key, Object value) {
sharedData.put(key, value);
}
public Object getSharedData(String key) {
return sharedData.get(key);
}
// Getters and setters
public String getSagaId() { return sagaId; }
public List<SagaStep<?>> getSteps() { return steps; }
public int getCurrentStepIndex() { return currentStepIndex; }
public SagaStatus getStatus() { return status; }
public void setStatus(SagaStatus status) { this.status = status; }
public String getFailureReason() { return failureReason; }
public void setFailureReason(String failureReason) { this.failureReason = failureReason; }
public LocalDateTime getStartedAt() { return startedAt; }
public LocalDateTime getCompletedAt() { return completedAt; }
public void setCompletedAt(LocalDateTime completedAt) { this.completedAt = completedAt; }
}
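The status enum implies a lifecycle, but nothing in the context class enforces it. As a hedged sketch, the transitions could be made explicit as below. The transition table is an assumption read off how the orchestrator in the next section sets statuses, and `StatusTransitions` is a hypothetical helper; the enum is repeated only so the block compiles on its own.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Repeated from the framework above so this sketch is self-contained.
enum SagaStatus { STARTED, EXECUTING, COMPENSATING, COMPLETED, FAILED, COMPENSATED }

// Hypothetical helper: an assumed table of legal status transitions.
final class StatusTransitions {
    private static final Map<SagaStatus, Set<SagaStatus>> ALLOWED = new EnumMap<>(SagaStatus.class);

    static {
        ALLOWED.put(SagaStatus.STARTED, EnumSet.of(SagaStatus.EXECUTING));
        ALLOWED.put(SagaStatus.EXECUTING, EnumSet.of(SagaStatus.COMPLETED, SagaStatus.COMPENSATING));
        ALLOWED.put(SagaStatus.COMPENSATING, EnumSet.of(SagaStatus.COMPENSATED, SagaStatus.FAILED));
        // Terminal states have no outgoing transitions.
        ALLOWED.put(SagaStatus.COMPLETED, EnumSet.noneOf(SagaStatus.class));
        ALLOWED.put(SagaStatus.COMPENSATED, EnumSet.noneOf(SagaStatus.class));
        ALLOWED.put(SagaStatus.FAILED, EnumSet.noneOf(SagaStatus.class));
    }

    static boolean canMove(SagaStatus from, SagaStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(SagaStatus status) {
        return ALLOWED.get(status).isEmpty();
    }
}
```

A setter such as `setStatus` could consult `canMove` and reject illegal jumps, which tends to surface recovery bugs early. A recovery path that re-enters `EXECUTING` would need that self-transition added to the table.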
2. Saga Orchestrator
The central coordinator that manages saga execution:
package com.saga.orchestrator;
import com.saga.core.*;
import com.saga.factory.SagaStepFactory;
import com.saga.repository.SagaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SagaOrchestrator {
private static final Logger logger = LoggerFactory.getLogger(SagaOrchestrator.class);
private final Map<String, SagaExecutionContext> activeSagas;
private final SagaRepository sagaRepository;
private final SagaStepFactory stepFactory;
public SagaOrchestrator(SagaRepository sagaRepository, SagaStepFactory stepFactory) {
this.activeSagas = new ConcurrentHashMap<>();
this.sagaRepository = sagaRepository;
this.stepFactory = stepFactory;
}
public <T extends SagaData> String startSaga(String sagaType, T sagaData) {
String sagaId = generateSagaId();
sagaData.setSagaId(sagaId);
sagaData.setCreatedAt(LocalDateTime.now());
SagaExecutionContext context = new SagaExecutionContext(sagaId);
// Build saga steps based on type
List<SagaStep<T>> steps = stepFactory.createSteps(sagaType, sagaData);
steps.forEach(context::addStep);
activeSagas.put(sagaId, context);
sagaRepository.saveSagaContext(context);
logger.info("Started saga: {} with ID: {}", sagaType, sagaId);
// Execute saga asynchronously
executeSagaAsync(sagaId, sagaData);
return sagaId;
}
public <T extends SagaData> void executeSagaAsync(String sagaId, T sagaData) {
new Thread(() -> executeSaga(sagaId, sagaData)).start();
}
public <T extends SagaData> SagaResult executeSaga(String sagaId, T sagaData) {
SagaExecutionContext context = activeSagas.get(sagaId);
if (context == null) {
context = sagaRepository.findSagaContext(sagaId)
.orElseThrow(() -> new SagaException("Saga not found: " + sagaId));
activeSagas.put(sagaId, context);
}
try {
context.setStatus(SagaStatus.EXECUTING);
sagaRepository.saveSagaContext(context);
while (context.hasNextStep()) {
@SuppressWarnings("unchecked")
SagaStep<T> currentStep = (SagaStep<T>) context.getCurrentStep();
logger.info("Executing step: {} for saga: {}", currentStep.getName(), sagaId);
StepResult result = currentStep.execute(sagaData);
if (result.isSuccess()) {
context.moveToNextStep();
sagaRepository.saveSagaContext(context);
logger.info("Step completed successfully: {}", currentStep.getName());
} else {
// Step failed, start compensation
logger.error("Step failed: {}. Error: {}", currentStep.getName(), result.getErrorMessage());
compensateSaga(sagaId, sagaData, context);
return SagaResult.failure(result.getErrorMessage());
}
}
// All steps completed successfully
context.setStatus(SagaStatus.COMPLETED);
context.setCompletedAt(LocalDateTime.now());
sagaRepository.saveSagaContext(context);
activeSagas.remove(sagaId);
logger.info("Saga completed successfully: {}", sagaId);
return SagaResult.success();
} catch (Exception e) {
logger.error("Error executing saga: {}", sagaId, e);
compensateSaga(sagaId, sagaData, context);
return SagaResult.failure(e.getMessage());
}
}
private <T extends SagaData> void compensateSaga(String sagaId, T sagaData, SagaExecutionContext context) {
try {
context.setStatus(SagaStatus.COMPENSATING);
context.setFailureReason("Execution failed at step: " + context.getCurrentStepIndex());
sagaRepository.saveSagaContext(context);
// Compensate the steps that already completed, newest first
boolean allCompensated = true;
for (int i = context.getCurrentStepIndex() - 1; i >= 0; i--) {
@SuppressWarnings("unchecked")
SagaStep<T> stepToCompensate = (SagaStep<T>) context.getSteps().get(i);
logger.info("Compensating step: {} for saga: {}", stepToCompensate.getName(), sagaId);
StepResult compensationResult = stepToCompensate.compensate(sagaData);
if (!compensationResult.isSuccess()) {
logger.error("Compensation failed for step: {}. Error: {}",
stepToCompensate.getName(), compensationResult.getErrorMessage());
// Keep compensating the remaining steps, but remember that one failed
allCompensated = false;
}
}
// Report COMPENSATED only if every compensation succeeded; otherwise flag the saga as FAILED
context.setStatus(allCompensated ? SagaStatus.COMPENSATED : SagaStatus.FAILED);
context.setCompletedAt(LocalDateTime.now());
sagaRepository.saveSagaContext(context);
activeSagas.remove(sagaId);
logger.info("Saga compensation completed: {}", sagaId);
} catch (Exception e) {
logger.error("Error during saga compensation: {}", sagaId, e);
context.setStatus(SagaStatus.FAILED);
sagaRepository.saveSagaContext(context);
}
}
public SagaStatus getSagaStatus(String sagaId) {
SagaExecutionContext context = activeSagas.get(sagaId);
if (context != null) {
return context.getStatus();
}
return sagaRepository.findSagaContext(sagaId)
.map(SagaExecutionContext::getStatus)
.orElse(null);
}
private String generateSagaId() {
// Use the full UUID: truncating to 8 hex characters leaves only 32 bits and invites collisions
return "SAGA-" + UUID.randomUUID().toString().toUpperCase();
}
// Supporting classes
public static class SagaResult {
private final boolean success;
private final String errorMessage;
private final String sagaId;
private SagaResult(boolean success, String errorMessage, String sagaId) {
this.success = success;
this.errorMessage = errorMessage;
this.sagaId = sagaId;
}
public static SagaResult success() {
return new SagaResult(true, null, null);
}
public static SagaResult success(String sagaId) {
return new SagaResult(true, null, sagaId);
}
public static SagaResult failure(String errorMessage) {
return new SagaResult(false, errorMessage, null);
}
// Getters
public boolean isSuccess() { return success; }
public String getErrorMessage() { return errorMessage; }
public String getSagaId() { return sagaId; }
}
public static class SagaException extends RuntimeException {
public SagaException(String message) {
super(message);
}
public SagaException(String message, Throwable cause) {
super(message, cause);
}
}
}
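`executeSagaAsync` above starts one new thread per saga, so a burst of orders creates an unbounded number of threads. One alternative, sketched under assumptions, is to hand saga executions to a fixed-size pool. `BoundedSagaRunner` and its method names are hypothetical; only the `java.util.concurrent` calls are standard library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper that caps how many sagas execute concurrently.
final class BoundedSagaRunner {
    private final ExecutorService pool;

    BoundedSagaRunner(int maxConcurrentSagas) {
        this.pool = Executors.newFixedThreadPool(maxConcurrentSagas);
    }

    /** Queues a saga execution; it runs as soon as a pool thread is free. */
    void submit(Runnable sagaExecution) {
        pool.execute(sagaExecution);
    }

    /** Stops accepting work and waits for queued sagas; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            pool.shutdownNow();
            return false;
        }
    }
}
```

In the orchestrator this would look roughly like `runner.submit(() -> executeSaga(sagaId, sagaData))`. In a Spring application, an injected task executor would be the more conventional way to get the same bound.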
3. Saga Repository for Persistence
package com.saga.repository;
import com.saga.core.SagaExecutionContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Repository
public class SagaRepository {
private final RedisTemplate<String, SagaExecutionContext> redisTemplate;
private static final String SAGA_KEY_PREFIX = "saga:";
private static final long SAGA_TTL_HOURS = 24;
public SagaRepository(RedisTemplate<String, SagaExecutionContext> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void saveSagaContext(SagaExecutionContext context) {
String key = SAGA_KEY_PREFIX + context.getSagaId();
redisTemplate.opsForValue().set(key, context, SAGA_TTL_HOURS, TimeUnit.HOURS);
}
public Optional<SagaExecutionContext> findSagaContext(String sagaId) {
String key = SAGA_KEY_PREFIX + sagaId;
SagaExecutionContext context = redisTemplate.opsForValue().get(key);
return Optional.ofNullable(context);
}
public void deleteSagaContext(String sagaId) {
String key = SAGA_KEY_PREFIX + sagaId;
redisTemplate.delete(key);
}
public boolean exists(String sagaId) {
String key = SAGA_KEY_PREFIX + sagaId;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
}
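// Alternative: JPA repository. These types belong in their own files and additionally need
// the JPA annotations (javax.persistence or jakarta.persistence, depending on the Spring version),
// java.time.LocalDateTime, java.util.List, java.util.Optional, and
// org.springframework.data.jpa.repository.JpaRepository.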
@Entity
@Table(name = "saga_executions")
class SagaExecutionEntity {
@Id
private String sagaId;
private String sagaType;
private String status;
private String currentStep;
private int currentStepIndex;
private String failureReason;
@Lob
private String sagaData; // JSON serialized saga data
@Lob
private String executionContext; // JSON serialized context
private LocalDateTime startedAt;
private LocalDateTime updatedAt;
private LocalDateTime completedAt;
// Constructors, getters, setters
}
@Repository
public interface SagaJpaRepository extends JpaRepository<SagaExecutionEntity, String> {
List<SagaExecutionEntity> findByStatusAndUpdatedAtBefore(String status, LocalDateTime cutoff);
Optional<SagaExecutionEntity> findBySagaId(String sagaId);
}
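The Redis repository above stores the whole `SagaExecutionContext`, which holds references to live step objects; whether that serializes cleanly depends on how the `RedisTemplate` is configured. A hedged alternative is to persist a small snapshot that contains only plain data, matching the fields of the JPA entity. `SagaSnapshot` is a hypothetical class, and plain Java serialization is used here only to demonstrate the round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical persistence-friendly view of a saga: step names instead of step objects.
final class SagaSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    final String sagaId;
    final String status;
    final int currentStepIndex;
    final List<String> stepNames;

    SagaSnapshot(String sagaId, String status, int currentStepIndex, List<String> stepNames) {
        this.sagaId = sagaId;
        this.status = status;
        this.currentStepIndex = currentStepIndex;
        this.stepNames = new ArrayList<>(stepNames);   // defensive, serializable copy
    }

    /** Name of the step to resume at, or null when every step has already run. */
    String nextStepName() {
        return currentStepIndex < stepNames.size() ? stepNames.get(currentStepIndex) : null;
    }

    static byte[] toBytes(SagaSnapshot snapshot) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(snapshot);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static SagaSnapshot fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (SagaSnapshot) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

On recovery, the step names would be resolved back to step objects (for example through the step factory), and execution would resume at `nextStepName()`.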
4. Concrete Saga Implementation: E-commerce Order Processing
Let's implement a real-world e-commerce order processing saga:
package com.saga.sagas.order;
import com.saga.core.*;
import com.saga.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.*;
// Order saga data
public class OrderSagaData implements SagaData {
private String sagaId;
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String shippingAddress;
private String paymentMethod;
private LocalDateTime createdAt;
// Constructors
public OrderSagaData() {}
public OrderSagaData(String orderId, String customerId, List<OrderItem> items,
BigDecimal totalAmount, String shippingAddress, String paymentMethod) {
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
this.totalAmount = totalAmount;
this.shippingAddress = shippingAddress;
this.paymentMethod = paymentMethod;
this.createdAt = LocalDateTime.now();
}
// Getters and setters
@Override public String getSagaId() { return sagaId; }
@Override public void setSagaId(String sagaId) { this.sagaId = sagaId; }
public String getOrderId() { return orderId; }
public void setOrderId(String orderId) { this.orderId = orderId; }
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public List<OrderItem> getItems() { return items; }
public void setItems(List<OrderItem> items) { this.items = items; }
public BigDecimal getTotalAmount() { return totalAmount; }
public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
public String getShippingAddress() { return shippingAddress; }
public void setShippingAddress(String shippingAddress) { this.shippingAddress = shippingAddress; }
public String getPaymentMethod() { return paymentMethod; }
public void setPaymentMethod(String paymentMethod) { this.paymentMethod = paymentMethod; }
@Override public LocalDateTime getCreatedAt() { return createdAt; }
@Override public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
}
// Public because the REST controller in another package uses it in its request DTO
public class OrderItem {
private String productId;
private String productName;
private int quantity;
private BigDecimal unitPrice;
// Constructors, getters, setters
public OrderItem(String productId, String productName, int quantity, BigDecimal unitPrice) {
this.productId = productId;
this.productName = productName;
this.quantity = quantity;
this.unitPrice = unitPrice;
}
public String getProductId() { return productId; }
public String getProductName() { return productName; }
public int getQuantity() { return quantity; }
public BigDecimal getUnitPrice() { return unitPrice; }
public BigDecimal getTotalPrice() { return unitPrice.multiply(BigDecimal.valueOf(quantity)); }
}
// Order Validation Step
@Component
public class OrderValidationStep extends SagaStep<OrderSagaData> {
private static final Logger logger = LoggerFactory.getLogger(OrderValidationStep.class);
private final OrderService orderService;
public OrderValidationStep(OrderService orderService) {
super("ORDER_VALIDATION");
this.orderService = orderService;
}
@Override
public StepResult execute(OrderSagaData sagaData) {
try {
logger.info("Validating order: {}", sagaData.getOrderId());
// Validate order data
if (sagaData.getItems() == null || sagaData.getItems().isEmpty()) {
return StepResult.failure("Order must contain at least one item");
}
if (sagaData.getTotalAmount() == null || sagaData.getTotalAmount().compareTo(BigDecimal.ZERO) <= 0) {
return StepResult.failure("Invalid order total amount");
}
// Validate customer
if (!orderService.isCustomerValid(sagaData.getCustomerId())) {
return StepResult.failure("Invalid customer: " + sagaData.getCustomerId());
}
// Store validation context
setContext("validatedAt", LocalDateTime.now());
logger.info("Order validation successful: {}", sagaData.getOrderId());
return StepResult.success("Order validated successfully");
} catch (Exception e) {
logger.error("Order validation failed: {}", sagaData.getOrderId(), e);
return StepResult.failure("Order validation failed: " + e.getMessage());
}
}
@Override
public StepResult compensate(OrderSagaData sagaData) {
// Validation step has no side effects, so compensation is a no-op
logger.info("Compensating order validation step for order: {}", sagaData.getOrderId());
return StepResult.success("Validation compensation completed");
}
}
// Inventory Reservation Step
@Component
public class InventoryReservationStep extends SagaStep<OrderSagaData> {
private static final Logger logger = LoggerFactory.getLogger(InventoryReservationStep.class);
private final InventoryService inventoryService;
public InventoryReservationStep(InventoryService inventoryService) {
super("INVENTORY_RESERVATION");
this.inventoryService = inventoryService;
}
@Override
public StepResult execute(OrderSagaData sagaData) {
try {
logger.info("Reserving inventory for order: {}", sagaData.getOrderId());
Map<String, Integer> productQuantities = new HashMap<>();
for (OrderItem item : sagaData.getItems()) {
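// Sum quantities so that two line items for the same product do not overwrite each other
productQuantities.merge(item.getProductId(), item.getQuantity(), Integer::sum);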
}
// Reserve inventory
String reservationId = inventoryService.reserveInventory(
sagaData.getOrderId(),
productQuantities
);
if (reservationId == null) {
return StepResult.failure("Inventory reservation failed for order: " + sagaData.getOrderId());
}
// Store reservation ID for compensation
setContext("reservationId", reservationId);
logger.info("Inventory reserved successfully. Reservation ID: {}", reservationId);
return StepResult.success(reservationId);
} catch (Exception e) {
logger.error("Inventory reservation failed for order: {}", sagaData.getOrderId(), e);
return StepResult.failure("Inventory reservation failed: " + e.getMessage());
}
}
@Override
public StepResult compensate(OrderSagaData sagaData) {
try {
String reservationId = (String) getContext("reservationId");
if (reservationId != null) {
logger.info("Releasing inventory reservation: {}", reservationId);
inventoryService.releaseInventory(reservationId);
logger.info("Inventory reservation released: {}", reservationId);
}
return StepResult.success("Inventory compensation completed");
} catch (Exception e) {
logger.error("Inventory compensation failed for reservation: {}", getContext("reservationId"), e);
return StepResult.failure("Inventory compensation failed: " + e.getMessage());
}
}
}
// Payment Processing Step
@Component
public class PaymentProcessingStep extends SagaStep<OrderSagaData> {
private static final Logger logger = LoggerFactory.getLogger(PaymentProcessingStep.class);
private final PaymentService paymentService;
public PaymentProcessingStep(PaymentService paymentService) {
super("PAYMENT_PROCESSING");
this.paymentService = paymentService;
}
@Override
public StepResult execute(OrderSagaData sagaData) {
try {
logger.info("Processing payment for order: {}", sagaData.getOrderId());
// Process payment
PaymentResult paymentResult = paymentService.processPayment(
sagaData.getOrderId(),
sagaData.getCustomerId(),
sagaData.getTotalAmount(),
sagaData.getPaymentMethod()
);
if (!paymentResult.isSuccess()) {
return StepResult.failure("Payment processing failed: " + paymentResult.getErrorMessage());
}
// Store payment information for compensation
setContext("paymentId", paymentResult.getPaymentId());
setContext("paymentAmount", sagaData.getTotalAmount());
logger.info("Payment processed successfully. Payment ID: {}", paymentResult.getPaymentId());
return StepResult.success(paymentResult.getPaymentId());
} catch (Exception e) {
logger.error("Payment processing failed for order: {}", sagaData.getOrderId(), e);
return StepResult.failure("Payment processing failed: " + e.getMessage());
}
}
@Override
public StepResult compensate(OrderSagaData sagaData) {
try {
String paymentId = (String) getContext("paymentId");
if (paymentId != null) {
logger.info("Refunding payment: {}", paymentId);
paymentService.refundPayment(paymentId);
logger.info("Payment refunded: {}", paymentId);
}
return StepResult.success("Payment compensation completed");
} catch (Exception e) {
logger.error("Payment compensation failed for payment: {}", getContext("paymentId"), e);
return StepResult.failure("Payment compensation failed: " + e.getMessage());
}
}
}
// Shipping Arrangement Step
@Component
public class ShippingArrangementStep extends SagaStep<OrderSagaData> {
private static final Logger logger = LoggerFactory.getLogger(ShippingArrangementStep.class);
private final ShippingService shippingService;
public ShippingArrangementStep(ShippingService shippingService) {
super("SHIPPING_ARRANGEMENT");
this.shippingService = shippingService;
}
@Override
public StepResult execute(OrderSagaData sagaData) {
try {
logger.info("Arranging shipping for order: {}", sagaData.getOrderId());
// Create shipping request
ShippingRequest shippingRequest = new ShippingRequest(
sagaData.getOrderId(),
sagaData.getCustomerId(),
sagaData.getShippingAddress(),
sagaData.getItems()
);
ShippingResult shippingResult = shippingService.createShipping(shippingRequest);
if (!shippingResult.isSuccess()) {
return StepResult.failure("Shipping arrangement failed: " + shippingResult.getErrorMessage());
}
// Store shipping information
setContext("trackingNumber", shippingResult.getTrackingNumber());
setContext("shippingCost", shippingResult.getShippingCost());
logger.info("Shipping arranged successfully. Tracking: {}", shippingResult.getTrackingNumber());
return StepResult.success(shippingResult.getTrackingNumber());
} catch (Exception e) {
logger.error("Shipping arrangement failed for order: {}", sagaData.getOrderId(), e);
return StepResult.failure("Shipping arrangement failed: " + e.getMessage());
}
}
@Override
public StepResult compensate(OrderSagaData sagaData) {
try {
String trackingNumber = (String) getContext("trackingNumber");
if (trackingNumber != null) {
logger.info("Cancelling shipping: {}", trackingNumber);
shippingService.cancelShipping(trackingNumber);
logger.info("Shipping cancelled: {}", trackingNumber);
}
return StepResult.success("Shipping compensation completed");
} catch (Exception e) {
logger.error("Shipping compensation failed for tracking: {}", getContext("trackingNumber"), e);
return StepResult.failure("Shipping compensation failed: " + e.getMessage());
}
}
}
// Order Confirmation Step
@Component
public class OrderConfirmationStep extends SagaStep<OrderSagaData> {
private static final Logger logger = LoggerFactory.getLogger(OrderConfirmationStep.class);
private final OrderService orderService;
private final NotificationService notificationService;
public OrderConfirmationStep(OrderService orderService, NotificationService notificationService) {
super("ORDER_CONFIRMATION");
this.orderService = orderService;
this.notificationService = notificationService;
}
@Override
public StepResult execute(OrderSagaData sagaData) {
try {
logger.info("Confirming order: {}", sagaData.getOrderId());
// Confirm order in system
orderService.confirmOrder(sagaData.getOrderId());
// Send confirmation notification
notificationService.sendOrderConfirmation(
sagaData.getCustomerId(),
sagaData.getOrderId()
);
logger.info("Order confirmed successfully: {}", sagaData.getOrderId());
return StepResult.success("Order confirmed");
} catch (Exception e) {
logger.error("Order confirmation failed: {}", sagaData.getOrderId(), e);
return StepResult.failure("Order confirmation failed: " + e.getMessage());
}
}
@Override
public StepResult compensate(OrderSagaData sagaData) {
// Order confirmation cannot be easily rolled back
// We might mark the order as cancelled in the system
try {
logger.info("Reverting order confirmation: {}", sagaData.getOrderId());
orderService.cancelOrder(sagaData.getOrderId());
return StepResult.success("Order confirmation compensation completed");
} catch (Exception e) {
logger.error("Order confirmation compensation failed: {}", sagaData.getOrderId(), e);
return StepResult.failure("Order confirmation compensation failed: " + e.getMessage());
}
}
}
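One caveat with the steps above: they are Spring `@Component` beans, which are singletons by default, while values such as `reservationId` and `paymentId` are stored in a map on the step instance. Two sagas running concurrently would therefore overwrite each other's entries. A minimal sketch of one way around this is to key step state by saga ID. `PerSagaStepState` is a hypothetical helper, not part of the framework above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store that keeps each saga's step data separate, even when the step is shared.
final class PerSagaStepState {
    private final Map<String, Map<String, Object>> bySaga = new ConcurrentHashMap<>();

    void put(String sagaId, String key, Object value) {
        bySaga.computeIfAbsent(sagaId, id -> new ConcurrentHashMap<>()).put(key, value);
    }

    Object get(String sagaId, String key) {
        Map<String, Object> entries = bySaga.get(sagaId);
        return entries == null ? null : entries.get(key);
    }

    /** Drops everything recorded for a saga once it has completed or been compensated. */
    void clear(String sagaId) {
        bySaga.remove(sagaId);
    }
}
```

Inside a step this would read `state.put(sagaData.getSagaId(), "reservationId", reservationId)` in `execute` and the matching `get` in `compensate`. Keeping such values in the persisted saga context instead would also let compensation survive a restart, which an in-memory map does not.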
5. Saga Step Factory
package com.saga.factory;
import com.saga.core.SagaStep;
import com.saga.sagas.order.*;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
@Component
public class SagaStepFactory {
private final OrderValidationStep orderValidationStep;
private final InventoryReservationStep inventoryReservationStep;
private final PaymentProcessingStep paymentProcessingStep;
private final ShippingArrangementStep shippingArrangementStep;
private final OrderConfirmationStep orderConfirmationStep;
public SagaStepFactory(OrderValidationStep orderValidationStep,
InventoryReservationStep inventoryReservationStep,
PaymentProcessingStep paymentProcessingStep,
ShippingArrangementStep shippingArrangementStep,
OrderConfirmationStep orderConfirmationStep) {
this.orderValidationStep = orderValidationStep;
this.inventoryReservationStep = inventoryReservationStep;
this.paymentProcessingStep = paymentProcessingStep;
this.shippingArrangementStep = shippingArrangementStep;
this.orderConfirmationStep = orderConfirmationStep;
}
@SuppressWarnings("unchecked")
public <T> List<SagaStep<T>> createSteps(String sagaType, T sagaData) {
switch (sagaType) {
case "ORDER_PROCESSING":
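// Collect the steps as SagaStep<?> first, then cast once. The caller is responsible
// for passing saga data of the type these steps expect (OrderSagaData).
List<SagaStep<?>> orderSteps = Arrays.asList(
orderValidationStep,
inventoryReservationStep,
paymentProcessingStep,
shippingArrangementStep,
orderConfirmationStep
);
return (List<SagaStep<T>>) (List<?>) orderSteps;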
case "ORDER_CANCELLATION":
// Steps for the order cancellation saga are not defined yet, so return an empty list
return java.util.Collections.emptyList();
default:
throw new IllegalArgumentException("Unknown saga type: " + sagaType);
}
}
}
6. Supporting Service Interfaces
package com.saga.services;
import java.math.BigDecimal;
import java.util.Map;
// Inventory Service
public interface InventoryService {
String reserveInventory(String orderId, Map<String, Integer> productQuantities);
void releaseInventory(String reservationId);
boolean isProductAvailable(String productId, int quantity);
}
// Payment Service
public interface PaymentService {
PaymentResult processPayment(String orderId, String customerId,
BigDecimal amount, String paymentMethod);
void refundPayment(String paymentId);
}
public class PaymentResult {
private final boolean success;
private final String paymentId;
private final String errorMessage;
public PaymentResult(boolean success, String paymentId, String errorMessage) {
this.success = success;
this.paymentId = paymentId;
this.errorMessage = errorMessage;
}
public boolean isSuccess() { return success; }
public String getPaymentId() { return paymentId; }
public String getErrorMessage() { return errorMessage; }
}
// Shipping Service
public interface ShippingService {
ShippingResult createShipping(ShippingRequest request);
void cancelShipping(String trackingNumber);
}
public class ShippingRequest {
private final String orderId;
private final String customerId;
private final String shippingAddress;
private final Object items; // Simplified
public ShippingRequest(String orderId, String customerId, String shippingAddress, Object items) {
this.orderId = orderId;
this.customerId = customerId;
this.shippingAddress = shippingAddress;
this.items = items;
}
// Getters
public String getOrderId() { return orderId; }
public String getCustomerId() { return customerId; }
public String getShippingAddress() { return shippingAddress; }
public Object getItems() { return items; }
}
public class ShippingResult {
private final boolean success;
private final String trackingNumber;
private final BigDecimal shippingCost;
private final String errorMessage;
public ShippingResult(boolean success, String trackingNumber,
BigDecimal shippingCost, String errorMessage) {
this.success = success;
this.trackingNumber = trackingNumber;
this.shippingCost = shippingCost;
this.errorMessage = errorMessage;
}
public boolean isSuccess() { return success; }
public String getTrackingNumber() { return trackingNumber; }
public BigDecimal getShippingCost() { return shippingCost; }
public String getErrorMessage() { return errorMessage; }
}
// Order Service
public interface OrderService {
void confirmOrder(String orderId);
void cancelOrder(String orderId);
boolean isCustomerValid(String customerId);
}
// Notification Service
public interface NotificationService {
void sendOrderConfirmation(String customerId, String orderId);
void sendOrderCancellation(String customerId, String orderId);
}
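These interfaces leave one property implicit: compensating calls such as `releaseInventory` may be invoked more than once (for example when a recovery job retries a saga), so they should be idempotent. The following in-memory sketch shows that contract for inventory. `InMemoryInventory`, `addStock`, and `available` are hypothetical and exist only for illustration; the `null` return on failure mirrors how `InventoryReservationStep` interprets `reserveInventory`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for an inventory service, useful for tests.
final class InMemoryInventory {
    private final Map<String, Integer> stock = new HashMap<>();
    private final Map<String, Map<String, Integer>> reservations = new HashMap<>();

    synchronized void addStock(String productId, int quantity) {
        stock.merge(productId, quantity, Integer::sum);
    }

    synchronized int available(String productId) {
        return stock.getOrDefault(productId, 0);
    }

    /** Reserves every requested quantity or nothing; returns null when stock is insufficient. */
    synchronized String reserveInventory(String orderId, Map<String, Integer> productQuantities) {
        for (Map.Entry<String, Integer> entry : productQuantities.entrySet()) {
            if (available(entry.getKey()) < entry.getValue()) {
                return null;
            }
        }
        for (Map.Entry<String, Integer> entry : productQuantities.entrySet()) {
            stock.merge(entry.getKey(), -entry.getValue(), Integer::sum);
        }
        String reservationId = orderId + "-" + UUID.randomUUID();
        reservations.put(reservationId, new HashMap<>(productQuantities));
        return reservationId;
    }

    /** Idempotent: an unknown or already released reservation is silently ignored. */
    synchronized void releaseInventory(String reservationId) {
        Map<String, Integer> reserved = reservations.remove(reservationId);
        if (reserved == null) {
            return;
        }
        reserved.forEach((productId, quantity) -> stock.merge(productId, quantity, Integer::sum));
    }
}
```

Because the reservation record is removed on the first release, a second call finds nothing to return to stock, so a retried compensation cannot inflate inventory.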
7. REST Controller for Saga Management
package com.saga.controller;
import com.saga.orchestrator.SagaOrchestrator;
import com.saga.sagas.order.OrderSagaData;
import com.saga.sagas.order.OrderItem;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private final SagaOrchestrator sagaOrchestrator;
public OrderController(SagaOrchestrator sagaOrchestrator) {
this.sagaOrchestrator = sagaOrchestrator;
}
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
try {
// Create order saga data
OrderSagaData sagaData = new OrderSagaData(
generateOrderId(),
request.getCustomerId(),
request.getItems(),
request.getTotalAmount(),
request.getShippingAddress(),
request.getPaymentMethod()
);
// Start order processing saga
String sagaId = sagaOrchestrator.startSaga("ORDER_PROCESSING", sagaData);
OrderResponse response = new OrderResponse(
sagaData.getOrderId(),
sagaId,
"Order processing started"
);
return ResponseEntity.accepted().body(response);
} catch (Exception e) {
return ResponseEntity.badRequest()
.body(new OrderResponse(null, null, "Failed to create order: " + e.getMessage()));
}
}
@GetMapping("/{orderId}/status")
public ResponseEntity<OrderStatusResponse> getOrderStatus(@PathVariable String orderId,
@RequestParam String sagaId) {
try {
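// SagaStatus is defined in com.saga.core, not nested inside SagaOrchestrator
com.saga.core.SagaStatus status = sagaOrchestrator.getSagaStatus(sagaId);
if (status == null) {
// getSagaStatus returns null when the saga ID is unknown
return ResponseEntity.notFound().build();
}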
OrderStatusResponse response = new OrderStatusResponse(
orderId,
sagaId,
status.name(),
"Status retrieved successfully"
);
return ResponseEntity.ok(response);
} catch (Exception e) {
return ResponseEntity.badRequest()
.body(new OrderStatusResponse(orderId, sagaId, "UNKNOWN",
"Failed to get status: " + e.getMessage()));
}
}
@PostMapping("/{orderId}/cancel")
public ResponseEntity<OrderResponse> cancelOrder(@PathVariable String orderId,
@RequestParam String sagaId) {
try {
// For cancellation, we would start a cancellation saga
// This is simplified for demonstration
return ResponseEntity.ok(new OrderResponse(orderId, sagaId, "Cancellation requested"));
} catch (Exception e) {
return ResponseEntity.badRequest()
.body(new OrderResponse(orderId, sagaId, "Failed to cancel order: " + e.getMessage()));
}
}
private String generateOrderId() {
return "ORD-" + System.currentTimeMillis() + "-" + (int)(Math.random() * 1000);
}
// DTO classes
public static class CreateOrderRequest {
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String shippingAddress;
private String paymentMethod;
// Getters and setters
public String getCustomerId() { return customerId; }
public void setCustomerId(String customerId) { this.customerId = customerId; }
public List<OrderItem> getItems() { return items; }
public void setItems(List<OrderItem> items) { this.items = items; }
public BigDecimal getTotalAmount() { return totalAmount; }
public void setTotalAmount(BigDecimal totalAmount) { this.totalAmount = totalAmount; }
public String getShippingAddress() { return shippingAddress; }
public void setShippingAddress(String shippingAddress) { this.shippingAddress = shippingAddress; }
public String getPaymentMethod() { return paymentMethod; }
public void setPaymentMethod(String paymentMethod) { this.paymentMethod = paymentMethod; }
}
public static class OrderResponse {
private final String orderId;
private final String sagaId;
private final String message;
public OrderResponse(String orderId, String sagaId, String message) {
this.orderId = orderId;
this.sagaId = sagaId;
this.message = message;
}
// Getters
public String getOrderId() { return orderId; }
public String getSagaId() { return sagaId; }
public String getMessage() { return message; }
}
public static class OrderStatusResponse {
private final String orderId;
private final String sagaId;
private final String status;
private final String message;
public OrderStatusResponse(String orderId, String sagaId, String status, String message) {
this.orderId = orderId;
this.sagaId = sagaId;
this.status = status;
this.message = message;
}
// Getters
public String getOrderId() { return orderId; }
public String getSagaId() { return sagaId; }
public String getStatus() { return status; }
public String getMessage() { return message; }
}
}
8. Saga Recovery and Monitoring
package com.saga.recovery;
import com.saga.core.SagaExecutionContext;
import com.saga.core.SagaStatus;
import com.saga.orchestrator.SagaOrchestrator;
import com.saga.repository.SagaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
@Component
public class SagaRecoveryService {
private static final Logger logger = LoggerFactory.getLogger(SagaRecoveryService.class);
private final SagaRepository sagaRepository;
private final SagaOrchestrator sagaOrchestrator;
public SagaRecoveryService(SagaRepository sagaRepository, SagaOrchestrator sagaOrchestrator) {
this.sagaRepository = sagaRepository;
this.sagaOrchestrator = sagaOrchestrator;
}
@Scheduled(fixedRate = 30000) // Run every 30 seconds
public void recoverStuckSagas() {
logger.info("Starting saga recovery process...");
// Find sagas that are stuck in executing state for too long
LocalDateTime cutoff = LocalDateTime.now().minusMinutes(5);
// This would query the repository for stuck sagas
// List<SagaExecutionContext> stuckSagas = sagaRepository.findStuckSagas(cutoff);
// For each stuck saga, attempt recovery
/*
for (SagaExecutionContext saga : stuckSagas) {
logger.warn("Attempting recovery for stuck saga: {}", saga.getSagaId());
try {
// Re-execute or compensate based on business logic
if (shouldCompensate(saga)) {
sagaOrchestrator.compensateSaga(saga.getSagaId(), getSagaData(saga), saga);
} else {
sagaOrchestrator.continueSaga(saga.getSagaId(), getSagaData(saga));
}
logger.info("Recovery completed for saga: {}", saga.getSagaId());
} catch (Exception e) {
logger.error("Recovery failed for saga: {}", saga.getSagaId(), e);
}
}
*/
logger.info("Saga recovery process completed");
}
private boolean shouldCompensate(SagaExecutionContext saga) {
// Implement business logic to decide whether to compensate or continue
// For example, if saga has been stuck for too long, compensate
return saga.getStartedAt().isBefore(LocalDateTime.now().minusHours(1));
}
@Scheduled(fixedRate = 60000) // Run every minute
public void cleanupCompletedSagas() {
logger.info("Starting saga cleanup process...");
LocalDateTime cutoff = LocalDateTime.now().minusDays(1);
// Find completed sagas older than cutoff
// List<SagaExecutionContext> oldSagas = sagaRepository.findCompletedSagasBefore(cutoff);
/*
for (SagaExecutionContext saga : oldSagas) {
try {
sagaRepository.deleteSagaContext(saga.getSagaId());
logger.info("Cleaned up old saga: {}", saga.getSagaId());
} catch (Exception e) {
logger.error("Failed to clean up saga: {}", saga.getSagaId(), e);
}
}
*/
logger.info("Saga cleanup process completed");
}
}
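The recovery job above depends on a repository query that is left commented out. As a minimal sketch of what such a query could look like, the class below keeps sagas in memory and returns those still in flight that have not been updated since the cutoff. The names `InMemoryStuckSagaFinder`, `SagaRecord`, and `lastUpdatedAt` are hypothetical and not part of the framework shown earlier; a real `SagaRepository` would run an equivalent query against its persistent store.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the repository query used by recoverStuckSagas().
class InMemoryStuckSagaFinder {

    // Simplified, assumed view of a persisted saga; the real SagaExecutionContext may differ.
    static final class SagaRecord {
        final String sagaId;
        final String status;               // e.g. "EXECUTING", "COMPENSATING", "COMPLETED"
        final LocalDateTime lastUpdatedAt; // assumed field: time of the last state change

        SagaRecord(String sagaId, String status, LocalDateTime lastUpdatedAt) {
            this.sagaId = sagaId;
            this.status = status;
            this.lastUpdatedAt = lastUpdatedAt;
        }
    }

    private final Map<String, SagaRecord> store = new ConcurrentHashMap<>();

    void save(SagaRecord record) {
        store.put(record.sagaId, record);
    }

    // A saga counts as stuck when it is still in flight and was last updated before the cutoff.
    List<SagaRecord> findStuckSagas(LocalDateTime cutoff) {
        List<SagaRecord> stuck = new ArrayList<>();
        for (SagaRecord record : store.values()) {
            boolean inFlight = record.status.equals("EXECUTING")
                    || record.status.equals("COMPENSATING");
            if (inFlight && record.lastUpdatedAt.isBefore(cutoff)) {
                stuck.add(record);
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        InMemoryStuckSagaFinder finder = new InMemoryStuckSagaFinder();
        LocalDateTime now = LocalDateTime.now();
        finder.save(new SagaRecord("saga-1", "EXECUTING", now.minusMinutes(10)));
        finder.save(new SagaRecord("saga-2", "EXECUTING", now.minusMinutes(1)));
        finder.save(new SagaRecord("saga-3", "COMPLETED", now.minusMinutes(10)));

        // Same five-minute cutoff as recoverStuckSagas(); only saga-1 qualifies
        for (SagaRecord record : finder.findStuckSagas(now.minusMinutes(5))) {
            System.out.println("Stuck saga: " + record.sagaId);
        }
    }
}
```

Filtering on the last update time rather than the start time is a design choice of this sketch: a long-running saga that is still making progress would not be flagged.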
9. Testing Saga Implementation
package com.saga.test;
import com.saga.core.*;
import com.saga.orchestrator.SagaOrchestrator;
import com.saga.sagas.order.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.math.BigDecimal;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class OrderSagaTest {
@Mock
private OrderService orderService;
@Mock
private InventoryService inventoryService;
@Mock
private PaymentService paymentService;
@Mock
private ShippingService shippingService;
@Mock
private NotificationService notificationService;
@Test
void testSuccessfulOrderSaga() {
// Setup
OrderSagaData sagaData = createTestOrderData();
OrderValidationStep validationStep = new OrderValidationStep(orderService);
InventoryReservationStep inventoryStep = new InventoryReservationStep(inventoryService);
PaymentProcessingStep paymentStep = new PaymentProcessingStep(paymentService);
when(orderService.isCustomerValid(any())).thenReturn(true);
when(inventoryService.reserveInventory(any(), any())).thenReturn("RES-123");
when(paymentService.processPayment(any(), any(), any(), any()))
.thenReturn(new PaymentResult(true, "PAY-123", null));
// Execute steps
StepResult validationResult = validationStep.execute(sagaData);
assertTrue(validationResult.isSuccess());
StepResult inventoryResult = inventoryStep.execute(sagaData);
assertTrue(inventoryResult.isSuccess());
StepResult paymentResult = paymentStep.execute(sagaData);
assertTrue(paymentResult.isSuccess());
// Verify interactions
verify(orderService).isCustomerValid("CUST-123");
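// Mockito requires every argument to be a matcher once any matcher is used, so wrap the literal in eq()
verify(inventoryService).reserveInventory(eq("ORD-123"), any());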
verify(paymentService).processPayment(any(), any(), any(), any());
}
@Test
void testFailedPaymentCompensation() {
// Setup
OrderSagaData sagaData = createTestOrderData();
OrderValidationStep validationStep = new OrderValidationStep(orderService);
InventoryReservationStep inventoryStep = new InventoryReservationStep(inventoryService);
PaymentProcessingStep paymentStep = new PaymentProcessingStep(paymentService);
when(orderService.isCustomerValid(any())).thenReturn(true);
when(inventoryService.reserveInventory(any(), any())).thenReturn("RES-123");
when(paymentService.processPayment(any(), any(), any(), any()))
.thenReturn(new PaymentResult(false, null, "Insufficient funds"));
// Execute steps
StepResult validationResult = validationStep.execute(sagaData);
assertTrue(validationResult.isSuccess());
StepResult inventoryResult = inventoryStep.execute(sagaData);
assertTrue(inventoryResult.isSuccess());
StepResult paymentResult = paymentStep.execute(sagaData);
assertFalse(paymentResult.isSuccess());
// Compensate previous steps
StepResult inventoryCompensation = inventoryStep.compensate(sagaData);
assertTrue(inventoryCompensation.isSuccess());
// Verify compensation was called
verify(inventoryService).releaseInventory("RES-123");
}
private OrderSagaData createTestOrderData() {
OrderItem item1 = new OrderItem("PROD-1", "Test Product", 2, new BigDecimal("29.99"));
OrderItem item2 = new OrderItem("PROD-2", "Another Product", 1, new BigDecimal("49.99"));
return new OrderSagaData(
"ORD-123",
"CUST-123",
Arrays.asList(item1, item2),
new BigDecimal("109.97"),
"123 Main St, City, Country",
"CREDIT_CARD"
);
}
}
Best Practices for Saga Pattern
1. Idempotency:
- Design steps to be idempotent
- Use unique identifiers for operations
- Implement retry mechanisms with idempotency keys
2. Compensation Logic:
- Ensure compensating actions can handle partial failures
- Design compensation to be idempotent
- Consider business constraints when designing rollbacks
3. Monitoring and Observability:
- Log all saga steps and compensations
- Track metrics such as saga duration, failure rate, and compensation count
- Create dashboards for saga status monitoring
4. Error Handling:
- Implement timeouts for long-running steps
- Design retry strategies with exponential backoff
- Provide manual intervention capabilities
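Two of the practices above, idempotency keys and retries with exponential backoff, can be sketched together. The helper below is a hypothetical illustration, not part of the saga framework in this article: `IdempotentRetryExecutor` and its parameters are assumed names, and completed results are kept in memory only. A production system would persist the keys and would normally also have the downstream service deduplicate requests by key.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: retries an operation with exponential backoff and remembers
// results per idempotency key, so repeating a completed call does not redo the work.
class IdempotentRetryExecutor {

    private final Map<String, Object> completed = new ConcurrentHashMap<>();
    private final int maxAttempts;
    private final long initialDelayMillis;

    IdempotentRetryExecutor(int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
    }

    @SuppressWarnings("unchecked")
    <T> T execute(String idempotencyKey, Callable<T> operation) throws Exception {
        Object cached = completed.get(idempotencyKey);
        if (cached != null) {
            return (T) cached; // already completed under this key: skip the side effect
        }
        long delay = initialDelayMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T result = operation.call();
                if (result != null) {
                    completed.put(idempotencyKey, result); // null results are not cached
                }
                return result;
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // double the wait before the next attempt
                }
            }
        }
        throw lastFailure; // every attempt failed: surface the last error
    }

    public static void main(String[] args) throws Exception {
        IdempotentRetryExecutor executor = new IdempotentRetryExecutor(3, 10);
        AtomicInteger calls = new AtomicInteger();

        // Simulated payment call that fails twice before succeeding
        Callable<String> flakyPayment = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "PAY-123";
        };

        String first = executor.execute("ORD-123:payment", flakyPayment);
        String second = executor.execute("ORD-123:payment", flakyPayment);
        System.out.println(first + " " + second + " calls=" + calls.get());
        // prints "PAY-123 PAY-123 calls=3"
    }
}
```

The second call returns the stored result without invoking the operation again, which is what makes a retried saga step safe to repeat. Two concurrent calls with the same key could still both run the operation in this sketch, so it does not replace deduplication on the receiving side.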
Conclusion
The Saga Pattern provides a robust solution for managing distributed transactions in microservices architectures. Key benefits include:
- Data Consistency: Maintains eventual consistency across services without distributed locks
- Scalability: Enables horizontal scaling of transactional processes
- Fault Tolerance: Handles partial failures gracefully through compensation
- Business Alignment: Models complex business processes naturally
Implementation Checklist:
- ✅ Define clear saga steps with compensating actions
- ✅ Implement idempotent operations
- ✅ Design proper error handling and retry mechanisms
- ✅ Implement saga persistence and recovery
- ✅ Add comprehensive monitoring and logging
- ✅ Create thorough tests for success and failure scenarios
- ✅ Implement saga cleanup and maintenance
The Saga Pattern is particularly well-suited for:
- E-commerce systems (order processing, inventory management)
- Financial services (money transfers, payment processing)
- Travel booking (flight + hotel + car rental reservations)
- Supply chain management (order fulfillment, inventory updates)
With these building blocks in place (step definitions, compensating actions, persistence, recovery, and tests), you can build reliable, scalable distributed transaction handling with the Saga Pattern in Java.