Long-running workflows are business processes that span hours, days, or even months. Because they outlive individual requests, threads, and often deployments, they need durable state, explicit state management, and recovery mechanisms. Typical examples are order processing, loan applications, and insurance claims.
Core Concepts
Characteristics of Long-Running Workflows
- Durability: Survive process restarts and failures
- State Management: Maintain process state across steps
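- Compensation: Undo already-completed steps with compensating actions (for example, a refund) when a later step fails, since a database rollback cannot span long-lived steps
- Resumability: Continue from the last successful step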
- Monitoring: Track progress and performance
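Durability and resumability come down to one rule: record progress durably after each step succeeds, and on restart skip what is already recorded. The framework-free sketch below illustrates that rule under simplifying assumptions: a local file stands in for the database row, steps are plain `Runnable`s that throw on failure, and `CheckpointedRunner` is a hypothetical name, not part of any library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical illustration: a single file records the index of the last
// step that finished successfully (a stand-in for a database row).
final class CheckpointedRunner {
    private final Path checkpointFile;

    CheckpointedRunner(Path checkpointFile) {
        this.checkpointFile = checkpointFile;
    }

    // Index of the last completed step, or -1 if no step has completed yet
    int lastCompletedStep() {
        try {
            if (!Files.exists(checkpointFile)) {
                return -1;
            }
            return Integer.parseInt(Files.readString(checkpointFile).trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the steps after the last checkpoint; a step signals failure by throwing
    void run(List<Runnable> steps) {
        for (int i = lastCompletedStep() + 1; i < steps.size(); i++) {
            steps.get(i).run();
            // Record progress only after the step succeeds, so a crash mid-step
            // makes that step, and only that step, run again on the next call
            saveCheckpoint(i);
        }
    }

    private void saveCheckpoint(int stepIndex) {
        try {
            Files.writeString(checkpointFile, Integer.toString(stepIndex));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A crash after a step's side effect but before the checkpoint write also causes that step to run again, which is why steps in long-running workflows should be idempotent.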
Implementation Approaches
1. State Machine Pattern with Persistence
Example 1: Order Processing Workflow
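// Imports used by the Example 1 classes (Spring Boot 3 with Jakarta Persistence, Lombok, Jackson).
// In a real project each public type below lives in its own file with only the imports it needs.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.persistence.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
// Workflow State Enum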
public enum WorkflowState {
CREATED,
VALIDATING,
PAYMENT_PROCESSING,
INVENTORY_RESERVATION,
SHIPPING,
COMPLETED,
CANCELLED,
FAILED
}
// Workflow Instance: the persisted record of one workflow run (state, serialized context, retry info)
@Entity
@Table(name = "workflow_instances")
public class WorkflowInstance {
@Id
private String id;
@Enumerated(EnumType.STRING)
private WorkflowState currentState;
@Column(columnDefinition = "TEXT")
private String contextData; // JSON serialized workflow context
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
private LocalDateTime completedAt;
private int retryCount;
private String lastError;
@Version
private Long version;
// constructors, getters, setters
public WorkflowInstance() {
this.id = UUID.randomUUID().toString();
this.currentState = WorkflowState.CREATED;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
public WorkflowInstance(String id, WorkflowState state) {
this.id = id;
this.currentState = state;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
}
// Workflow Context Data
public class OrderWorkflowContext {
private String orderId;
private String customerId;
private BigDecimal amount;
private List<OrderItem> items;
private String paymentTransactionId;
private String shippingTrackingId;
private Map<String, Object> metadata = new HashMap<>();
// Compensation data
private boolean paymentCompensated = false;
private boolean inventoryCompensated = false;
// constructors, getters, setters
}
// Workflow Step Interface
public interface WorkflowStep {
String getName();
WorkflowState getTargetState();
WorkflowState getCompensationState();
WorkflowExecutionResult execute(WorkflowInstance instance, OrderWorkflowContext context);
WorkflowExecutionResult compensate(WorkflowInstance instance, OrderWorkflowContext context);
default boolean shouldExecute(WorkflowInstance instance, OrderWorkflowContext context) {
return true;
}
}
// Workflow Execution Result
public class WorkflowExecutionResult {
private final boolean success;
private final String message;
private final Map<String, Object> data;
private final Throwable error;
private WorkflowExecutionResult(boolean success, String message,
Map<String, Object> data, Throwable error) {
this.success = success;
this.message = message;
this.data = data != null ? data : new HashMap<>();
this.error = error;
}
public static WorkflowExecutionResult success(String message) {
return new WorkflowExecutionResult(true, message, null, null);
}
public static WorkflowExecutionResult success(String message, Map<String, Object> data) {
return new WorkflowExecutionResult(true, message, data, null);
}
public static WorkflowExecutionResult failure(String message) {
return new WorkflowExecutionResult(false, message, null, null);
}
public static WorkflowExecutionResult failure(String message, Throwable error) {
return new WorkflowExecutionResult(false, message, null, error);
}
// getters
}
// Concrete Workflow Steps
@Component
@Slf4j
public class OrderValidationStep implements WorkflowStep {
private final OrderRepository orderRepository;
private final CustomerService customerService;
public OrderValidationStep(OrderRepository orderRepository,
CustomerService customerService) {
this.orderRepository = orderRepository;
this.customerService = customerService;
}
@Override
public String getName() {
return "ORDER_VALIDATION";
}
@Override
public WorkflowState getTargetState() {
return WorkflowState.VALIDATING;
}
@Override
public WorkflowState getCompensationState() {
return WorkflowState.CREATED; // No compensation needed for validation
}
@Override
public WorkflowExecutionResult execute(WorkflowInstance instance, OrderWorkflowContext context) {
try {
log.info("Validating order: {}", context.getOrderId());
// 1. Validate order existence
Order order = orderRepository.findById(context.getOrderId())
.orElseThrow(() -> new WorkflowException("Order not found: " + context.getOrderId()));
// 2. Validate customer
Customer customer = customerService.getCustomer(context.getCustomerId());
if (customer == null) {
return WorkflowExecutionResult.failure("Customer not found: " + context.getCustomerId());
}
// 3. Validate order amount
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
return WorkflowExecutionResult.failure("Invalid order amount: " + order.getAmount());
}
// 4. Check inventory availability (simplified)
if (!isInventoryAvailable(context.getItems())) {
return WorkflowExecutionResult.failure("Insufficient inventory");
}
log.info("Order validation successful: {}", context.getOrderId());
return WorkflowExecutionResult.success("Order validated successfully");
} catch (Exception e) {
log.error("Order validation failed for order: {}", context.getOrderId(), e);
return WorkflowExecutionResult.failure("Validation failed: " + e.getMessage(), e);
}
}
@Override
public WorkflowExecutionResult compensate(WorkflowInstance instance, OrderWorkflowContext context) {
// No compensation needed for validation
return WorkflowExecutionResult.success("No compensation needed for validation");
}
private boolean isInventoryAvailable(List<OrderItem> items) {
// Simplified inventory check
return items.stream().allMatch(item -> item.getQuantity() <= 100);
}
}
@Component
@Slf4j
public class PaymentProcessingStep implements WorkflowStep {
private final PaymentService paymentService;
public PaymentProcessingStep(PaymentService paymentService) {
this.paymentService = paymentService;
}
@Override
public String getName() {
return "PAYMENT_PROCESSING";
}
@Override
public WorkflowState getTargetState() {
return WorkflowState.PAYMENT_PROCESSING;
}
@Override
public WorkflowState getCompensationState() {
return WorkflowState.VALIDATING;
}
@Override
public WorkflowExecutionResult execute(WorkflowInstance instance, OrderWorkflowContext context) {
try {
log.info("Processing payment for order: {}", context.getOrderId());
PaymentRequest request = new PaymentRequest(
context.getOrderId(),
context.getCustomerId(),
context.getAmount(),
"USD"
);
PaymentResponse response = paymentService.processPayment(request);
if (response.isSuccess()) {
context.setPaymentTransactionId(response.getTransactionId());
log.info("Payment processed successfully: {}", response.getTransactionId());
Map<String, Object> resultData = new HashMap<>();
resultData.put("transactionId", response.getTransactionId());
resultData.put("paymentMethod", response.getPaymentMethod());
return WorkflowExecutionResult.success("Payment processed successfully", resultData);
} else {
return WorkflowExecutionResult.failure("Payment failed: " + response.getErrorMessage());
}
} catch (Exception e) {
log.error("Payment processing failed for order: {}", context.getOrderId(), e);
return WorkflowExecutionResult.failure("Payment processing failed: " + e.getMessage(), e);
}
}
@Override
public WorkflowExecutionResult compensate(WorkflowInstance instance, OrderWorkflowContext context) {
try {
if (context.getPaymentTransactionId() != null && !context.isPaymentCompensated()) {
log.info("Compensating payment for transaction: {}", context.getPaymentTransactionId());
boolean refunded = paymentService.refundPayment(context.getPaymentTransactionId());
if (refunded) {
context.setPaymentCompensated(true);
return WorkflowExecutionResult.success("Payment refunded successfully");
} else {
return WorkflowExecutionResult.failure("Payment refund failed");
}
}
return WorkflowExecutionResult.success("No payment to compensate");
} catch (Exception e) {
log.error("Payment compensation failed for transaction: {}",
context.getPaymentTransactionId(), e);
return WorkflowExecutionResult.failure("Payment compensation failed: " + e.getMessage(), e);
}
}
}
@Component
@Slf4j
public class InventoryReservationStep implements WorkflowStep {
private final InventoryService inventoryService;
public InventoryReservationStep(InventoryService inventoryService) {
this.inventoryService = inventoryService;
}
@Override
public String getName() {
return "INVENTORY_RESERVATION";
}
@Override
public WorkflowState getTargetState() {
return WorkflowState.INVENTORY_RESERVATION;
}
@Override
public WorkflowState getCompensationState() {
return WorkflowState.PAYMENT_PROCESSING;
}
@Override
public WorkflowExecutionResult execute(WorkflowInstance instance, OrderWorkflowContext context) {
try {
log.info("Reserving inventory for order: {}", context.getOrderId());
List<InventoryReservation> reservations = context.getItems().stream()
.map(item -> new InventoryReservation(
item.getProductId(),
item.getQuantity(),
context.getOrderId()
))
.collect(Collectors.toList());
boolean reserved = inventoryService.reserveItems(reservations);
if (reserved) {
log.info("Inventory reserved successfully for order: {}", context.getOrderId());
return WorkflowExecutionResult.success("Inventory reserved successfully");
} else {
return WorkflowExecutionResult.failure("Inventory reservation failed");
}
} catch (Exception e) {
log.error("Inventory reservation failed for order: {}", context.getOrderId(), e);
return WorkflowExecutionResult.failure("Inventory reservation failed: " + e.getMessage(), e);
}
}
@Override
public WorkflowExecutionResult compensate(WorkflowInstance instance, OrderWorkflowContext context) {
try {
if (!context.isInventoryCompensated()) {
log.info("Compensating inventory for order: {}", context.getOrderId());
boolean released = inventoryService.releaseReservation(context.getOrderId());
if (released) {
context.setInventoryCompensated(true);
return WorkflowExecutionResult.success("Inventory reservation released");
} else {
return WorkflowExecutionResult.failure("Inventory release failed");
}
}
return WorkflowExecutionResult.success("No inventory to compensate");
} catch (Exception e) {
log.error("Inventory compensation failed for order: {}", context.getOrderId(), e);
return WorkflowExecutionResult.failure("Inventory compensation failed: " + e.getMessage(), e);
}
}
}
// Workflow Engine
@Component
@Slf4j
public class WorkflowEngine {
private final WorkflowInstanceRepository instanceRepository;
private final List<WorkflowStep> workflowSteps;
private final ObjectMapper objectMapper;
private final TaskScheduler taskScheduler;
public WorkflowEngine(WorkflowInstanceRepository instanceRepository,
List<WorkflowStep> workflowSteps,
ObjectMapper objectMapper,
TaskScheduler taskScheduler) {
this.instanceRepository = instanceRepository;
this.workflowSteps = workflowSteps;
this.objectMapper = objectMapper;
this.taskScheduler = taskScheduler;
}
@Transactional
public WorkflowInstance startWorkflow(String workflowType, Object context) {
try {
WorkflowInstance instance = new WorkflowInstance();
instance.setCurrentState(WorkflowState.CREATED);
String contextData = objectMapper.writeValueAsString(context);
instance.setContextData(contextData);
WorkflowInstance savedInstance = instanceRepository.save(instance);
log.info("Started workflow instance: {}", savedInstance.getId());
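// Schedule execution only after commit; otherwise the scheduled task could run
// before the new row is visible and fail with "Workflow instance not found"
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
scheduleWorkflowExecution(savedInstance.getId());
}
});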
return savedInstance;
} catch (Exception e) {
throw new WorkflowException("Failed to start workflow", e);
}
}
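public void scheduleWorkflowExecution(String instanceId) {
// The lambda calls executeWorkflowStep() on "this", not on the Spring proxy, so its
// @Transactional is not applied; each repository call commits on its own. Call it
// through the proxy (or a TransactionTemplate) if a step's updates must be atomic.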
taskScheduler.schedule(() -> {
try {
executeWorkflowStep(instanceId);
} catch (Exception e) {
log.error("Failed to execute workflow step for instance: {}", instanceId, e);
}
}, Instant.now().plusSeconds(1));
}
@Transactional
public void executeWorkflowStep(String instanceId) {
WorkflowInstance instance = instanceRepository.findById(instanceId)
.orElseThrow(() -> new WorkflowException("Workflow instance not found: " + instanceId));
try {
OrderWorkflowContext context = objectMapper.readValue(
instance.getContextData(), OrderWorkflowContext.class);
WorkflowStep nextStep = findNextStep(instance.getCurrentState());
if (nextStep == null) {
log.info("Workflow completed: {}", instanceId);
instance.setCurrentState(WorkflowState.COMPLETED);
instance.setCompletedAt(LocalDateTime.now());
instanceRepository.save(instance);
return;
}
if (nextStep.shouldExecute(instance, context)) {
log.info("Executing step: {} for workflow: {}", nextStep.getName(), instanceId);
instance.setUpdatedAt(LocalDateTime.now());
WorkflowExecutionResult result = nextStep.execute(instance, context);
if (result.isSuccess()) {
// Advance the state only after the step succeeds. If it were set before execute(),
// a retry would call findNextStep() from the failed step's target state and skip that step.
instance.setCurrentState(nextStep.getTargetState());
// Update context with result data
context.getMetadata().putAll(result.getData());
instance.setContextData(objectMapper.writeValueAsString(context));
instance.setRetryCount(0);
instance.setLastError(null);
instanceRepository.save(instance);
log.info("Step {} completed successfully for workflow: {}",
nextStep.getName(), instanceId);
// Schedule next step
scheduleWorkflowExecution(instanceId);
} else {
handleStepFailure(instance, context, nextStep, result);
}
} else {
log.info("Skipping step: {} for workflow: {}", nextStep.getName(), instanceId);
scheduleWorkflowExecution(instanceId);
}
} catch (Exception e) {
log.error("Workflow execution failed for instance: {}", instanceId, e);
handleExecutionError(instance, e);
}
}
private WorkflowStep findNextStep(WorkflowState currentState) {
return workflowSteps.stream()
.filter(step -> step.getTargetState().ordinal() > currentState.ordinal())
.min(Comparator.comparing(step -> step.getTargetState().ordinal()))
.orElse(null);
}
private void handleStepFailure(WorkflowInstance instance, OrderWorkflowContext context,
WorkflowStep failedStep, WorkflowExecutionResult result) {
try {
instance.setRetryCount(instance.getRetryCount() + 1);
instance.setLastError(result.getMessage());
instance.setUpdatedAt(LocalDateTime.now());
if (instance.getRetryCount() >= 3) {
log.error("Step {} failed after {} retries for workflow: {}",
failedStep.getName(), instance.getRetryCount(), instance.getId());
// Start compensation
startCompensation(instance, context, failedStep);
} else {
// Retry the same step
log.info("Scheduling retry {} for step {} in workflow: {}",
instance.getRetryCount(), failedStep.getName(), instance.getId());
instanceRepository.save(instance);
// Schedule retry with exponential backoff
long delaySeconds = (long) Math.pow(2, instance.getRetryCount()) * 5;
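taskScheduler.schedule(() -> {
try {
executeWorkflowStep(instance.getId());
} catch (Exception e) {
log.error("Retry failed for workflow: {}", instance.getId(), e);
}
}, Instant.now().plusSeconds(delaySeconds));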
}
} catch (Exception e) {
log.error("Error handling step failure for workflow: {}", instance.getId(), e);
handleExecutionError(instance, e);
}
}
private void startCompensation(WorkflowInstance instance, OrderWorkflowContext context,
WorkflowStep failedStep) {
try {
log.info("Starting compensation for workflow: {}", instance.getId());
// Compensate the failed step and every step before it, newest first.
// Compensations must tolerate steps that did nothing (e.g. the step that failed).
List<WorkflowStep> stepsToCompensate = workflowSteps.stream()
.filter(step -> step.getTargetState().ordinal() <= failedStep.getTargetState().ordinal())
.sorted(Comparator.comparingInt((WorkflowStep step) -> step.getTargetState().ordinal()).reversed())
.collect(Collectors.toList());
for (WorkflowStep step : stepsToCompensate) {
log.info("Compensating step: {} for workflow: {}", step.getName(), instance.getId());
WorkflowExecutionResult compensationResult = step.compensate(instance, context);
if (!compensationResult.isSuccess()) {
log.error("Compensation failed for step: {} in workflow: {}",
step.getName(), instance.getId());
// Continue with other compensations despite failures
}
}
instance.setCurrentState(WorkflowState.FAILED);
instance.setContextData(objectMapper.writeValueAsString(context));
instanceRepository.save(instance);
log.info("Compensation completed for workflow: {}", instance.getId());
} catch (Exception e) {
log.error("Compensation process failed for workflow: {}", instance.getId(), e);
instance.setCurrentState(WorkflowState.FAILED);
instanceRepository.save(instance);
}
}
private void handleExecutionError(WorkflowInstance instance, Exception error) {
instance.setCurrentState(WorkflowState.FAILED);
instance.setLastError(error.getMessage());
instance.setUpdatedAt(LocalDateTime.now());
instanceRepository.save(instance);
}
@Transactional
public void resumeWorkflow(String instanceId) {
WorkflowInstance instance = instanceRepository.findById(instanceId)
.orElseThrow(() -> new WorkflowException("Workflow instance not found: " + instanceId));
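if (instance.getCurrentState() == WorkflowState.FAILED) {
log.info("Resuming failed workflow: {}", instanceId);
// Resuming restarts from CREATED, so every step runs again. If the earlier run was
// compensated, clear the compensation markers; otherwise a second failure would
// skip the refund, or refund the old (already refunded) transaction again.
try {
OrderWorkflowContext context = objectMapper.readValue(
instance.getContextData(), OrderWorkflowContext.class);
if (context.isPaymentCompensated()) {
context.setPaymentTransactionId(null);
context.setPaymentCompensated(false);
}
context.setInventoryCompensated(false);
instance.setContextData(objectMapper.writeValueAsString(context));
} catch (JsonProcessingException e) {
throw new WorkflowException("Cannot read context of workflow: " + instanceId, e);
}
instance.setCurrentState(WorkflowState.CREATED);
instance.setRetryCount(0);
instance.setLastError(null);
instanceRepository.save(instance);
scheduleWorkflowExecution(instanceId);
}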
}
public WorkflowInstance getWorkflowStatus(String instanceId) {
return instanceRepository.findById(instanceId)
.orElseThrow(() -> new WorkflowException("Workflow instance not found: " + instanceId));
}
}
// Custom Exceptions
class WorkflowException extends RuntimeException {
public WorkflowException(String message) {
super(message);
}
public WorkflowException(String message, Throwable cause) {
super(message, cause);
}
}
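The retry rule in `handleStepFailure` is easiest to check in isolation. The sketch below restates it as a small, dependency-free class (the name `RetryPolicy` and its constructor parameters are hypothetical): after each failure the retry count is incremented; once it reaches 3 the engine compensates, otherwise it waits 2^retryCount * 5 seconds, which gives 10 s before the second attempt and 20 s before the third.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper that mirrors WorkflowEngine.handleStepFailure.
// retryCount is the value after it has been incremented for the current failure.
final class RetryPolicy {
    private final int maxFailures;
    private final Duration baseDelay;

    RetryPolicy(int maxFailures, Duration baseDelay) {
        this.maxFailures = maxFailures;
        this.baseDelay = baseDelay;
    }

    // Empty means "stop retrying and compensate"; otherwise, the wait before the next attempt
    Optional<Duration> nextDelay(int retryCount) {
        if (retryCount >= maxFailures) {
            return Optional.empty();
        }
        // 2^retryCount * baseDelay; a shift avoids the floating-point Math.pow
        return Optional.of(baseDelay.multipliedBy(1L << retryCount));
    }
}
```

Usage: `new RetryPolicy(3, Duration.ofSeconds(5))` reproduces the engine's numbers. Keeping the give-up decision and the delay in one place means a policy change (more attempts, added jitter) does not touch the engine.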
2. Saga Pattern for Distributed Transactions
Example 2: Saga Pattern Implementation
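// Saga Definition
public enum SagaState {
STARTED,
COMPENSATING,
COMPLETED,
FAILED
}
public class Saga {
private String sagaId;
private String orderId;
private SagaState state;
// Step beans are held in memory here; a persisted saga would store step names
// and resolve the beans again after a restart
private List<SagaStep> steps;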
private int currentStep;
private Map<String, Object> context;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// constructors, getters, setters
}
// Saga Step
public interface SagaStep {
String getName();
SagaExecutionResult execute(Saga saga);
SagaExecutionResult compensate(Saga saga);
}
// Saga Coordinator
@Component
@Slf4j
public class SagaCoordinator {
private final SagaRepository sagaRepository;
private final Map<String, SagaStep> sagaSteps;
private final ObjectMapper objectMapper;
public SagaCoordinator(SagaRepository sagaRepository,
List<SagaStep> steps,
ObjectMapper objectMapper) {
this.sagaRepository = sagaRepository;
this.objectMapper = objectMapper;
this.sagaSteps = steps.stream()
.collect(Collectors.toMap(SagaStep::getName, Function.identity()));
}
@Transactional
public String startSaga(String sagaType, Map<String, Object> initialContext) {
Saga saga = new Saga();
saga.setSagaId(UUID.randomUUID().toString());
saga.setState(SagaState.STARTED);
saga.setContext(initialContext);
saga.setCreatedAt(LocalDateTime.now());
saga.setUpdatedAt(LocalDateTime.now());
// Define saga steps based on type
saga.setSteps(defineSagaSteps(sagaType));
saga.setCurrentStep(0);
Saga savedSaga = sagaRepository.save(saga);
executeNextStep(savedSaga);
return savedSaga.getSagaId();
}
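private List<SagaStep> defineSagaSteps(String sagaType) {
// Only the order saga is shown, so sagaType is not inspected yet. List.of rejects
// nulls: a step name with no matching bean fails fast with a NullPointerException.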
return List.of(
sagaSteps.get("validateOrder"),
sagaSteps.get("processPayment"),
sagaSteps.get("reserveInventory"),
sagaSteps.get("shipOrder")
);
}
public void executeNextStep(Saga saga) {
if (saga.getCurrentStep() >= saga.getSteps().size()) {
saga.setState(SagaState.COMPLETED);
sagaRepository.save(saga);
return;
}
SagaStep currentStep = saga.getSteps().get(saga.getCurrentStep());
try {
SagaExecutionResult result = currentStep.execute(saga);
if (result.isSuccess()) {
saga.setCurrentStep(saga.getCurrentStep() + 1);
saga.setUpdatedAt(LocalDateTime.now());
sagaRepository.save(saga);
executeNextStep(saga);
} else {
startCompensation(saga);
}
} catch (Exception e) {
log.error("Saga step execution failed: {}", currentStep.getName(), e);
startCompensation(saga);
}
}
private void startCompensation(Saga saga) {
saga.setState(SagaState.COMPENSATING);
sagaRepository.save(saga);
// Execute compensation in reverse order
for (int i = saga.getCurrentStep() - 1; i >= 0; i--) {
SagaStep step = saga.getSteps().get(i);
try {
step.compensate(saga);
} catch (Exception e) {
log.error("Compensation failed for step: {}", step.getName(), e);
}
}
saga.setState(SagaState.FAILED);
sagaRepository.save(saga);
}
}
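Stripped of persistence and Spring, the coordinator's control flow is: run the steps in order and, on the first failure, compensate only the steps that already completed, newest first, continuing past a compensation that itself fails. The in-memory sketch below (hypothetical `MiniSagaStep` and `MiniSagaRunner`, not part of the coordinator above) shows just that flow, which makes the ordering easy to unit-test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal step: execute() throws a RuntimeException to signal failure
interface MiniSagaStep {
    String name();
    void execute();
    void compensate();

    static MiniSagaStep of(String name, Runnable action, Runnable undo) {
        return new MiniSagaStep() {
            public String name() { return name; }
            public void execute() { action.run(); }
            public void compensate() { undo.run(); }
        };
    }
}

final class MiniSagaRunner {
    // Returns true if every step completed, false if the saga was compensated
    static boolean run(List<MiniSagaStep> steps) {
        List<MiniSagaStep> completed = new ArrayList<>();
        for (MiniSagaStep step : steps) {
            try {
                step.execute();
                completed.add(step);
            } catch (RuntimeException failure) {
                // Undo completed steps in reverse order; the failed step is not compensated
                for (int i = completed.size() - 1; i >= 0; i--) {
                    try {
                        completed.get(i).compensate();
                    } catch (RuntimeException compensationError) {
                        // Keep going; a real coordinator would record this for follow-up
                    }
                }
                return false;
            }
        }
        return true;
    }
}
```

This matches the coordinator's loop from `getCurrentStep() - 1` down to 0: the failing step never reached "completed", so only its predecessors are undone.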
3. Temporal.io for Workflow Orchestration
Example 3: Temporal Workflow Implementation
// Temporal Workflow Interface
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(Order order);
@SignalMethod
void updatePaymentStatus(PaymentStatus status);
@SignalMethod
void cancelOrder();
@QueryMethod
OrderStatus getOrderStatus();
}
// Temporal Workflow Implementation
public class OrderWorkflowImpl implements OrderWorkflow {
private final OrderActivities activities = Workflow.newActivityStub(OrderActivities.class);
private OrderStatus status = OrderStatus.CREATED;
private boolean cancelled = false;
@Override
public void processOrder(Order order) {
try {
// Step 1: Validate order
activities.validateOrder(order);
status = OrderStatus.VALIDATED;
// Wait for payment
Workflow.await(() -> status == OrderStatus.PAYMENT_RECEIVED || cancelled);
if (cancelled) {
activities.cancelOrder(order);
status = OrderStatus.CANCELLED;
return;
}
// Step 2: Reserve inventory
activities.reserveInventory(order);
status = OrderStatus.INVENTORY_RESERVED;
// Step 3: Ship order
activities.shipOrder(order);
status = OrderStatus.SHIPPED;
// Step 4: Send confirmation
activities.sendConfirmation(order);
status = OrderStatus.COMPLETED;
} catch (Exception e) {
activities.compensate(order);
status = OrderStatus.FAILED;
throw e;
}
}
@Override
public void updatePaymentStatus(PaymentStatus paymentStatus) {
if (paymentStatus == PaymentStatus.COMPLETED) {
this.status = OrderStatus.PAYMENT_RECEIVED;
}
}
@Override
public void cancelOrder() {
this.cancelled = true;
}
@Override
public OrderStatus getOrderStatus() {
return status;
}
}
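// Temporal Activities
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
// Activity interfaces must carry @ActivityInterface; @ActivityMethod on each method is optional
@ActivityInterface
public interface OrderActivities {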
@ActivityMethod
void validateOrder(Order order);
@ActivityMethod
void reserveInventory(Order order);
@ActivityMethod
void shipOrder(Order order);
@ActivityMethod
void sendConfirmation(Order order);
@ActivityMethod
void cancelOrder(Order order);
@ActivityMethod
void compensate(Order order);
}
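Temporal can resume `processOrder` after a worker crash, even in the middle of `Workflow.await`, because it records an event history and replays the workflow code against it, returning recorded activity results instead of calling the activities again. The toy model below illustrates only that idea; `ReplayingExecutor` is hypothetical and is not how the Temporal SDK is implemented or used.

```java
import java.util.List;
import java.util.function.Supplier;

// Toy model of event-history replay; hypothetical, NOT the Temporal SDK API.
// On the first run each activity result is appended to the history. When the same
// workflow code runs again over that history, recorded results are returned in
// order and the activity (with its side effects) is not invoked again.
final class ReplayingExecutor {
    private final List<Object> history;
    private int position = 0;
    private int activityCalls = 0;

    ReplayingExecutor(List<Object> history) {
        this.history = history;
    }

    @SuppressWarnings("unchecked")
    <T> T activity(Supplier<T> call) {
        if (position < history.size()) {
            // Replaying: hand back the recorded result
            return (T) history.get(position++);
        }
        // New progress: run the activity and record its result
        T result = call.get();
        activityCalls++;
        history.add(result);
        position++;
        return result;
    }

    // Number of activities actually executed by this executor
    int activityCalls() {
        return activityCalls;
    }
}
```

Replaying the same workflow function over the recorded history yields the same result without charging the customer twice. It also shows why workflow code must be deterministic: a replay has to issue the same activity calls in the same order as the original run.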
4. Event-Driven Workflows with State Machines
Example 4: Event-Driven Workflow with Spring State Machine
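// Events that drive the transitions below
public enum WorkflowEvent {
START_PROCESSING,
VALIDATION_SUCCESS,
VALIDATION_FAILED,
PAYMENT_SUCCESS,
PAYMENT_FAILED
}
// Configuration: @EnableStateMachineFactory (not @EnableStateMachine) is required because
// the service below creates one machine per workflow through a StateMachineFactory
@Configuration
@EnableStateMachineFactory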
public class StateMachineConfig extends StateMachineConfigurerAdapter<WorkflowState, WorkflowEvent> {
@Override
public void configure(StateMachineStateConfigurer<WorkflowState, WorkflowEvent> states)
throws Exception {
states
.withStates()
.initial(WorkflowState.CREATED)
.state(WorkflowState.VALIDATING)
.state(WorkflowState.PAYMENT_PROCESSING)
.state(WorkflowState.INVENTORY_RESERVATION)
.state(WorkflowState.SHIPPING)
.end(WorkflowState.COMPLETED)
.end(WorkflowState.CANCELLED)
.end(WorkflowState.FAILED);
}
@Override
public void configure(StateMachineTransitionConfigurer<WorkflowState, WorkflowEvent> transitions)
throws Exception {
transitions
.withExternal()
.source(WorkflowState.CREATED).target(WorkflowState.VALIDATING)
.event(WorkflowEvent.START_PROCESSING)
.action(validationAction())
.and()
.withExternal()
.source(WorkflowState.VALIDATING).target(WorkflowState.PAYMENT_PROCESSING)
.event(WorkflowEvent.VALIDATION_SUCCESS)
.action(paymentAction())
.and()
.withExternal()
.source(WorkflowState.VALIDATING).target(WorkflowState.FAILED)
.event(WorkflowEvent.VALIDATION_FAILED)
.and()
.withExternal()
.source(WorkflowState.PAYMENT_PROCESSING).target(WorkflowState.INVENTORY_RESERVATION)
.event(WorkflowEvent.PAYMENT_SUCCESS)
.action(inventoryAction())
.and()
.withExternal()
.source(WorkflowState.PAYMENT_PROCESSING).target(WorkflowState.FAILED)
.event(WorkflowEvent.PAYMENT_FAILED);
}
@Bean
public Action<WorkflowState, WorkflowEvent> validationAction() {
return context -> {
OrderWorkflowContext workflowContext = context.getExtendedState()
.get("context", OrderWorkflowContext.class);
// Execute validation logic
};
}
// Other action beans...
}
// Workflow Service
@Service
@Slf4j
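public class StateMachineWorkflowService {
private final StateMachineFactory<WorkflowState, WorkflowEvent> stateMachineFactory;
// Assumes a StateMachinePersister bean, e.g. new DefaultStateMachinePersister<>(persist),
// where persist is a database-backed StateMachinePersist<WorkflowState, WorkflowEvent, String>
private final StateMachinePersister<WorkflowState, WorkflowEvent, String> persister;
public StateMachineWorkflowService(StateMachineFactory<WorkflowState, WorkflowEvent> stateMachineFactory,
StateMachinePersister<WorkflowState, WorkflowEvent, String> persister) {
this.stateMachineFactory = stateMachineFactory;
this.persister = persister;
}
public String startWorkflow(OrderWorkflowContext context) {
String workflowId = UUID.randomUUID().toString();
StateMachine<WorkflowState, WorkflowEvent> stateMachine = stateMachineFactory.getStateMachine(workflowId);
stateMachine.getExtendedState().getVariables().put("context", context);
stateMachine.start();
stateMachine.sendEvent(WorkflowEvent.START_PROCESSING);
persist(stateMachine, workflowId);
return workflowId;
}
public void handleEvent(String workflowId, WorkflowEvent event) {
// getStateMachine() builds a new machine in the initial state, so the saved
// state has to be restored before the event is sent, and saved again afterwards
StateMachine<WorkflowState, WorkflowEvent> stateMachine = restore(workflowId);
stateMachine.sendEvent(event);
persist(stateMachine, workflowId);
}
public WorkflowState getWorkflowState(String workflowId) {
return restore(workflowId).getState().getId();
}
private StateMachine<WorkflowState, WorkflowEvent> restore(String workflowId) {
try {
return persister.restore(stateMachineFactory.getStateMachine(workflowId), workflowId);
} catch (Exception e) {
throw new WorkflowException("Failed to restore workflow: " + workflowId, e);
}
}
private void persist(StateMachine<WorkflowState, WorkflowEvent> stateMachine, String workflowId) {
try {
persister.persist(stateMachine, workflowId);
} catch (Exception e) {
throw new WorkflowException("Failed to persist workflow: " + workflowId, e);
}
}
}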
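The external transitions configured in `StateMachineConfig` form a lookup table from (source state, event) to target state. The plain-Java model below (hypothetical `TransitionTable`; the enums are redeclared so the block compiles on its own) mirrors the five transitions above and leaves the state unchanged for an event that has no transition from the current state, similar to Spring State Machine not accepting such an event. It can be handy for unit-testing a transition design before wiring it into the framework.

```java
import java.util.EnumMap;
import java.util.Map;

// Same constants as the article's WorkflowState, redeclared so this sketch compiles alone
enum WorkflowState { CREATED, VALIDATING, PAYMENT_PROCESSING, INVENTORY_RESERVATION, SHIPPING, COMPLETED, CANCELLED, FAILED }

// Events used by StateMachineConfig
enum WorkflowEvent { START_PROCESSING, VALIDATION_SUCCESS, VALIDATION_FAILED, PAYMENT_SUCCESS, PAYMENT_FAILED }

// Hypothetical plain-Java model of the configured external transitions
final class TransitionTable {
    private final Map<WorkflowState, Map<WorkflowEvent, WorkflowState>> transitions =
            new EnumMap<>(WorkflowState.class);

    TransitionTable on(WorkflowState source, WorkflowEvent event, WorkflowState target) {
        transitions.computeIfAbsent(source, s -> new EnumMap<>(WorkflowEvent.class)).put(event, target);
        return this;
    }

    // New state, or the current state unchanged if no transition matches the event
    WorkflowState fire(WorkflowState current, WorkflowEvent event) {
        return transitions.getOrDefault(current, Map.of()).getOrDefault(event, current);
    }

    static TransitionTable orderWorkflow() {
        return new TransitionTable()
                .on(WorkflowState.CREATED, WorkflowEvent.START_PROCESSING, WorkflowState.VALIDATING)
                .on(WorkflowState.VALIDATING, WorkflowEvent.VALIDATION_SUCCESS, WorkflowState.PAYMENT_PROCESSING)
                .on(WorkflowState.VALIDATING, WorkflowEvent.VALIDATION_FAILED, WorkflowState.FAILED)
                .on(WorkflowState.PAYMENT_PROCESSING, WorkflowEvent.PAYMENT_SUCCESS, WorkflowState.INVENTORY_RESERVATION)
                .on(WorkflowState.PAYMENT_PROCESSING, WorkflowEvent.PAYMENT_FAILED, WorkflowState.FAILED);
    }
}
```

Laid out this way it is also easy to spot gaps: nothing above leads out of INVENTORY_RESERVATION, so SHIPPING and COMPLETED are unreachable until more transitions are configured.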
Advanced Patterns
5. Workflow Recovery and Monitoring
Example 5: Workflow Recovery Service
@Service
@Slf4j
public class WorkflowRecoveryService {
private final WorkflowInstanceRepository instanceRepository;
private final WorkflowEngine workflowEngine;
private final MeterRegistry meterRegistry;
public WorkflowRecoveryService(WorkflowInstanceRepository instanceRepository,
WorkflowEngine workflowEngine,
MeterRegistry meterRegistry) {
this.instanceRepository = instanceRepository;
this.workflowEngine = workflowEngine;
this.meterRegistry = meterRegistry;
}
@Scheduled(fixedRate = 30000) // Run every 30 seconds
public void recoverStalledWorkflows() {
log.info("Starting workflow recovery process");
LocalDateTime cutoffTime = LocalDateTime.now().minusMinutes(5);
List<WorkflowInstance> stalledWorkflows = instanceRepository
.findByCurrentStateNotInAndUpdatedAtBefore(
List.of(WorkflowState.COMPLETED, WorkflowState.FAILED, WorkflowState.CANCELLED),
cutoffTime
);
log.info("Found {} stalled workflows", stalledWorkflows.size());
for (WorkflowInstance instance : stalledWorkflows) {
try {
log.info("Recovering stalled workflow: {}", instance.getId());
workflowEngine.scheduleWorkflowExecution(instance.getId());
meterRegistry.counter("workflow.recovery.attempted").increment();
} catch (Exception e) {
log.error("Failed to recover workflow: {}", instance.getId(), e);
meterRegistry.counter("workflow.recovery.failed").increment();
}
}
}
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
@Transactional // derived deleteBy... queries need an active transaction
public void cleanupOldWorkflows() {
log.info("Starting workflow cleanup");
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(30);
int deletedCount = instanceRepository.deleteByCompletedAtBefore(cutoffTime);
log.info("Cleaned up {} old workflow instances", deletedCount);
meterRegistry.counter("workflow.cleanup.executed").increment();
}
}
// Monitoring Service
@Service
@Slf4j
public class WorkflowMonitoringService {
private final WorkflowInstanceRepository instanceRepository;
private final MeterRegistry meterRegistry;
private final Map<WorkflowState, Gauge> stateGauges = new ConcurrentHashMap<>();
public WorkflowMonitoringService(WorkflowInstanceRepository instanceRepository,
MeterRegistry meterRegistry) {
this.instanceRepository = instanceRepository;
this.meterRegistry = meterRegistry;
initializeMetrics();
}
private void initializeMetrics() {
// Create gauges for each workflow state
for (WorkflowState state : WorkflowState.values()) {
// Gauge.builder takes the observed object and a function evaluated on every scrape;
// here each scrape runs one COUNT query per state
Gauge gauge = Gauge.builder("workflow.instances.by.state", instanceRepository,
repo -> repo.countByCurrentState(state))
.tag("state", state.name())
.description("Number of workflow instances in state")
.register(meterRegistry);
stateGauges.put(state, gauge);
}
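// Timer for workflow duration; it stays empty until something records into it
// (e.g. Timer.record(Duration) when a workflow completes)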
Timer.builder("workflow.execution.duration")
.description("Workflow execution duration")
.register(meterRegistry);
}
public WorkflowMetrics getWorkflowMetrics() {
WorkflowMetrics metrics = new WorkflowMetrics();
metrics.setTotalInstances(instanceRepository.count());
metrics.setCompletedInstances(instanceRepository.countByCurrentState(WorkflowState.COMPLETED));
metrics.setFailedInstances(instanceRepository.countByCurrentState(WorkflowState.FAILED));
metrics.setActiveInstances(instanceRepository.countByCurrentStateNotIn(
List.of(WorkflowState.COMPLETED, WorkflowState.FAILED, WorkflowState.CANCELLED)));
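// Calculate average duration. This loads every completed instance into memory;
// at scale, prefer an aggregate query in the repository.
List<WorkflowInstance> completedInstances = instanceRepository
.findByCurrentState(WorkflowState.COMPLETED);
if (!completedInstances.isEmpty()) {
double avgDuration = completedInstances.stream()
// Seconds / 60.0 keeps fractional minutes instead of truncating to whole minutes
.mapToDouble(instance ->
Duration.between(instance.getCreatedAt(), instance.getCompletedAt()).getSeconds() / 60.0)
.average()
.orElse(0.0);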
metrics.setAverageDurationMinutes(avgDuration);
}
return metrics;
}
}
// Metrics DTO
public class WorkflowMetrics {
private long totalInstances;
private long completedInstances;
private long failedInstances;
private long activeInstances;
private double averageDurationMinutes;
// getters, setters
}
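Which instances the recovery job picks up is decided by one rule: not in a terminal state, and not updated for five minutes. The sketch below restates that rule over in-memory data so it can be unit-tested without a database; `InstanceSnapshot` and `StalledWorkflowDetector` are hypothetical, `WorkflowState` is redeclared so the block compiles alone, and the record requires Java 16 or later.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Same constants as the article's WorkflowState, redeclared for a self-contained sketch
enum WorkflowState { CREATED, VALIDATING, PAYMENT_PROCESSING, INVENTORY_RESERVATION, SHIPPING, COMPLETED, CANCELLED, FAILED }

// Hypothetical stand-in for a WorkflowInstance row: only the fields the rule needs
record InstanceSnapshot(String id, WorkflowState state, LocalDateTime updatedAt) {}

final class StalledWorkflowDetector {
    private static final Set<WorkflowState> TERMINAL =
            EnumSet.of(WorkflowState.COMPLETED, WorkflowState.FAILED, WorkflowState.CANCELLED);

    private final Duration stallThreshold;

    StalledWorkflowDetector(Duration stallThreshold) {
        this.stallThreshold = stallThreshold;
    }

    // Same rule as findByCurrentStateNotInAndUpdatedAtBefore(terminalStates, now - threshold)
    List<String> findStalled(List<InstanceSnapshot> instances, LocalDateTime now) {
        LocalDateTime cutoff = now.minus(stallThreshold);
        return instances.stream()
                .filter(i -> !TERMINAL.contains(i.state()))
                .filter(i -> i.updatedAt().isBefore(cutoff)) // strictly older than the cutoff
                .map(InstanceSnapshot::id)
                .collect(Collectors.toList());
    }
}
```

Note that `recoverStalledWorkflows` does not update `updatedAt` itself, so an instance that stays stalled is rescheduled on every 30-second run. The `@Version` column rejects concurrent commits of the same row, but external side effects such as a payment call can still repeat, so steps reached this way should be idempotent.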
6. Compensation and Error Handling
Example 6: Advanced Compensation Handler
@Component
@Slf4j
public class CompensationHandler {
private final Map<String, CompensationStrategy> compensationStrategies;
public CompensationHandler(List<CompensationStrategy> strategies) {
this.compensationStrategies = strategies.stream()
.collect(Collectors.toMap(CompensationStrategy::getStepName, Function.identity()));
}
public CompensationResult executeCompensation(String workflowId,
String failedStep,
Map<String, Object> context) {
log.info("Executing compensation for workflow: {}, failed step: {}", workflowId, failedStep);
List<CompensationStep> compensationSteps = determineCompensationSteps(failedStep);
CompensationResult result = new CompensationResult();
for (CompensationStep step : compensationSteps) {
try {
CompensationStrategy strategy = compensationStrategies.get(step.getStepName());
if (strategy != null) {
CompensationStepResult stepResult = strategy.compensate(context);
result.addStepResult(stepResult);
if (!stepResult.isSuccess()) {
log.error("Compensation failed for step: {}", step.getStepName());
result.setOverallSuccess(false);
// Continue with other compensations despite failures
}
} else {
log.warn("No compensation strategy found for step: {}", step.getStepName());
}
} catch (Exception e) {
log.error("Unexpected error during compensation for step: {}", step.getStepName(), e);
result.setOverallSuccess(false);
}
}
log.info("Compensation completed for workflow: {}, overall success: {}",
workflowId, result.isOverallSuccess());
return result;
}
private List<CompensationStep> determineCompensationSteps(String failedStep) {
// Define compensation order based on the failed step
// This could be configured externally
Map<String, List<String>> compensationOrder = Map.of(
"SHIPPING", List.of("inventoryReservation", "paymentProcessing"),
"INVENTORY_RESERVATION", List.of("paymentProcessing"),
"PAYMENT_PROCESSING", List.of() // No compensation needed for payment failure
);
return compensationOrder.getOrDefault(failedStep, List.of()).stream()
.map(stepName -> new CompensationStep(stepName, CompensationPriority.HIGH))
.collect(Collectors.toList());
}
}
// Compensation Strategy Interface
public interface CompensationStrategy {
    String getStepName();

    CompensationStepResult compensate(Map<String, Object> context);
}

@Component
@Slf4j
public class PaymentCompensationStrategy implements CompensationStrategy {
    private final PaymentService paymentService;

    public PaymentCompensationStrategy(PaymentService paymentService) {
        this.paymentService = paymentService;
    }

    @Override
    public String getStepName() {
        return "paymentProcessing";
    }

    @Override
    public CompensationStepResult compensate(Map<String, Object> context) {
        try {
            String transactionId = (String) context.get("paymentTransactionId");
            if (transactionId != null) {
                log.info("Refunding payment transaction: {}", transactionId);
                boolean refunded = paymentService.refundPayment(transactionId);
                if (refunded) {
                    return CompensationStepResult.success("Payment refunded successfully");
                } else {
                    return CompensationStepResult.failure("Payment refund failed");
                }
            }
            return CompensationStepResult.success("No payment to refund");
        } catch (Exception e) {
            log.error("Payment compensation failed", e);
            return CompensationStepResult.failure("Payment compensation error: " + e.getMessage());
        }
    }
}
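`executeCompensation` looks strategies up by name in `compensationStrategies`, whose construction is not shown in this section. One possible way to build it is sketched below. It assumes the strategies arrive as a list (for example, Spring can inject every bean of a type as a `List`) and are keyed by `getStepName()`. `StrategyRegistry` is a hypothetical name, and `NamedStrategy` is a reduced stand-in for `CompensationStrategy` so the example compiles on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Reduced stand-in for the CompensationStrategy interface above
interface NamedStrategy {
    String getStepName();
}

// Hypothetical registry mapping step names to their compensation strategies
final class StrategyRegistry {
    private final Map<String, NamedStrategy> byStepName;

    StrategyRegistry(List<NamedStrategy> strategies) {
        // toUnmodifiableMap throws IllegalStateException on duplicate keys,
        // so two strategies claiming the same step fail fast at startup.
        this.byStepName = strategies.stream()
                .collect(Collectors.toUnmodifiableMap(NamedStrategy::getStepName, s -> s));
    }

    // Returns null when no strategy is registered, matching the null check in executeCompensation
    NamedStrategy get(String stepName) {
        return byStepName.get(stepName);
    }
}
```

Failing on duplicates at construction time is a deliberate choice here: silently keeping one of two strategies for the same step would make compensation behavior depend on bean ordering.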
Testing Long-Running Workflows
Example 7: Workflow Testing
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.TestPropertySource;

// Requires H2 and Awaitility on the test classpath.
// @SpringBootTest already registers SpringExtension, so no @ExtendWith is needed.
@SpringBootTest
@TestPropertySource(properties = {
        "spring.datasource.url=jdbc:h2:mem:testdb",
        "spring.jpa.hibernate.ddl-auto=create-drop"
})
class WorkflowEngineTest {
    @Autowired
    private WorkflowEngine workflowEngine;
    @Autowired
    private WorkflowInstanceRepository instanceRepository;
    // On Spring Boot 3.4+, @MockitoBean replaces the deprecated @MockBean
    @MockBean
    private PaymentService paymentService;
    @MockBean
    private InventoryService inventoryService;

    @Test
    @DisplayName("Should complete workflow successfully")
    void shouldCompleteWorkflowSuccessfully() {
        // Given
        OrderWorkflowContext context = new OrderWorkflowContext();
        context.setOrderId("order-123");
        context.setCustomerId("customer-456");
        context.setAmount(new BigDecimal("99.99"));
        when(paymentService.processPayment(any())).thenReturn(
                new PaymentResponse(true, "txn-123", "credit_card", null));
        when(inventoryService.reserveItems(any())).thenReturn(true);
        // When
        WorkflowInstance instance = workflowEngine.startWorkflow("order-processing", context);
        // Then - poll until the workflow completes (execution is asynchronous)
        await().atMost(10, TimeUnit.SECONDS).until(() -> {
            WorkflowInstance updated = instanceRepository.findById(instance.getId()).orElseThrow();
            return updated.getCurrentState() == WorkflowState.COMPLETED;
        });
        WorkflowInstance completed = instanceRepository.findById(instance.getId()).orElseThrow();
        assertThat(completed.getCurrentState()).isEqualTo(WorkflowState.COMPLETED);
    }

    @Test
    @DisplayName("Should mark workflow FAILED when payment is declined")
    void shouldHandlePaymentFailure() {
        // Given
        OrderWorkflowContext context = new OrderWorkflowContext();
        context.setOrderId("order-456");
        context.setCustomerId("customer-789");
        context.setAmount(new BigDecimal("149.99"));
        when(paymentService.processPayment(any())).thenReturn(
                new PaymentResponse(false, null, null, "Insufficient funds"));
        // When
        WorkflowInstance instance = workflowEngine.startWorkflow("order-processing", context);
        // Then - wait for the workflow to reach FAILED.
        // PAYMENT_PROCESSING maps to no compensation steps, so nothing is rolled back here.
        await().atMost(10, TimeUnit.SECONDS).until(() -> {
            WorkflowInstance updated = instanceRepository.findById(instance.getId()).orElseThrow();
            return updated.getCurrentState() == WorkflowState.FAILED;
        });
        WorkflowInstance failed = instanceRepository.findById(instance.getId()).orElseThrow();
        assertThat(failed.getCurrentState()).isEqualTo(WorkflowState.FAILED);
        // Assumes the engine records an error message containing "Payment failed"
        assertThat(failed.getLastError()).contains("Payment failed");
    }
}
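Both tests rely on Awaitility's `await().atMost(...).until(...)`, which re-evaluates a condition until it holds or the timeout expires. If Awaitility is not available, a minimal stand-in could look like the sketch below. `Polling.pollUntil` is a hypothetical helper, and unlike Awaitility it returns `false` on timeout instead of throwing.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper approximating Awaitility's until(...)
final class Polling {
    private Polling() {}

    /**
     * Re-evaluates condition every pollInterval until it returns true or timeout elapses.
     * Returns true if the condition held in time, false otherwise.
     */
    static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

In the tests above, the same lambda passed to `until(...)` would be passed as `condition`, with the result wrapped in `assertThat(...).isTrue()`.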
Best Practices
1. Idempotency
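import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class IdempotentWorkflowStep implements WorkflowStep {
    private final WorkflowExecutionTracker executionTracker;

    public IdempotentWorkflowStep(WorkflowExecutionTracker executionTracker) {
        this.executionTracker = executionTracker;
    }

    @Override
    public String getName() {
        return "idempotentStep"; // Illustrative name; use the real step name
    }

    @Override
    public WorkflowExecutionResult execute(WorkflowInstance instance, OrderWorkflowContext context) {
        // One execution key per (workflow instance, step) pair
        String executionKey = instance.getId() + ":" + getName();
        // Check if already executed. Check-then-record is not atomic: concurrent runs of the
        // same instance need a lock or a unique constraint on the key in the tracker's store.
        if (executionTracker.isAlreadyExecuted(executionKey)) {
            log.info("Step already executed: {}", executionKey);
            return WorkflowExecutionResult.success("Step already executed");
        }
        // Execute step. A crash before recordExecution causes a re-run,
        // so the side effect itself should also tolerate repetition.
        WorkflowExecutionResult result = doExecute(instance, context);
        // Record only successful executions so failed attempts can be retried
        if (result.isSuccess()) {
            executionTracker.recordExecution(executionKey);
        }
        return result;
    }

    private WorkflowExecutionResult doExecute(WorkflowInstance instance, OrderWorkflowContext context) {
        // Actual step implementation
        return WorkflowExecutionResult.success("Step executed");
    }
}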
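The check in `execute` and the later `recordExecution` call are two separate operations, so two workers handling the same instance could both pass the check. One way to close that gap is to make the claim itself atomic. The sketch below is an in-memory illustration that assumes a single JVM; `InMemoryExecutionTracker` and `runOnce` are hypothetical names. A durable version would more likely insert the execution key into a table with a unique constraint.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical in-memory tracker; a production version would persist keys durably.
final class InMemoryExecutionTracker {
    private enum Status { IN_PROGRESS, COMPLETED }

    private final Map<String, Status> executions = new ConcurrentHashMap<>();

    /**
     * Runs action at most once per key. Returns true only if this call ran the action
     * and it succeeded; returns false if the key is already claimed (in progress or
     * completed) or the action reported failure. On failure or exception the claim
     * is released so a later retry can run the step.
     */
    boolean runOnce(String executionKey, BooleanSupplier action) {
        // putIfAbsent is atomic: exactly one caller sees null and wins the claim
        if (executions.putIfAbsent(executionKey, Status.IN_PROGRESS) != null) {
            return false;
        }
        boolean succeeded = false;
        try {
            succeeded = action.getAsBoolean();
            return succeeded;
        } finally {
            if (succeeded) {
                executions.put(executionKey, Status.COMPLETED);
            } else {
                executions.remove(executionKey);
            }
        }
    }

    boolean isCompleted(String executionKey) {
        return executions.get(executionKey) == Status.COMPLETED;
    }
}
```

Callers that receive `false` can use `isCompleted` to tell a finished step apart from one that another worker is still running.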
2. Configuration Externalization
# application.yml
workflow:
  order-processing:
    steps:
      - name: VALIDATION
        retry:
          max-attempts: 3
          backoff: exponential
          initial-interval: 5000
      - name: PAYMENT
        retry:
          max-attempts: 5
          backoff: fixed
          interval: 10000
      - name: INVENTORY
        retry:
          max-attempts: 3
          backoff: exponential
          initial-interval: 5000
    compensation:
      # Undo in reverse order of execution
      order:
        - INVENTORY
        - PAYMENT
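The retry settings name a backoff strategy and an interval but not how delays are derived from them. The sketch below shows one way a step runner might do that. It assumes intervals are in milliseconds, that `max-attempts` counts the first attempt, and that `exponential` doubles the delay after each failure with no cap; none of these is stated in the configuration. `RetryPolicy` is a hypothetical in-code mirror of one step's `retry` block (requires Java 17 for records and switch expressions).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of one step's "retry" block from application.yml
record RetryPolicy(int maxAttempts, String backoff, long intervalMillis) {

    private static final double MULTIPLIER = 2.0; // assumed; not specified in the config

    /** Delay to wait after the given failed attempt (1-based) before the next one. */
    Duration delayAfterAttempt(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        return switch (backoff) {
            case "fixed" -> Duration.ofMillis(intervalMillis);
            case "exponential" ->
                    Duration.ofMillis((long) (intervalMillis * Math.pow(MULTIPLIER, attempt - 1)));
            default -> throw new IllegalArgumentException("Unknown backoff: " + backoff);
        };
    }

    /** Every wait between attempts: maxAttempts tries means maxAttempts - 1 waits. */
    List<Duration> schedule() {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add(delayAfterAttempt(attempt));
        }
        return delays;
    }
}
```

Under these assumptions, the VALIDATION step waits 5 s and then 10 s between its three attempts, while PAYMENT waits a flat 10 s between each of its five attempts.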
Conclusion
Long-running workflows in Java require careful consideration of:
Key Patterns:
- State Machines: For complex state transitions
- Saga Pattern: For distributed transactions
- Compensation: For rollback logic
- Persistence: For durability across restarts
Implementation Choices:
- Custom Engine: Full control but more complexity
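- Temporal: Production-ready durable execution, but requires running a Temporal service (self-hosted or Temporal Cloud)
- Spring Statemachine: Native Spring integration, but fewer built-in durability and scheduling features than a dedicated workflow engine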
- Database-driven: Simple but requires manual recovery
Critical Success Factors:
- Idempotency: Ensure steps can be safely retried
- Monitoring: Track workflow progress and performance
- Error Handling: Comprehensive failure and compensation strategies
- Testing: Thorough testing of all workflow paths
- Operational Support: Tools for manual intervention and debugging
Choose the approach that best fits your complexity requirements, team expertise, and operational capabilities.