Learn how to build robust, scalable workflow orchestration using Uber Cadence for complex business processes in Java.
Table of Contents
- Cadence Fundamentals
- Workflow Implementation
- Activity Patterns
- Error Handling & Retries
- Signals & Queries
- Event Sourcing
- Testing Strategies
- Production Deployment
Cadence Fundamentals
What is Cadence?
Cadence is a distributed, scalable, durable, and highly available orchestration engine that executes business logic expressed as workflows.
Key Concepts:
- Workflows: Orchestrate activities and manage state
- Activities: Execute business logic; unlike workflow code, which must be deterministic, activity code may be non-deterministic (I/O, remote calls, random values)
- Signals: External events to workflows
- Queries: Inspect workflow state
- Deciders: Determine next steps based on events
Workflow Implementation
1. Core Dependencies & Configuration
<!-- pom.xml -->
<!-- Fragment only: the enclosing <project> element is not shown here. -->
<properties>
    <!-- Single version property shared by both Cadence artifacts below. -->
    <cadence-client.version>3.7.5</cadence-client.version>
</properties>
<dependencies>
    <!-- Cadence Client -->
    <dependency>
        <groupId>com.uber.cadence</groupId>
        <artifactId>cadence-client</artifactId>
        <version>${cadence-client.version}</version>
    </dependency>
    <!-- Spring Boot Starters -->
    <!-- No <version> is declared here; presumably managed by a Spring Boot parent or BOM (confirm in the full pom). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Testing -->
    <!-- Test scope: available to test code only. -->
    <dependency>
        <groupId>com.uber.cadence</groupId>
        <artifactId>cadence-testing</artifactId>
        <version>${cadence-client.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Cadence Configuration
@Configuration
@EnableConfigurationProperties(CadenceProperties.class)
public class CadenceConfig {
private static final Logger logger = LoggerFactory.getLogger(CadenceConfig.class);
@Bean
public WorkflowServiceTls workflowServiceTls(CadenceProperties properties) {
if (!properties.getTls().isEnabled()) {
return null;
}
return new WorkflowServiceTls.Builder()
.setCert(properties.getTls().getCert())
.setKey(properties.getTls().getKey())
.setCa(properties.getTls().getCa())
.setEnableHostNameVerification(properties.getTls().isEnableHostNameVerification())
.build();
}
@Bean
public WorkflowClient workflowClient(CadenceProperties properties,
WorkflowServiceTls workflowServiceTls) {
WorkflowClientOptions.Builder clientOptionsBuilder = new WorkflowClientOptions.Builder()
.setDomain(properties.getDomain())
.setIdentity(properties.getIdentity());
if (workflowServiceTls != null) {
clientOptionsBuilder.setTls(workflowServiceTls);
}
WorkflowClient client = WorkflowClient.newInstance(
properties.getHost(),
properties.getPort(),
clientOptionsBuilder.build()
);
logger.info("Cadence WorkflowClient initialized for domain: {}", properties.getDomain());
return client;
}
@Bean
public WorkerFactory workerFactory(WorkflowClient workflowClient, CadenceProperties properties) {
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
factory.start();
logger.info("Cadence WorkerFactory started");
return factory;
}
@Bean
public WorkflowClientExternal workflowClientExternal(WorkflowClient workflowClient) {
return workflowClient.newWorkflowClientExternal();
}
}
/**
 * Connection settings bound from configuration keys starting with {@code cadence}.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "cadence")
@Data
public class CadenceProperties {
    // Host of the Cadence service; defaults to localhost.
    private String host = "localhost";
    // Port of the Cadence service; defaults to 7933.
    private int port = 7933;
    // Cadence domain passed to the workflow client options.
    private String domain = "default";
    // Identity passed to the workflow client options.
    private String identity = "java-worker";
    // TLS settings; never null because it is initialized with an all-defaults instance.
    private Tls tls = new Tls();

    /** TLS settings; TLS is off unless {@code enabled} is set. */
    @Data
    public static class Tls {
        // When false, CadenceConfig does not build any TLS configuration.
        private boolean enabled = false;
        // NOTE(review): whether cert/key/ca hold file paths or PEM content is not visible here; confirm.
        private String cert;
        private String key;
        private String ca;
        // Host name verification is off by default.
        private boolean enableHostNameVerification = false;
    }
}
3. E-Commerce Order Processing Workflow
// Workflow Interface
/**
 * Contract of the order-processing workflow: one entry point, three signals that feed
 * external events into a running workflow, and one query that exposes its state.
 */
@WorkflowInterface
public interface OrderProcessingWorkflow {
    /**
     * Workflow entry point.
     *
     * @param request the order to process
     * @return the outcome of processing the order
     */
    @WorkflowMethod
    OrderProcessingResult processOrder(OrderProcessingRequest request);

    /** Signal carrying a payment status change for the order. */
    @SignalMethod
    void updatePaymentStatus(PaymentStatusSignal signal);

    /** Signal carrying a shipping status change for the order. */
    @SignalMethod
    void updateShippingStatus(ShippingStatusSignal signal);

    /** Signal requesting cancellation of the order. */
    @SignalMethod
    void cancelOrder(CancelOrderSignal signal);

    /** Query returning the workflow's current order state. */
    @QueryMethod
    OrderProcessingState getOrderState();
}
// Workflow Implementation
/**
 * Order-processing workflow: validate, take payment (synchronously or by waiting for a
 * payment signal), reserve inventory, schedule shipping, finalize.
 *
 * <p>On cancellation or failure the steps that actually completed are compensated in reverse
 * order. Which steps completed is tracked explicitly (promises and a flag) rather than derived
 * from {@code OrderStatus.ordinal()}, because {@code CANCELLED} is declared after
 * {@code COMPLETED} and would otherwise make every compensation fire for a cancelled order.
 */
public class OrderProcessingWorkflowImpl implements OrderProcessingWorkflow {

    // Workflow.getLogger returns a logger intended for workflow code (replay-aware).
    private static final Logger logger = Workflow.getLogger(OrderProcessingWorkflowImpl.class);

    private final OrderProcessingActivities activities =
            Workflow.newActivityStub(OrderProcessingActivities.class);

    private OrderProcessingState state = new OrderProcessingState();
    private CompletablePromise<Void> paymentCompleted = Workflow.newPromise();
    private CompletablePromise<Void> inventoryReserved = Workflow.newPromise();
    private CompletablePromise<Void> shippingScheduled = Workflow.newPromise();
    private boolean orderCancelled = false;
    // True only once money has actually been taken; drives the refund compensation.
    private boolean paymentCaptured = false;

    /**
     * Runs the order through all processing steps.
     *
     * @param request the order to process
     * @return success, cancelled or failed result; exceptions are converted into a result
     */
    @Override
    public OrderProcessingResult processOrder(OrderProcessingRequest request) {
        logger.info("Starting order processing workflow for order: {}", request.getOrderId());
        // Signal handlers log state.getOrderId(), so it must be populated up front.
        state.setOrderId(request.getOrderId());
        try {
            // 1. Validate order
            state.setStatus(OrderStatus.VALIDATING);
            activities.validateOrder(request);
            state.setStatus(OrderStatus.VALIDATED);

            // 2. Process payment (wait for signal or process synchronously)
            if (request.isPaymentPending()) {
                awaitExternalPayment();
            } else {
                throwIfCancelled("payment processing");
                state.setStatus(OrderStatus.PROCESSING_PAYMENT);
                activities.processPayment(request);
                paymentCaptured = true;
                paymentCompleted.complete(null);
            }
            state.setStatus(OrderStatus.PAYMENT_COMPLETED);

            // 3. Reserve inventory
            throwIfCancelled("inventory reservation");
            state.setStatus(OrderStatus.RESERVING_INVENTORY);
            activities.reserveInventory(request);
            inventoryReserved.complete(null);
            state.setStatus(OrderStatus.INVENTORY_RESERVED);

            // 4. Schedule shipping
            throwIfCancelled("shipping scheduling");
            state.setStatus(OrderStatus.SCHEDULING_SHIPPING);
            activities.scheduleShipping(request);
            shippingScheduled.complete(null);
            state.setStatus(OrderStatus.SHIPPING_SCHEDULED);

            // 5. Finalize order
            throwIfCancelled("finalization");
            state.setStatus(OrderStatus.FINALIZING);
            activities.finalizeOrder(request);
            state.setStatus(OrderStatus.COMPLETED);

            logger.info("Order processing completed successfully for order: {}", request.getOrderId());
            return OrderProcessingResult.success(request.getOrderId());
        } catch (OrderCancelledException e) {
            logger.info("Order processing cancelled for order: {}", request.getOrderId());
            handleOrderCancellation(request);
            // A step finishing after the cancel signal may have overwritten CANCELLED.
            state.setStatus(OrderStatus.CANCELLED);
            return OrderProcessingResult.cancelled(request.getOrderId(), e.getMessage());
        } catch (Exception e) {
            logger.error("Order processing failed for order: {}", request.getOrderId(), e);
            handleOrderFailure(request, e);
            return OrderProcessingResult.failed(request.getOrderId(), e.getMessage());
        }
    }

    /**
     * Records the payment status; a COMPLETED or FAILED status resolves the payment promise
     * that {@link #processOrder} may be waiting on.
     */
    @Override
    public void updatePaymentStatus(PaymentStatusSignal signal) {
        logger.info("Received payment status update for order {}: {}",
                state.getOrderId(), signal.getStatus());
        state.setPaymentStatus(signal.getStatus());
        if (signal.getStatus() == PaymentStatus.COMPLETED) {
            // complete() returns false if the promise was already resolved; ignore duplicates.
            if (paymentCompleted.complete(null)) {
                paymentCaptured = true;
                state.setStatus(OrderStatus.PAYMENT_COMPLETED);
            }
        } else if (signal.getStatus() == PaymentStatus.FAILED) {
            paymentCompleted.completeExceptionally(new PaymentFailedException("Payment failed"));
        }
    }

    /** Records the shipping status; DELIVERED also moves the order status to DELIVERED. */
    @Override
    public void updateShippingStatus(ShippingStatusSignal signal) {
        logger.info("Received shipping status update for order {}: {}",
                state.getOrderId(), signal.getStatus());
        state.setShippingStatus(signal.getStatus());
        if (signal.getStatus() == ShippingStatus.DELIVERED) {
            state.setStatus(OrderStatus.DELIVERED);
        }
    }

    /** Flags the order as cancelled; the workflow acts on the flag at its next checkpoint. */
    @Override
    public void cancelOrder(CancelOrderSignal signal) {
        logger.info("Received cancellation request for order: {}", state.getOrderId());
        orderCancelled = true;
        state.setStatus(OrderStatus.CANCELLED);
        state.setCancellationReason(signal.getReason());
    }

    /** Returns the live state object of this workflow. */
    @Override
    public OrderProcessingState getOrderState() {
        return state;
    }

    /** Waits up to 30 minutes for a payment signal or a cancellation. */
    private void awaitExternalPayment() {
        state.setStatus(OrderStatus.AWAITING_PAYMENT);
        Workflow.await(Duration.ofMinutes(30), () ->
                paymentCompleted.isCompleted() || orderCancelled);
        if (orderCancelled) {
            throw new OrderCancelledException("Order was cancelled during payment");
        }
        if (!paymentCompleted.isCompleted()) {
            throw new PaymentTimeoutException("Payment timeout after 30 minutes");
        }
        // isCompleted() is also true for an exceptional completion (FAILED signal);
        // get() rethrows that failure so a failed payment is not treated as paid.
        paymentCompleted.get();
    }

    /** Throws if a cancel signal has been received before the named stage starts. */
    private void throwIfCancelled(String stage) {
        if (orderCancelled) {
            throw new OrderCancelledException("Order was cancelled before " + stage);
        }
    }

    /** Compensates completed steps and notifies about the cancellation. */
    private void handleOrderCancellation(OrderProcessingRequest request) {
        compensateCompletedSteps(request);
        runBestEffort("notifyOrderCancelled",
                () -> activities.notifyOrderCancelled(request, state.getCancellationReason()));
    }

    /** Compensates completed steps and notifies about the failure. */
    private void handleOrderFailure(OrderProcessingRequest request, Exception failure) {
        compensateCompletedSteps(request);
        runBestEffort("notifyOrderFailed",
                () -> activities.notifyOrderFailed(request, failure.getMessage()));
    }

    /** Undoes, newest first, exactly the steps that completed. */
    private void compensateCompletedSteps(OrderProcessingRequest request) {
        if (shippingScheduled.isCompleted()) {
            runBestEffort("cancelShipping", () -> activities.cancelShipping(request));
        }
        if (inventoryReserved.isCompleted()) {
            runBestEffort("releaseInventory", () -> activities.releaseInventory(request));
        }
        if (paymentCaptured) {
            runBestEffort("refundPayment", () -> activities.refundPayment(request));
        }
    }

    /** Runs one compensation/notification step; a failure is logged and does not stop the others. */
    private void runBestEffort(String stepName, Runnable step) {
        try {
            step.run();
        } catch (Exception e) {
            logger.error("Compensation step {} failed for order: {}",
                    stepName, state.getOrderId(), e);
        }
    }
}
// Data Models
/** Input of the order-processing workflow. Accessors are generated by Lombok's {@code @Data}. */
@Data
public class OrderProcessingRequest {
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private PaymentInfo paymentInfo;
    private ShippingInfo shippingInfo;
    // When true the workflow waits for a payment signal instead of calling the payment activity.
    private boolean paymentPending = false;
}
/**
 * Mutable snapshot of an order's progress, returned by the workflow's state query.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class OrderProcessingState {
    private String orderId;
    private OrderStatus status = OrderStatus.CREATED;
    private PaymentStatus paymentStatus = PaymentStatus.PENDING;
    private ShippingStatus shippingStatus = ShippingStatus.PENDING;
    private String cancellationReason;
    // Set from the tracking-info signal; the enhanced workflow calls setTrackingNumber(...),
    // which Lombok only generates when this field exists.
    private String trackingNumber;
    // NOTE(review): Instant.now() reads the wall clock. If this object is created inside
    // workflow code the values will differ between executions and replays; confirm whether
    // workflow time should be used instead.
    private Instant createdAt = Instant.now();
    private Instant updatedAt = Instant.now();
}
/**
 * Outcome of an order-processing workflow run. Instances are normally obtained through the
 * static factories below; accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class OrderProcessingResult {
    private String orderId;
    private boolean success;
    private String status;
    private String errorMessage;

    /** Result for an order that completed; status is "COMPLETED" and no error message is set. */
    public static OrderProcessingResult success(String orderId) {
        return outcome(orderId, true, "COMPLETED", null);
    }

    /** Result for an order that failed; status is "FAILED" and the error message is recorded. */
    public static OrderProcessingResult failed(String orderId, String errorMessage) {
        return outcome(orderId, false, "FAILED", errorMessage);
    }

    /** Result for a cancelled order; status is "CANCELLED" and the reason is stored as the error message. */
    public static OrderProcessingResult cancelled(String orderId, String reason) {
        return outcome(orderId, false, "CANCELLED", reason);
    }

    // Single place where a result is populated, shared by all three factories.
    private static OrderProcessingResult outcome(String orderId, boolean succeeded,
                                                 String statusLabel, String message) {
        OrderProcessingResult outcome = new OrderProcessingResult();
        outcome.setOrderId(orderId);
        outcome.setSuccess(succeeded);
        outcome.setStatus(statusLabel);
        outcome.setErrorMessage(message);
        return outcome;
    }
}
// Enums
/**
 * Lifecycle stages of an order.
 *
 * <p>Declaration order is significant wherever callers compare {@code ordinal()} values.
 * Note that CANCELLED and DELIVERED are declared after COMPLETED, so an ordinal comparison
 * ranks them past every processing stage. Do not reorder without auditing such callers.
 */
public enum OrderStatus {
    CREATED,
    VALIDATING,
    VALIDATED,
    AWAITING_PAYMENT,
    PROCESSING_PAYMENT,
    PAYMENT_COMPLETED,
    RESERVING_INVENTORY,
    INVENTORY_RESERVED,
    SCHEDULING_SHIPPING,
    SHIPPING_SCHEDULED,
    FINALIZING,
    COMPLETED,
    CANCELLED,
    DELIVERED
}
/** Payment states carried by payment signals and stored in the order state (initially PENDING). */
public enum PaymentStatus {
    PENDING,
    PROCESSING,
    COMPLETED,
    FAILED,
    REFUNDED
}
/** Shipping states carried by shipping signals and stored in the order state (initially PENDING). */
public enum ShippingStatus {
    PENDING,
    SCHEDULED,
    IN_TRANSIT,
    DELIVERED,
    CANCELLED
}
Activity Patterns
4. Activity Implementations
// Activity Interface
/**
 * Activities invoked by the order-processing workflows. Each method declares its own
 * schedule-to-close timeout in seconds: 600 for payment processing, 300 for everything else.
 */
@ActivityInterface
public interface OrderProcessingActivities {
    /** Validates the order before any processing starts. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void validateOrder(OrderProcessingRequest request);

    /** Takes payment for the order; the only activity with a 600-second timeout. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 600)
    void processPayment(OrderProcessingRequest request);

    /** Reserves inventory for the order's items. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void reserveInventory(OrderProcessingRequest request);

    /** Compensation for {@link #reserveInventory}. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void releaseInventory(OrderProcessingRequest request);

    /** Schedules shipping for the order. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void scheduleShipping(OrderProcessingRequest request);

    /** Compensation for {@link #scheduleShipping}. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void cancelShipping(OrderProcessingRequest request);

    /** Final step of a successful order. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void finalizeOrder(OrderProcessingRequest request);

    /** Compensation for {@link #processPayment}. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void refundPayment(OrderProcessingRequest request);

    /** Notifies about a cancelled order, passing along the cancellation reason. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void notifyOrderCancelled(OrderProcessingRequest request, String reason);

    /** Notifies about a failed order, passing along the error text. */
    @ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
    void notifyOrderFailed(OrderProcessingRequest request, String error);
}
// Activity Implementation
/**
 * Activity implementation that delegates each step to the corresponding domain service.
 *
 * <p>Forward steps (validate, pay, reserve, ship, finalize) log and rethrow failures.
 * Compensation and notification steps log failures and deliberately do not rethrow.
 */
@Component
@Slf4j
public class OrderProcessingActivitiesImpl implements OrderProcessingActivities {

    private final OrderService orderService;
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;
    private final NotificationService notificationService;

    public OrderProcessingActivitiesImpl(OrderService orderService,
                                         PaymentService paymentService,
                                         InventoryService inventoryService,
                                         ShippingService shippingService,
                                         NotificationService notificationService) {
        this.orderService = orderService;
        this.paymentService = paymentService;
        this.inventoryService = inventoryService;
        this.shippingService = shippingService;
        this.notificationService = notificationService;
    }

    /**
     * Checks item quantities, the customer and the shipping address.
     *
     * @throws ValidationException if the order has no items, an item has a non-positive
     *                             quantity, the customer is invalid, or shipping data is
     *                             missing or invalid
     */
    @Override
    public void validateOrder(OrderProcessingRequest request) {
        log.info("Validating order: {}", request.getOrderId());
        try {
            // Report missing data as a validation failure instead of a NullPointerException.
            if (request.getItems() == null || request.getItems().isEmpty()) {
                throw new ValidationException("Order contains no items");
            }
            // Validate order items
            for (OrderItem item : request.getItems()) {
                if (item.getQuantity() <= 0) {
                    throw new ValidationException("Invalid quantity for product: " + item.getProductId());
                }
            }
            // Validate customer
            if (!orderService.isCustomerValid(request.getCustomerId())) {
                throw new ValidationException("Invalid customer: " + request.getCustomerId());
            }
            // Validate shipping address
            if (request.getShippingInfo() == null) {
                throw new ValidationException("Missing shipping information");
            }
            if (!shippingService.isAddressValid(request.getShippingInfo().getAddress())) {
                throw new ValidationException("Invalid shipping address");
            }
            log.info("Order validation completed successfully: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Order validation failed: {}", request.getOrderId(), e);
            throw e;
        }
    }

    /**
     * Charges the order total through the payment service.
     *
     * @throws PaymentFailedException if the payment service reports an unsuccessful result
     */
    @Override
    public void processPayment(OrderProcessingRequest request) {
        log.info("Processing payment for order: {}", request.getOrderId());
        try {
            PaymentResult result = paymentService.processPayment(
                    request.getPaymentInfo(),
                    calculateTotalAmount(request.getItems()));
            if (!result.isSuccess()) {
                throw new PaymentFailedException("Payment processing failed: " + result.getErrorMessage());
            }
            log.info("Payment processed successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Payment processing failed for order: {}", request.getOrderId(), e);
            throw e;
        }
    }

    /**
     * Reserves every item of the order. If any item cannot be reserved, the items already
     * reserved by this invocation are released again before the failure is rethrown, so a
     * failed or retried attempt does not leave partial reservations behind.
     *
     * @throws InventoryException if an item cannot be reserved
     */
    @Override
    public void reserveInventory(OrderProcessingRequest request) {
        log.info("Reserving inventory for order: {}", request.getOrderId());
        List<OrderItem> reservedSoFar = new java.util.ArrayList<>();
        try {
            for (OrderItem item : request.getItems()) {
                boolean reserved = inventoryService.reserveItem(
                        item.getProductId(),
                        item.getQuantity(),
                        request.getOrderId());
                if (!reserved) {
                    throw new InventoryException("Insufficient inventory for product: " + item.getProductId());
                }
                reservedSoFar.add(item);
            }
            log.info("Inventory reserved successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Inventory reservation failed for order: {}", request.getOrderId(), e);
            rollBackReservations(reservedSoFar, request.getOrderId());
            throw e;
        }
    }

    /** Compensation: releases every item of the order; failures are logged, not thrown. */
    @Override
    public void releaseInventory(OrderProcessingRequest request) {
        log.info("Releasing inventory for order: {}", request.getOrderId());
        try {
            for (OrderItem item : request.getItems()) {
                inventoryService.releaseItem(
                        item.getProductId(),
                        item.getQuantity(),
                        request.getOrderId());
            }
            log.info("Inventory released successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Inventory release failed for order: {}", request.getOrderId(), e);
            // Don't throw exception in compensation activities to avoid masking original error
        }
    }

    /**
     * Schedules shipping through the shipping service.
     *
     * @throws ShippingException if the shipping service reports an unsuccessful result
     */
    @Override
    public void scheduleShipping(OrderProcessingRequest request) {
        log.info("Scheduling shipping for order: {}", request.getOrderId());
        try {
            ShippingResult result = shippingService.scheduleShipping(
                    request.getOrderId(),
                    request.getShippingInfo(),
                    request.getItems());
            if (!result.isSuccess()) {
                throw new ShippingException("Shipping scheduling failed: " + result.getErrorMessage());
            }
            log.info("Shipping scheduled successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Shipping scheduling failed for order: {}", request.getOrderId(), e);
            throw e;
        }
    }

    /** Compensation: cancels shipping for the order; failures are logged, not thrown. */
    @Override
    public void cancelShipping(OrderProcessingRequest request) {
        log.info("Cancelling shipping for order: {}", request.getOrderId());
        try {
            shippingService.cancelShipping(request.getOrderId());
            log.info("Shipping cancelled successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Shipping cancellation failed for order: {}", request.getOrderId(), e);
            // Don't throw exception in compensation activities
        }
    }

    /** Marks the order as completed and sends the confirmation; failures are rethrown. */
    @Override
    public void finalizeOrder(OrderProcessingRequest request) {
        log.info("Finalizing order: {}", request.getOrderId());
        try {
            orderService.markOrderAsCompleted(request.getOrderId());
            notificationService.sendOrderConfirmation(request.getCustomerId(), request.getOrderId());
            log.info("Order finalized successfully: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Order finalization failed: {}", request.getOrderId(), e);
            throw e;
        }
    }

    /** Compensation: refunds the order's payment; failures are logged, not thrown. */
    @Override
    public void refundPayment(OrderProcessingRequest request) {
        log.info("Processing refund for order: {}", request.getOrderId());
        try {
            paymentService.processRefund(request.getOrderId());
            log.info("Refund processed successfully for order: {}", request.getOrderId());
        } catch (Exception e) {
            log.error("Refund processing failed for order: {}", request.getOrderId(), e);
            // Don't throw exception in compensation activities
        }
    }

    /** Sends the cancellation notification; failures are logged, not thrown. */
    @Override
    public void notifyOrderCancelled(OrderProcessingRequest request, String reason) {
        log.info("Notifying order cancellation for order: {}", request.getOrderId());
        try {
            notificationService.sendOrderCancellation(
                    request.getCustomerId(),
                    request.getOrderId(),
                    reason);
        } catch (Exception e) {
            log.error("Order cancellation notification failed for order: {}", request.getOrderId(), e);
            // Don't throw exception in notification activities
        }
    }

    /** Sends the failure notification; failures are logged, not thrown. */
    @Override
    public void notifyOrderFailed(OrderProcessingRequest request, String error) {
        log.info("Notifying order failure for order: {}", request.getOrderId());
        try {
            notificationService.sendOrderFailure(
                    request.getCustomerId(),
                    request.getOrderId(),
                    error);
        } catch (Exception e) {
            log.error("Order failure notification failed for order: {}", request.getOrderId(), e);
            // Don't throw exception in notification activities
        }
    }

    // Best-effort undo of reservations made earlier in the same reserveInventory call.
    private void rollBackReservations(List<OrderItem> reservedItems, String orderId) {
        for (OrderItem item : reservedItems) {
            try {
                inventoryService.releaseItem(item.getProductId(), item.getQuantity(), orderId);
            } catch (Exception releaseFailure) {
                // Keep going: the original reservation failure is what the caller must see.
                log.error("Rollback of reservation failed for order: {}, product: {}",
                        orderId, item.getProductId(), releaseFailure);
            }
        }
    }

    // Sum of price * quantity over all items.
    private BigDecimal calculateTotalAmount(List<OrderItem> items) {
        return items.stream()
                .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}
// Signal Classes
/** Payload of the payment-status signal. Accessors are generated by Lombok's {@code @Data}. */
@Data
public class PaymentStatusSignal {
    private String orderId;
    // New payment status; the workflows react to this value.
    private PaymentStatus status;
    private String transactionId;
    private String message;
}
/** Payload of the shipping-status signal. Accessors are generated by Lombok's {@code @Data}. */
@Data
public class ShippingStatusSignal {
    private String orderId;
    // New shipping status; the workflows store this value in the order state.
    private ShippingStatus status;
    private String trackingNumber;
    private String message;
}
/** Payload of the cancel-order signal. Accessors are generated by Lombok's {@code @Data}. */
@Data
public class CancelOrderSignal {
    private String orderId;
    // Reason recorded by the workflows as the cancellation reason.
    private String reason;
    private String initiatedBy;
}
Error Handling & Retries
5. Advanced Retry and Error Handling
// Custom retry options
/**
 * Factory for the retry policies used by the order-processing activities. All three policies
 * use exponential backoff and differ in intervals, coefficient, attempt count and the
 * exception types that are never retried.
 */
@Component
public class ActivityRetryOptions {

    /** 1s initial, 1min cap, x2.0 backoff, 3 attempts; validation and payment failures are final. */
    public RetryOptions getDefaultRetryOptions() {
        return backoff(Duration.ofSeconds(1), Duration.ofMinutes(1), 2.0, 3)
                .setDoNotRetry(ValidationException.class, PaymentFailedException.class)
                .build();
    }

    /** 5s initial, 5min cap, x1.5 backoff, 5 attempts; only validation failures are final. */
    public RetryOptions getPaymentRetryOptions() {
        return backoff(Duration.ofSeconds(5), Duration.ofMinutes(5), 1.5, 5)
                .setDoNotRetry(ValidationException.class)
                .build();
    }

    /** 2s initial, 2min cap, x2.0 backoff, 10 attempts (more retries for inventory). */
    public RetryOptions getInventoryRetryOptions() {
        return backoff(Duration.ofSeconds(2), Duration.ofMinutes(2), 2.0, 10).build();
    }

    // Common part of every policy: a builder pre-populated with the four backoff settings.
    private static RetryOptions.Builder backoff(Duration firstDelay, Duration delayCap,
                                                double growthFactor, int attemptLimit) {
        return new RetryOptions.Builder()
                .setInitialInterval(firstDelay)
                .setMaximumInterval(delayCap)
                .setBackoffCoefficient(growthFactor)
                .setMaximumAttempts(attemptLimit);
    }
}
// Enhanced workflow with retry configuration
public class RobustOrderProcessingWorkflowImpl implements OrderProcessingWorkflow {
private final OrderProcessingActivities activities;
private OrderProcessingState state = new OrderProcessingState();
private boolean orderCancelled = false;
public RobustOrderProcessingWorkflowImpl() {
// Create activity stubs with retry options
ActivityOptions activityOptions = new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(new ActivityRetryOptions().getDefaultRetryOptions())
.build();
this.activities = Workflow.newActivityStub(OrderProcessingActivities.class, activityOptions);
}
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
logger.info("Starting robust order processing for order: {}", request.getOrderId());
state.setOrderId(request.getOrderId());
try {
// Execute with compensation pattern
return executeWithCompensation(request);
} catch (Exception e) {
logger.error("Workflow execution failed for order: {}", request.getOrderId(), e);
return handleWorkflowFailure(request, e);
}
}
private OrderProcessingResult executeWithCompensation(OrderProcessingRequest request) {
List<Runnable> compensationActions = new ArrayList<>();
try {
// 1. Validate order (no compensation needed)
state.setStatus(OrderStatus.VALIDATING);
activities.validateOrder(request);
state.setStatus(OrderStatus.VALIDATED);
// 2. Process payment (compensation: refund)
state.setStatus(OrderStatus.PROCESSING_PAYMENT);
activities.processPayment(request);
state.setStatus(OrderStatus.PAYMENT_COMPLETED);
compensationActions.add(() -> activities.refundPayment(request));
// 3. Reserve inventory (compensation: release)
state.setStatus(OrderStatus.RESERVING_INVENTORY);
activities.reserveInventory(request);
state.setStatus(OrderStatus.INVENTORY_RESERVED);
compensationActions.add(() -> activities.releaseInventory(request));
// 4. Schedule shipping (compensation: cancel)
state.setStatus(OrderStatus.SCHEDULING_SHIPPING);
activities.scheduleShipping(request);
state.setStatus(OrderStatus.SHIPPING_SCHEDULED);
compensationActions.add(() -> activities.cancelShipping(request));
// 5. Finalize order
state.setStatus(OrderStatus.FINALIZING);
activities.finalizeOrder(request);
state.setStatus(OrderStatus.COMPLETED);
return OrderProcessingResult.success(request.getOrderId());
} catch (Exception e) {
// Execute compensation actions in reverse order
executeCompensationActions(compensationActions);
throw e;
}
}
private void executeCompensationActions(List<Runnable> compensationActions) {
// Execute in reverse order (LIFO)
for (int i = compensationActions.size() - 1; i >= 0; i--) {
try {
compensationActions.get(i).run();
} catch (Exception e) {
logger.error("Compensation action failed", e);
// Continue with other compensations
}
}
}
private OrderProcessingResult handleWorkflowFailure(OrderProcessingRequest request, Exception failure) {
if (orderCancelled) {
activities.notifyOrderCancelled(request, state.getCancellationReason());
return OrderProcessingResult.cancelled(request.getOrderId(), state.getCancellationReason());
} else {
activities.notifyOrderFailed(request, failure.getMessage());
return OrderProcessingResult.failed(request.getOrderId(), failure.getMessage());
}
}
// Signal and query methods same as before
@Override
public void updatePaymentStatus(PaymentStatusSignal signal) {
state.setPaymentStatus(signal.getStatus());
}
@Override
public void updateShippingStatus(ShippingStatusSignal signal) {
state.setShippingStatus(signal.getStatus());
}
@Override
public void cancelOrder(CancelOrderSignal signal) {
orderCancelled = true;
state.setStatus(OrderStatus.CANCELLED);
state.setCancellationReason(signal.getReason());
}
@Override
public OrderProcessingState getOrderState() {
return state;
}
}
// Circuit breaker for activities
/**
 * Keeps one circuit breaker per activity name and runs callables through it.
 * Breakers are created on first use and all share the same configuration.
 */
@Component
@Slf4j
public class ActivityCircuitBreaker {

    // Breakers keyed by activity name.
    private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

    /**
     * Runs a value-returning call through the breaker registered for the activity.
     *
     * @param activityName key identifying the breaker
     * @param supplier     the call to protect
     * @return whatever the supplier returns
     */
    public <T> T executeWithCircuitBreaker(String activityName, Supplier<T> supplier) {
        return breakerFor(activityName).executeSupplier(supplier);
    }

    /**
     * Runs a void call through the breaker registered for the activity.
     *
     * @param activityName key identifying the breaker
     * @param runnable     the call to protect
     */
    public void executeWithCircuitBreaker(String activityName, Runnable runnable) {
        breakerFor(activityName).executeRunnable(runnable);
    }

    // Looks up the breaker for the activity, creating it on first use.
    private CircuitBreaker breakerFor(String activityName) {
        return breakers.computeIfAbsent(activityName, name -> createCircuitBreaker(name));
    }

    // Count-based window of 20 calls, 50% failure/slow thresholds, 5 minutes in the open state.
    private CircuitBreaker createCircuitBreaker(String activityName) {
        return CircuitBreaker.of(
                activityName,
                CircuitBreakerConfig.custom()
                        .failureRateThreshold(50)
                        .slowCallRateThreshold(50)
                        .waitDurationInOpenState(Duration.ofMinutes(5))
                        .slowCallDurationThreshold(Duration.ofSeconds(30))
                        .permittedNumberOfCallsInHalfOpenState(5)
                        .minimumNumberOfCalls(10)
                        .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
                        .slidingWindowSize(20)
                        .build());
    }
}
// Enhanced activities with circuit breaker
@Component
@Slf4j
public class ResilientOrderProcessingActivitiesImpl implements OrderProcessingActivities {
private final OrderProcessingActivitiesImpl delegate;
private final ActivityCircuitBreaker circuitBreaker;
public ResilientOrderProcessingActivitiesImpl(OrderProcessingActivitiesImpl delegate,
ActivityCircuitBreaker circuitBreaker) {
this.delegate = delegate;
this.circuitBreaker = circuitBreaker;
}
@Override
public void validateOrder(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("validateOrder",
() -> delegate.validateOrder(request));
}
@Override
public void processPayment(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("processPayment",
() -> delegate.processPayment(request));
}
@Override
public void reserveInventory(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("reserveInventory",
() -> delegate.reserveInventory(request));
}
// ... other methods with circuit breaker
}
Signals & Queries
6. Advanced Signal and Query Patterns
// Workflow with enhanced signals and queries
/**
 * Extended order-processing workflow contract: the same entry point, plus additional signals
 * for payment, shipping, order management and customer approval, and additional queries.
 */
@WorkflowInterface
public interface EnhancedOrderProcessingWorkflow {
    /**
     * Workflow entry point.
     *
     * @param request the order to process
     * @return the outcome of processing the order
     */
    @WorkflowMethod
    OrderProcessingResult processOrder(OrderProcessingRequest request);

    // Payment signals
    /** Signal carrying a payment status change. */
    @SignalMethod
    void updatePaymentStatus(PaymentStatusSignal signal);

    /** Signal requesting that payment be retried. */
    @SignalMethod
    void requestPaymentRetry(PaymentRetrySignal signal);

    // Shipping signals
    /** Signal carrying a shipping status change. */
    @SignalMethod
    void updateShippingStatus(ShippingStatusSignal signal);

    /** Signal carrying updated tracking information. */
    @SignalMethod
    void updateTrackingInfo(TrackingInfoSignal signal);

    // Order management signals
    /** Signal requesting cancellation of the order. */
    @SignalMethod
    void cancelOrder(CancelOrderSignal signal);

    /** Signal carrying a change to the order's items. */
    @SignalMethod
    void updateOrderItems(UpdateItemsSignal signal);

    // Customer communication signals
    /** Signal carrying the customer's approval decision. */
    @SignalMethod
    void customerApproval(CustomerApprovalSignal signal);

    // Queries
    /** Query returning the current order state. */
    @QueryMethod
    OrderProcessingState getOrderState();

    /** Query returning the recorded timeline of the workflow. */
    @QueryMethod
    OrderProcessingTimeline getOrderTimeline();

    /** Query returning the steps that are still pending. */
    @QueryMethod
    List<ProcessingStep> getPendingSteps();

    /** Query returning the current order status as text. */
    @QueryMethod
    String getCurrentStatus();
}
// Enhanced workflow implementation
public class EnhancedOrderProcessingWorkflowImpl implements EnhancedOrderProcessingWorkflow {
private final OrderProcessingActivities activities;
private OrderProcessingState state = new OrderProcessingState();
private OrderProcessingTimeline timeline = new OrderProcessingTimeline();
private CompletablePromise<Void> paymentCompleted = Workflow.newPromise();
private CompletablePromise<Void> customerApprovalReceived = Workflow.newPromise();
private CompletablePromise<Void> shippingScheduled = Workflow.newPromise();
private boolean orderCancelled = false;
private String cancellationReason;
public EnhancedOrderProcessingWorkflowImpl() {
ActivityOptions activityOptions = new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofHours(2))
.setRetryOptions(new ActivityRetryOptions().getDefaultRetryOptions())
.build();
this.activities = Workflow.newActivityStub(OrderProcessingActivities.class, activityOptions);
}
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
logWorkflowEvent("Workflow started", request.getOrderId());
state.setOrderId(request.getOrderId());
timeline.setWorkflowStart(Instant.now());
try {
return executeOrderProcessing(request);
} catch (Exception e) {
return handleProcessingFailure(request, e);
}
}
private OrderProcessingResult executeOrderProcessing(OrderProcessingRequest request) {
// 1. Order validation
executeStep("VALIDATION", () -> activities.validateOrder(request));
// 2. Payment processing (with potential external approval)
if (requiresCustomerApproval(request)) {
executeStep("AWAITING_APPROVAL", () -> awaitCustomerApproval());
}
executeStep("PAYMENT_PROCESSING", () -> processPaymentWithRetries(request));
// 3. Inventory management
executeStep("INVENTORY_RESERVATION", () -> activities.reserveInventory(request));
// 4. Shipping coordination
executeStep("SHIPPING_SCHEDULING", () -> activities.scheduleShipping(request));
// 5. Order finalization
executeStep("ORDER_FINALIZATION", () -> activities.finalizeOrder(request));
logWorkflowEvent("Workflow completed successfully", request.getOrderId());
timeline.setWorkflowEnd(Instant.now());
return OrderProcessingResult.success(request.getOrderId());
}
private void executeStep(String stepName, Runnable stepAction) {
if (orderCancelled) {
throw new OrderCancelledException("Order cancelled during step: " + stepName);
}
logWorkflowEvent("Starting step: " + stepName, state.getOrderId());
timeline.addStep(stepName, Instant.now(), null);
try {
stepAction.run();
timeline.markStepCompleted(stepName, Instant.now());
logWorkflowEvent("Step completed: " + stepName, state.getOrderId());
} catch (Exception e) {
timeline.markStepFailed(stepName, Instant.now(), e.getMessage());
logWorkflowEvent("Step failed: " + stepName + " - " + e.getMessage(), state.getOrderId());
throw e;
}
}
private void processPaymentWithRetries(OrderProcessingRequest request) {
int attempt = 1;
int maxAttempts = 3;
while (attempt <= maxAttempts) {
try {
activities.processPayment(request);
return;
} catch (PaymentFailedException e) {
if (attempt == maxAttempts) {
throw e;
}
logWorkflowEvent("Payment attempt " + attempt + " failed, waiting for retry",
state.getOrderId());
// Wait before retry
Workflow.sleep(Duration.ofMinutes(5));
attempt++;
}
}
}
private void awaitCustomerApproval() {
logWorkflowEvent("Waiting for customer approval", state.getOrderId());
// Wait for approval with timeout
boolean approved = Workflow.await(Duration.ofHours(24), () ->
customerApprovalReceived.isCompleted() || orderCancelled);
if (!approved) {
throw new CustomerApprovalTimeoutException("Customer approval timeout after 24 hours");
}
}
// Signal handlers
@Override
public void updatePaymentStatus(PaymentStatusSignal signal) {
logWorkflowEvent("Payment status updated: " + signal.getStatus(), state.getOrderId());
state.setPaymentStatus(signal.getStatus());
if (signal.getStatus() == PaymentStatus.COMPLETED) {
paymentCompleted.complete(null);
}
}
@Override
public void requestPaymentRetry(PaymentRetrySignal signal) {
logWorkflowEvent("Payment retry requested", state.getOrderId());
// Implementation for manual payment retry
}
@Override
public void updateShippingStatus(ShippingStatusSignal signal) {
logWorkflowEvent("Shipping status updated: " + signal.getStatus(), state.getOrderId());
state.setShippingStatus(signal.getStatus());
}
@Override
public void updateTrackingInfo(TrackingInfoSignal signal) {
logWorkflowEvent("Tracking info updated: " + signal.getTrackingNumber(), state.getOrderId());
state.setTrackingNumber(signal.getTrackingNumber());
}
@Override
public void cancelOrder(CancelOrderSignal signal) {
logWorkflowEvent("Order cancellation requested: " + signal.getReason(), state.getOrderId());
orderCancelled = true;
cancellationReason = signal.getReason();
state.setStatus(OrderStatus.CANCELLED);
}
/**
 * Signal handler for order item changes. Currently only records the request in the
 * log/timeline; the item update itself is not implemented here.
 */
@Override
public void updateOrderItems(UpdateItemsSignal signal) {
    String orderId = state.getOrderId();
    logWorkflowEvent("Order items updated", orderId);
    // Implementation for updating order items
}
/**
 * Signal handler for the customer's approval decision: an approval completes the
 * customerApprovalReceived promise normally, a denial completes it exceptionally with a
 * CustomerApprovalDeniedException.
 */
@Override
public void customerApproval(CustomerApprovalSignal signal) {
    boolean approved = signal.isApproved();
    logWorkflowEvent("Customer approval received: " + approved, state.getOrderId());

    if (!approved) {
        customerApprovalReceived.completeExceptionally(
                new CustomerApprovalDeniedException("Customer denied approval"));
        return;
    }
    customerApprovalReceived.complete(null);
}
// Query handlers
/** Query handler: returns the workflow's order state object. */
@Override
public OrderProcessingState getOrderState() {
    return this.state;
}
/** Query handler: returns the workflow's processing timeline object. */
@Override
public OrderProcessingTimeline getOrderTimeline() {
    return this.timeline;
}
/** Query handler: returns the timeline steps that have no end time yet. */
@Override
public List<ProcessingStep> getPendingSteps() {
    return this.timeline.getPendingSteps();
}
/** Query handler: returns the string form of the current order status. */
@Override
public String getCurrentStatus() {
    return this.state.getStatus().toString();
}
/**
 * Logs a workflow event and appends it to the timeline.
 *
 * <p>The timestamp comes from {@code Workflow.currentTimeMillis()} instead of the wall
 * clock. Workflow code is re-executed during replay, so {@code Instant.now()} would stamp
 * the same event with a different time on every replay and make the timeline returned by
 * queries unstable.
 *
 * @param message human-readable description of the event
 * @param orderId order identifier included in the log line
 */
private void logWorkflowEvent(String message, String orderId) {
    // NOTE(review): if `logger` is a plain SLF4J logger, this line is emitted again on every
    // replay; a logger obtained from Workflow.getLogger(...) suppresses that — confirm.
    logger.info("[Workflow {}] {}", orderId, message);
    timeline.addEvent(message, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
}
/**
 * Decides whether the order needs explicit customer approval.
 *
 * @param request the order being processed
 * @return {@code true} when the total amount of the order's items is strictly greater
 *     than 1000
 */
private boolean requiresCustomerApproval(OrderProcessingRequest request) {
    BigDecimal orderTotal = calculateTotalAmount(request.getItems());
    BigDecimal approvalThreshold = new BigDecimal("1000");
    return orderTotal.compareTo(approvalThreshold) > 0;
}
/**
 * Closes the timeline after a failure and produces the workflow result.
 *
 * <p>If the order was cancelled, the cancellation notification activity is invoked and a
 * cancelled result is returned; otherwise the failure notification activity is invoked and
 * a failed result is returned.
 *
 * @param request the order being processed
 * @param failure the exception that aborted processing
 * @return a cancelled or failed result for the order
 */
private OrderProcessingResult handleProcessingFailure(OrderProcessingRequest request, Exception failure) {
    // Exceptions without a message would otherwise surface as the literal "null".
    String failureReason = failure.getMessage() != null ? failure.getMessage() : failure.toString();

    logWorkflowEvent("Workflow failed: " + failureReason, request.getOrderId());
    // Workflow time instead of Instant.now(): the wall clock differs on every replay.
    timeline.setWorkflowEnd(Instant.ofEpochMilli(Workflow.currentTimeMillis()));

    if (orderCancelled) {
        activities.notifyOrderCancelled(request, cancellationReason);
        return OrderProcessingResult.cancelled(request.getOrderId(), cancellationReason);
    }
    activities.notifyOrderFailed(request, failureReason);
    return OrderProcessingResult.failed(request.getOrderId(), failureReason);
}
}
// Enhanced data models
/**
 * Timeline of a workflow run: start/end instants, the processing steps and a list of
 * free-form events.
 */
@Data
public class OrderProcessingTimeline {
    private Instant workflowStart;
    private Instant workflowEnd;
    private List<ProcessingStep> steps = new ArrayList<>();
    private List<TimelineEvent> events = new ArrayList<>();

    /** Appends a step with the given name and times and no error. */
    public void addStep(String name, Instant startTime, Instant endTime) {
        ProcessingStep step = new ProcessingStep(name, startTime, endTime, null);
        steps.add(step);
    }

    /** Sets the end time on the first step with this name that has no end time yet. */
    public void markStepCompleted(String name, Instant endTime) {
        ProcessingStep open = firstOpenStep(name);
        if (open != null) {
            open.setEndTime(endTime);
        }
    }

    /** Sets end time and error on the first step with this name that has no end time yet. */
    public void markStepFailed(String name, Instant endTime, String error) {
        ProcessingStep open = firstOpenStep(name);
        if (open != null) {
            open.setEndTime(endTime);
            open.setError(error);
        }
    }

    /** Appends an event with the given description and timestamp. */
    public void addEvent(String description, Instant timestamp) {
        TimelineEvent event = new TimelineEvent(description, timestamp);
        events.add(event);
    }

    /** Returns a new list holding every step that has no end time. */
    public List<ProcessingStep> getPendingSteps() {
        List<ProcessingStep> pending = new ArrayList<>();
        for (ProcessingStep step : steps) {
            if (step.getEndTime() == null) {
                pending.add(step);
            }
        }
        return pending;
    }

    /** Finds the first step named {@code name} without an end time, or null if none. */
    private ProcessingStep firstOpenStep(String name) {
        for (ProcessingStep step : steps) {
            if (step.getName().equals(name) && step.getEndTime() == null) {
                return step;
            }
        }
        return null;
    }
}
/**
 * One named step of order processing. A step without an end time is in progress; with an
 * end time it is completed when no error is set and failed when an error is set.
 */
@Data
public class ProcessingStep {
    private String name;
    private Instant startTime;
    private Instant endTime;
    private String error;

    public ProcessingStep(String name, Instant startTime, Instant endTime, String error) {
        this.name = name;
        this.error = error;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /** True when the step has ended and carries no error. */
    public boolean isCompleted() {
        return !isInProgress() && error == null;
    }

    /** True when the step has ended and carries an error. */
    public boolean isFailed() {
        return !isInProgress() && error != null;
    }

    /** True while the step has no end time. */
    public boolean isInProgress() {
        return endTime == null;
    }
}
/** A described occurrence on the order timeline together with its timestamp. */
@Data
public class TimelineEvent {
    private String description;
    private Instant timestamp;

    /**
     * @param description text describing what happened
     * @param timestamp when it happened
     */
    public TimelineEvent(String description, Instant timestamp) {
        this.timestamp = timestamp;
        this.description = description;
    }
}
// Additional signal classes
/**
 * Payload of the requestPaymentRetry signal. Plain data holder; accessors are presumably
 * generated by Lombok's {@code @Data}.
 */
@Data
public class PaymentRetrySignal {
private String orderId;
private String paymentMethod;
private String reason;
}
/**
 * Payload of the updateTrackingInfo signal. Plain data holder; accessors are presumably
 * generated by Lombok's {@code @Data}.
 */
@Data
public class TrackingInfoSignal {
private String orderId;
// Copied onto the order state by the updateTrackingInfo signal handler.
private String trackingNumber;
private String carrier;
private String trackingUrl;
}
/**
 * Payload of the updateOrderItems signal. Plain data holder; accessors are presumably
 * generated by Lombok's {@code @Data}.
 */
@Data
public class UpdateItemsSignal {
private String orderId;
private List<OrderItem> addedItems;
private List<OrderItem> removedItems;
private String reason;
}
/**
 * Payload of the customerApproval signal. Plain data holder; accessors are presumably
 * generated by Lombok's {@code @Data}.
 */
@Data
public class CustomerApprovalSignal {
private String orderId;
// Read by the customerApproval signal handler via isApproved(): true = approved, false = denied.
private boolean approved;
private String comments;
private Instant approvedAt;
}
Event Sourcing
7. Event Sourcing with Cadence
// Event-sourced workflow
/**
 * Workflow contract for event-sourced order processing: one workflow entry point, one
 * signal that feeds events into the workflow, and two queries that expose the derived
 * projection and the recorded events.
 */
// NOTE(review): the Cadence Java client normally needs only the method-level annotations
// below; confirm that a @WorkflowInterface annotation is available in this project.
@WorkflowInterface
public interface EventSourcedOrderWorkflow {
/** Workflow entry point: processes the given order and returns its result. */
@WorkflowMethod
OrderProcessingResult processOrder(OrderProcessingRequest request);
/** Signal: hands an event to the workflow. */
// NOTE(review): OrderEvent is abstract; verify the configured data converter can
// deserialize concrete event subtypes for this signal argument.
@SignalMethod
void applyEvent(OrderEvent event);
/** Query: returns the order projection. */
@QueryMethod
OrderProjection getOrderProjection();
/** Query: returns the order events. */
@QueryMethod
List<OrderEvent> getEventHistory();
}
// Event-sourced workflow implementation
/**
 * Event-sourced order workflow: every state change is recorded as an {@link OrderEvent}
 * and folded into an {@link OrderProjection}.
 *
 * <p>Events are appended either by the workflow itself or from outside via the
 * {@code applyEvent} signal. The workflow finishes successfully as soon as the projection
 * reaches {@code COMPLETED}, regardless of which side produced the completing event.
 */
public class EventSourcedOrderWorkflowImpl implements EventSourcedOrderWorkflow {
    // The original used `logger` without declaring it. Workflow.getLogger returns a
    // replay-aware logger, so lines are not emitted again when history is replayed.
    private static final Logger logger = Workflow.getLogger(EventSourcedOrderWorkflowImpl.class);

    private final List<OrderEvent> events = new ArrayList<>();
    private OrderProjection projection = new OrderProjection();
    private boolean workflowCompleted = false;

    /**
     * Workflow entry point.
     *
     * @param request the order to process
     * @return a success result when the order completes, otherwise a failed result
     *     carrying the exception message or "Workflow timeout"
     */
    @Override
    public OrderProcessingResult processOrder(OrderProcessingRequest request) {
        // Start by applying the order created event
        applyEvent(new OrderCreatedEvent(request));
        try {
            // Process the order using events
            return processOrderWithEvents(request);
        } catch (Exception e) {
            // Single place where a failure is recorded, so the history gets exactly one
            // OrderFailedEvent per failure.
            applyEvent(new OrderFailedEvent(request.getOrderId(), e.getMessage()));
            return OrderProcessingResult.failed(request.getOrderId(), e.getMessage());
        }
    }

    /**
     * Records the event, folds it into the projection and marks the workflow completed
     * once the projection status is {@code COMPLETED}.
     *
     * @param event the event to append; also invoked as a signal from outside the workflow
     */
    @Override
    public void applyEvent(OrderEvent event) {
        events.add(event);
        projection.apply(event);
        // Keep the completion flag in sync here so a completing event delivered by signal
        // wakes up the await in processOrderWithEvents. The flag is sticky by design.
        if (projection.getStatus() == OrderStatus.COMPLETED) {
            workflowCompleted = true;
        }
        logger.info("Applied event: {} for order: {}", event.getType(), event.getOrderId());
    }

    /** Query: returns the current projection. */
    @Override
    public OrderProjection getOrderProjection() {
        return projection;
    }

    /** Query: returns a copy of the recorded events. */
    @Override
    public List<OrderEvent> getEventHistory() {
        return new ArrayList<>(events);
    }

    /** Rebuilds state from events, processes a new order and waits up to 24h for completion. */
    private OrderProcessingResult processOrderWithEvents(OrderProcessingRequest request) {
        // Replay events to rebuild state
        replayEvents();

        // Check if workflow is already completed
        if (workflowCompleted) {
            return OrderProcessingResult.success(request.getOrderId());
        }

        // Process based on current state
        if (projection.getStatus() == OrderStatus.CREATED) {
            processNewOrder(request);
        }

        // Wait for completion
        Workflow.await(Duration.ofHours(24), () -> workflowCompleted);
        if (workflowCompleted) {
            return OrderProcessingResult.success(request.getOrderId());
        }
        return OrderProcessingResult.failed(request.getOrderId(), "Workflow timeout");
    }

    /** Rebuilds the projection from scratch out of the recorded events. */
    private void replayEvents() {
        OrderProjection rebuilt = new OrderProjection();
        for (OrderEvent event : events) {
            rebuilt.apply(event);
        }
        this.projection = rebuilt;
        this.workflowCompleted = projection.getStatus() == OrderStatus.COMPLETED;
    }

    /**
     * Emits the event sequence for a freshly created order. Exceptions propagate to
     * processOrder, which records the failure event.
     */
    private void processNewOrder(OrderProcessingRequest request) {
        // Validate order
        applyEvent(new OrderValidatedEvent(request.getOrderId()));
        // Process payment
        applyEvent(new PaymentProcessedEvent(request.getOrderId(), "txn_123"));
        // Reserve inventory
        applyEvent(new InventoryReservedEvent(request.getOrderId()));
        // Schedule shipping
        applyEvent(new ShippingScheduledEvent(request.getOrderId(), "tracking_123"));
        // Complete order; applyEvent flips workflowCompleted when the projection completes
        applyEvent(new OrderCompletedEvent(request.getOrderId()));
    }
}
// Event classes
/**
 * Base class of all order events. Each event gets a random UUID as id, the construction
 * time as timestamp and version 1.
 */
// NOTE(review): UUID.randomUUID() and Instant.now() are not replay-deterministic when an
// event is constructed inside workflow code — confirm where events are created.
public abstract class OrderEvent {
    private String eventId;
    private String orderId;
    private String type;
    private Instant timestamp;
    private int version;

    /**
     * @param orderId identifier of the order the event belongs to
     * @param type event type tag
     */
    public OrderEvent(String orderId, String type) {
        this.orderId = orderId;
        this.type = type;
        this.version = 1;
        this.eventId = UUID.randomUUID().toString();
        this.timestamp = Instant.now();
    }

    // Getters and setters
    public String getEventId() {
        return this.eventId;
    }

    public String getOrderId() {
        return this.orderId;
    }

    public String getType() {
        return this.type;
    }

    public Instant getTimestamp() {
        return this.timestamp;
    }

    public int getVersion() {
        return this.version;
    }
}
/** Event of type ORDER_CREATED; carries the originating order request. */
public class OrderCreatedEvent extends OrderEvent {
    private static final String TYPE = "ORDER_CREATED";

    private OrderProcessingRequest orderRequest;

    /** @param orderRequest the request that created the order; its order id becomes the event's order id */
    public OrderCreatedEvent(OrderProcessingRequest orderRequest) {
        super(orderRequest.getOrderId(), TYPE);
        this.orderRequest = orderRequest;
    }

    public OrderProcessingRequest getOrderRequest() {
        return this.orderRequest;
    }
}
/** Event of type ORDER_VALIDATED. */
public class OrderValidatedEvent extends OrderEvent {
    private static final String TYPE = "ORDER_VALIDATED";

    /** @param orderId identifier of the validated order */
    public OrderValidatedEvent(String orderId) {
        super(orderId, TYPE);
    }
}
/**
 * Event of type PAYMENT_PROCESSED; carries the payment transaction id and, when known,
 * the paid amount.
 */
public class PaymentProcessedEvent extends OrderEvent {
    private String transactionId;
    private BigDecimal amount;

    /**
     * Creates the event without an amount; {@link #getAmount()} then returns {@code null}.
     *
     * @param orderId identifier of the paid order
     * @param transactionId payment transaction identifier
     */
    public PaymentProcessedEvent(String orderId, String transactionId) {
        this(orderId, transactionId, null);
    }

    /**
     * Creates the event with an amount. Before this constructor existed the amount field
     * could never be assigned, so {@link #getAmount()} always returned {@code null}.
     *
     * @param orderId identifier of the paid order
     * @param transactionId payment transaction identifier
     * @param amount amount that was charged; may be {@code null} when unknown
     */
    public PaymentProcessedEvent(String orderId, String transactionId, BigDecimal amount) {
        super(orderId, "PAYMENT_PROCESSED");
        this.transactionId = transactionId;
        this.amount = amount;
    }

    public String getTransactionId() { return transactionId; }

    /** @return the charged amount, or {@code null} if none was supplied */
    public BigDecimal getAmount() { return amount; }
}
/** Event of type INVENTORY_RESERVED. */
public class InventoryReservedEvent extends OrderEvent {
    private static final String TYPE = "INVENTORY_RESERVED";

    /** @param orderId identifier of the order whose inventory was reserved */
    public InventoryReservedEvent(String orderId) {
        super(orderId, TYPE);
    }
}
/** Event of type SHIPPING_SCHEDULED; carries the tracking number. */
public class ShippingScheduledEvent extends OrderEvent {
    private static final String TYPE = "SHIPPING_SCHEDULED";

    private String trackingNumber;

    /**
     * @param orderId identifier of the order being shipped
     * @param trackingNumber tracking number of the shipment
     */
    public ShippingScheduledEvent(String orderId, String trackingNumber) {
        super(orderId, TYPE);
        this.trackingNumber = trackingNumber;
    }

    public String getTrackingNumber() {
        return this.trackingNumber;
    }
}
/** Event of type ORDER_COMPLETED. */
public class OrderCompletedEvent extends OrderEvent {
    private static final String TYPE = "ORDER_COMPLETED";

    /** @param orderId identifier of the completed order */
    public OrderCompletedEvent(String orderId) {
        super(orderId, TYPE);
    }
}
/** Event of type ORDER_FAILED; carries the error message. */
public class OrderFailedEvent extends OrderEvent {
    private static final String TYPE = "ORDER_FAILED";

    private String errorMessage;

    /**
     * @param orderId identifier of the failed order
     * @param errorMessage description of the failure
     */
    public OrderFailedEvent(String orderId, String errorMessage) {
        super(orderId, TYPE);
        this.errorMessage = errorMessage;
    }

    public String getErrorMessage() {
        return this.errorMessage;
    }
}
// Projection for event sourcing
/**
 * Read model derived from order events: tracks the order id, the current status, the
 * applied events and the creation/update timestamps. A new projection starts in CREATED.
 */
@Data
public class OrderProjection {
    private String orderId;
    private OrderStatus status = OrderStatus.CREATED;
    private List<OrderEvent> appliedEvents = new ArrayList<>();
    private Instant createdAt;
    private Instant updatedAt;

    /**
     * Folds one event into the projection. Every event is recorded and moves updatedAt;
     * known event types additionally change the status, unknown types leave it untouched.
     */
    public void apply(OrderEvent event) {
        appliedEvents.add(event);
        this.updatedAt = event.getTimestamp();

        String eventType = event.getType();
        OrderStatus next = statusFor(eventType);
        if ("ORDER_CREATED".equals(eventType)) {
            this.orderId = event.getOrderId();
            this.createdAt = event.getTimestamp();
        }
        if (next != null) {
            this.status = next;
        }
    }

    /** True while the order is in VALIDATED or CREATED. */
    public boolean canAcceptPayment() {
        boolean validated = status == OrderStatus.VALIDATED;
        boolean created = status == OrderStatus.CREATED;
        return validated || created;
    }

    /** True unless the order is COMPLETED or CANCELLED. */
    public boolean canCancel() {
        return !(status == OrderStatus.COMPLETED || status == OrderStatus.CANCELLED);
    }

    /** Maps an event type tag to the status it produces, or null for unknown tags. */
    private static OrderStatus statusFor(String eventType) {
        switch (eventType) {
            case "ORDER_CREATED":
                return OrderStatus.CREATED;
            case "ORDER_VALIDATED":
                return OrderStatus.VALIDATED;
            case "PAYMENT_PROCESSED":
                return OrderStatus.PAYMENT_COMPLETED;
            case "INVENTORY_RESERVED":
                return OrderStatus.INVENTORY_RESERVED;
            case "SHIPPING_SCHEDULED":
                return OrderStatus.SHIPPING_SCHEDULED;
            case "ORDER_COMPLETED":
                return OrderStatus.COMPLETED;
            case "ORDER_FAILED":
                return OrderStatus.FAILED;
            default:
                return null;
        }
    }
}
Testing Strategies
8. Comprehensive Testing
// Workflow test base class
/**
 * Base class for workflow tests: creates a fresh test workflow environment, client and
 * worker before each test and closes the environment afterwards.
 */
@ExtendWith(MockitoExtension.class)
public class WorkflowTestBase {
    protected TestWorkflowEnvironment testEnvironment;
    protected Worker worker;
    protected WorkflowClient workflowClient;

    /** Builds the environment, registers the workflow implementations and starts the workers. */
    @BeforeEach
    void setUp() {
        testEnvironment = TestWorkflowEnvironment.newInstance();
        workflowClient = testEnvironment.newWorkflowClient();

        // Create worker
        WorkerFactory workerFactory = testEnvironment.newWorkerFactory();
        worker = workerFactory.newWorker("ORDER_TASK_LIST");
        worker.registerWorkflowImplementationTypes(
                OrderProcessingWorkflowImpl.class, EnhancedOrderProcessingWorkflowImpl.class);

        workerFactory.start();
    }

    /** Closes the test environment. */
    @AfterEach
    void tearDown() {
        testEnvironment.close();
    }
}
// Workflow unit tests
/**
 * Workflow-level tests for {@code OrderProcessingWorkflow}, run against the in-memory
 * test environment set up by {@link WorkflowTestBase}.
 */
class OrderProcessingWorkflowTest extends WorkflowTestBase {

    @Test
    void testSuccessfulOrderProcessing() {
        // Given
        OrderProcessingWorkflow workflow = newOrderWorkflowStub();
        OrderProcessingRequest request = createTestOrderRequest();

        // When - invoke synchronously so the test actually waits for the workflow. The
        // original discarded the future returned by WorkflowClient.execute and therefore
        // asserted nothing.
        OrderProcessingResult result = workflow.processOrder(request);

        // Then
        assertThat(result).isNotNull();
        assertThat(result.getOrderId()).isEqualTo(request.getOrderId());
        // In real test, you would mock activities and verify calls
    }

    @Test
    void testOrderCancellation() {
        // Given
        OrderProcessingWorkflow workflow = newOrderWorkflowStub();
        OrderProcessingRequest request = createTestOrderRequest();

        // When
        WorkflowExecution execution = WorkflowClient.start(workflow::processOrder, request);

        // Send cancellation signal (built the same way as in OrderProcessingWorkflowClient)
        CancelOrderSignal cancellation = new CancelOrderSignal();
        cancellation.setOrderId(request.getOrderId());
        cancellation.setReason("Customer requested");
        workflow.cancelOrder(cancellation);

        // Then - a typed stub has no getResult(); the result of a started execution is
        // read through an untyped stub bound to that execution.
        OrderProcessingResult result = workflowClient
                .newUntypedWorkflowStub(execution, java.util.Optional.empty())
                .getResult(OrderProcessingResult.class);
        assertThat(result.isSuccess()).isFalse();
        assertThat(result.getStatus()).isEqualTo("CANCELLED");
    }

    @Test
    void testPaymentTimeout() {
        // Given
        OrderProcessingWorkflow workflow = newOrderWorkflowStub();
        OrderProcessingRequest request = createTestOrderRequest();
        request.setPaymentPending(true); // Requires external payment

        // When & Then - call synchronously: WorkflowClient.execute only returns a future
        // and never throws here. A failed workflow surfaces on the client wrapped in a
        // workflow failure exception, so the assertion inspects the cause.
        // NOTE(review): this assumes the workflow lets PaymentTimeoutException escape
        // instead of converting it into a failed result — confirm against processOrder.
        assertThatThrownBy(() -> workflow.processOrder(request))
                .hasCauseInstanceOf(PaymentTimeoutException.class);
    }

    /** Creates a fresh typed stub on the order task list with a one hour execution timeout. */
    private OrderProcessingWorkflow newOrderWorkflowStub() {
        return workflowClient.newWorkflowStub(
                OrderProcessingWorkflow.class,
                new WorkflowOptions.Builder()
                        .setTaskList("ORDER_TASK_LIST")
                        .setExecutionStartToCloseTimeout(Duration.ofHours(1))
                        .build());
    }

    /** Builds a two-item order request with empty payment and shipping info. */
    private OrderProcessingRequest createTestOrderRequest() {
        OrderProcessingRequest request = new OrderProcessingRequest();
        request.setOrderId("test-order-123");
        request.setCustomerId("customer-456");
        request.setItems(List.of(
                new OrderItem("product-1", 2, new BigDecimal("29.99")),
                new OrderItem("product-2", 1, new BigDecimal("49.99"))));
        request.setPaymentInfo(new PaymentInfo());
        request.setShippingInfo(new ShippingInfo());
        return request;
    }
}
// Activity unit tests
/**
 * Unit tests for {@code OrderProcessingActivitiesImpl} with all collaborating services
 * replaced by Mockito mocks.
 */
@ExtendWith(MockitoExtension.class)
class OrderProcessingActivitiesTest {
    @Mock
    private OrderService orderService;
    @Mock
    private PaymentService paymentService;
    @Mock
    private InventoryService inventoryService;
    @Mock
    private ShippingService shippingService;
    @Mock
    private NotificationService notificationService;

    private OrderProcessingActivities activities;

    @BeforeEach
    void setUp() {
        activities = new OrderProcessingActivitiesImpl(
                orderService,
                paymentService,
                inventoryService,
                shippingService,
                notificationService);
    }

    @Test
    void testValidateOrder_Success() {
        // Given
        OrderProcessingRequest order = createTestOrderRequest();
        when(orderService.isCustomerValid(anyString())).thenReturn(true);

        // When & Then
        assertThatNoException().isThrownBy(() -> activities.validateOrder(order));
    }

    @Test
    void testValidateOrder_InvalidCustomer() {
        // Given
        OrderProcessingRequest order = createTestOrderRequest();
        when(orderService.isCustomerValid(anyString())).thenReturn(false);

        // When & Then
        assertThatThrownBy(() -> activities.validateOrder(order))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid customer");
    }

    @Test
    void testProcessPayment_Success() {
        // Given
        OrderProcessingRequest order = createTestOrderRequest();
        PaymentResult accepted = new PaymentResult(true, "txn_123", null);
        when(paymentService.processPayment(any(), any())).thenReturn(accepted);

        // When & Then
        assertThatNoException().isThrownBy(() -> activities.processPayment(order));
    }

    @Test
    void testProcessPayment_Failure() {
        // Given
        OrderProcessingRequest order = createTestOrderRequest();
        PaymentResult declined = new PaymentResult(false, null, "Insufficient funds");
        when(paymentService.processPayment(any(), any())).thenReturn(declined);

        // When & Then
        assertThatThrownBy(() -> activities.processPayment(order))
                .isInstanceOf(PaymentFailedException.class)
                .hasMessageContaining("Payment processing failed");
    }

    /** Builds a single-item order request with an empty payment info and a US address. */
    private OrderProcessingRequest createTestOrderRequest() {
        ShippingInfo shipping = new ShippingInfo();
        shipping.setAddress(new Address("123 Main St", "City", "State", "12345", "US"));

        OrderProcessingRequest order = new OrderProcessingRequest();
        order.setOrderId("test-order-123");
        order.setCustomerId("customer-456");
        order.setItems(List.of(new OrderItem("product-1", 1, new BigDecimal("29.99"))));
        order.setPaymentInfo(new PaymentInfo());
        order.setShippingInfo(shipping);
        return order;
    }
}
// Integration test
/**
 * Spring Boot integration test that runs an order through the workflow client against the
 * Cadence endpoint configured in the test properties.
 */
@SpringBootTest
@TestPropertySource(properties = {
        "cadence.host=localhost",
        "cadence.port=7933",
        "cadence.domain=test-domain"
})
class OrderProcessingIntegrationTest {
    @Autowired
    private WorkflowClient workflowClient;
    @Autowired
    private OrderProcessingWorkflowClient orderWorkflowClient;

    @Test
    void testEndToEndOrderProcessing() {
        // Given
        OrderProcessingRequest order = createTestOrderRequest();

        // When
        OrderProcessingResult outcome = orderWorkflowClient.startOrderProcessing(order);

        // Then
        assertThat(outcome).isNotNull();
        assertThat(outcome.isSuccess()).isTrue();
        assertThat(outcome.getOrderId()).isEqualTo(order.getOrderId());
    }

    /** Builds a single-item order with test payment and shipping details. */
    private OrderProcessingRequest createTestOrderRequest() {
        OrderProcessingRequest order = new OrderProcessingRequest();
        order.setOrderId("integration-test-order");
        order.setCustomerId("test-customer");
        order.setItems(List.of(new OrderItem("test-product-1", 1, new BigDecimal("19.99"))));
        order.setPaymentInfo(createTestPaymentInfo());
        order.setShippingInfo(createTestShippingInfo());
        return order;
    }

    /** Builds credit-card payment details using a test card number. */
    private PaymentInfo createTestPaymentInfo() {
        PaymentInfo payment = new PaymentInfo();
        payment.setMethod("CREDIT_CARD");
        payment.setCardNumber("4111111111111111");
        payment.setExpiryDate("12/25");
        payment.setCvv("123");
        return payment;
    }

    /** Builds standard shipping to a test address. */
    private ShippingInfo createTestShippingInfo() {
        Address destination = new Address("123 Test St", "Test City", "TS", "12345", "US");
        ShippingInfo shipping = new ShippingInfo();
        shipping.setAddress(destination);
        shipping.setMethod("STANDARD");
        return shipping;
    }
}
Production Deployment
9. Spring Boot Integration & Production Setup
# application.yml
# Nesting restored: without indentation every key is top-level and keys such as
# `enabled` and `health` collide.
spring:
  application:
    name: order-service
  datasource:
    url: jdbc:postgresql://localhost:5432/orders
    username: order_user
    # Overridable from the environment so real credentials need not live in this file;
    # the default keeps the previous value for local runs.
    password: ${DB_PASSWORD:order_pass}
    hikari:
      maximum-pool-size: 20

cadence:
  host: ${CADENCE_HOST:localhost}
  port: ${CADENCE_PORT:7933}
  domain: ${CADENCE_DOMAIN:production}
  identity: ${HOSTNAME:order-service}
  tls:
    enabled: ${CADENCE_TLS_ENABLED:false}
    cert: ${CADENCE_TLS_CERT:}
    key: ${CADENCE_TLS_KEY:}
    ca: ${CADENCE_TLS_CA:}

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true

logging:
  level:
    com.uber.cadence: INFO
    com.example.workflow: DEBUG
// Worker configuration and startup
/**
 * Creates the Cadence workers for the order and payment task lists at construction time,
 * starts the worker factory after construction and shuts it down before destruction.
 */
@Component
@Slf4j
public class CadenceWorkerManager {
    private final WorkerFactory workerFactory;
    private final List<Worker> workers = new ArrayList<>();

    public CadenceWorkerManager(WorkerFactory workerFactory,
                                OrderProcessingActivities orderActivities,
                                PaymentProcessingActivities paymentActivities) {
        this.workerFactory = workerFactory;
        initializeWorkers(orderActivities, paymentActivities);
    }

    /** Starts the worker factory. */
    @PostConstruct
    public void startWorkers() {
        workerFactory.start();
        log.info("Cadence workers started successfully");
    }

    /** Shuts the worker factory down. */
    @PreDestroy
    public void stopWorkers() {
        workerFactory.shutdown();
        log.info("Cadence workers stopped");
    }

    /** Registers the order worker first, then the payment worker, and logs the count. */
    private void initializeWorkers(OrderProcessingActivities orderActivities,
                                   PaymentProcessingActivities paymentActivities) {
        workers.add(createOrderWorker(orderActivities));
        workers.add(createPaymentWorker(paymentActivities));
        log.info("Initialized {} Cadence workers", workers.size());
    }

    /** Order processing worker: three order workflow implementations plus order activities. */
    private Worker createOrderWorker(OrderProcessingActivities orderActivities) {
        Worker orderWorker = workerFactory.newWorker("ORDER_TASK_LIST");
        orderWorker.registerWorkflowImplementationTypes(
                OrderProcessingWorkflowImpl.class,
                EnhancedOrderProcessingWorkflowImpl.class,
                EventSourcedOrderWorkflowImpl.class);
        orderWorker.registerActivitiesImplementations(orderActivities);
        return orderWorker;
    }

    /** Payment processing worker: payment workflow implementation plus payment activities. */
    private Worker createPaymentWorker(PaymentProcessingActivities paymentActivities) {
        Worker paymentWorker = workerFactory.newWorker("PAYMENT_TASK_LIST");
        paymentWorker.registerWorkflowImplementationTypes(PaymentProcessingWorkflowImpl.class);
        paymentWorker.registerActivitiesImplementations(paymentActivities);
        return paymentWorker;
    }
}
// Workflow client service
/**
 * Facade over the Cadence {@code WorkflowClient} for order workflows. Workflow ids are the
 * order id prefixed with "order-"; every failure is logged and rethrown as a
 * {@code WorkflowException}.
 */
@Service
@Slf4j
public class OrderProcessingWorkflowClient {
    private final WorkflowClient workflowClient;

    public OrderProcessingWorkflowClient(WorkflowClient workflowClient) {
        this.workflowClient = workflowClient;
    }

    /**
     * Runs the order workflow for the request and returns its result. The workflow method
     * is invoked directly on the stub.
     */
    public OrderProcessingResult startOrderProcessing(OrderProcessingRequest request) {
        try {
            WorkflowOptions options = new WorkflowOptions.Builder()
                    .setTaskList("ORDER_TASK_LIST")
                    .setExecutionStartToCloseTimeout(Duration.ofHours(24))
                    .setWorkflowId("order-" + request.getOrderId())
                    .build();
            OrderProcessingWorkflow workflow =
                    workflowClient.newWorkflowStub(OrderProcessingWorkflow.class, options);
            return workflow.processOrder(request);
        } catch (Exception e) {
            log.error("Failed to start order processing workflow for order: {}", request.getOrderId(), e);
            throw new WorkflowException("Failed to start order processing", e);
        }
    }

    /** Sends the payment status to the order's workflow as a signal. */
    public void signalPaymentStatus(String orderId, PaymentStatus status) {
        try {
            PaymentStatusSignal payload = new PaymentStatusSignal();
            payload.setOrderId(orderId);
            payload.setStatus(status);
            existingWorkflow(orderId).updatePaymentStatus(payload);
        } catch (Exception e) {
            log.error("Failed to send payment status signal for order: {}", orderId, e);
            throw new WorkflowException("Failed to send payment signal", e);
        }
    }

    /** Sends a cancellation signal with the given reason to the order's workflow. */
    public void cancelOrder(String orderId, String reason) {
        try {
            CancelOrderSignal payload = new CancelOrderSignal();
            payload.setOrderId(orderId);
            payload.setReason(reason);
            existingWorkflow(orderId).cancelOrder(payload);
        } catch (Exception e) {
            log.error("Failed to cancel order: {}", orderId, e);
            throw new WorkflowException("Failed to cancel order", e);
        }
    }

    /** Queries the order's workflow for its order state. */
    public OrderProcessingState queryOrderState(String orderId) {
        try {
            return existingWorkflow(orderId).getOrderState();
        } catch (Exception e) {
            log.error("Failed to query order state for order: {}", orderId, e);
            throw new WorkflowException("Failed to query order state", e);
        }
    }

    /** Creates a stub bound to the workflow id derived from the order id. */
    private OrderProcessingWorkflow existingWorkflow(String orderId) {
        return workflowClient.newWorkflowStub(OrderProcessingWorkflow.class, "order-" + orderId);
    }
}
// REST controller for workflow management
/**
 * REST endpoints for creating, cancelling and inspecting orders. All operations delegate
 * to {@link OrderProcessingWorkflowClient}; any exception is logged and mapped to HTTP 500.
 */
@RestController
@RequestMapping("/api/orders")
@Slf4j
public class OrderController {
    private final OrderProcessingWorkflowClient workflowClient;
    private final OrderService orderService;

    public OrderController(OrderProcessingWorkflowClient workflowClient, OrderService orderService) {
        this.workflowClient = workflowClient;
        this.orderService = orderService;
    }

    /**
     * Creates an order and runs its workflow.
     *
     * @return 202 with the order response on success, 500 with the error message when the
     *     workflow result is unsuccessful, 500 without body on an exception
     */
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
        try {
            OrderProcessingRequest workflowRequest = convertToWorkflowRequest(request);
            OrderProcessingResult result = workflowClient.startOrderProcessing(workflowRequest);

            OrderResponse response = new OrderResponse();
            response.setOrderId(result.getOrderId());
            response.setStatus(result.getStatus());
            response.setSuccess(result.isSuccess());
            if (!result.isSuccess()) {
                response.setErrorMessage(result.getErrorMessage());
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
            }
            return ResponseEntity.status(HttpStatus.ACCEPTED).body(response);
        } catch (Exception e) {
            log.error("Failed to create order", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Sends a cancellation signal for the order; 202 on success, 500 on failure. */
    @PostMapping("/{orderId}/cancel")
    public ResponseEntity<Void> cancelOrder(@PathVariable String orderId,
                                            @RequestBody CancelOrderRequest request) {
        try {
            workflowClient.cancelOrder(orderId, request.getReason());
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            log.error("Failed to cancel order: {}", orderId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Queries the workflow for the order's state.
     *
     * <p>Status fields that are not set on the state yet are reported as {@code null}
     * instead of failing the whole request with a NullPointerException.
     */
    @GetMapping("/{orderId}/status")
    public ResponseEntity<OrderStatusResponse> getOrderStatus(@PathVariable String orderId) {
        try {
            OrderProcessingState state = workflowClient.queryOrderState(orderId);

            OrderStatusResponse response = new OrderStatusResponse();
            response.setOrderId(orderId);
            response.setStatus(textOrNull(state.getStatus()));
            response.setPaymentStatus(textOrNull(state.getPaymentStatus()));
            response.setShippingStatus(textOrNull(state.getShippingStatus()));
            response.setLastUpdated(state.getUpdatedAt());
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            log.error("Failed to get order status: {}", orderId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Forwards a payment status to the order's workflow; 202 on success, 500 on failure. */
    @PostMapping("/{orderId}/payment-status")
    public ResponseEntity<Void> updatePaymentStatus(@PathVariable String orderId,
                                                    @RequestBody PaymentStatusRequest request) {
        try {
            workflowClient.signalPaymentStatus(orderId, request.getStatus());
            return ResponseEntity.accepted().build();
        } catch (Exception e) {
            log.error("Failed to update payment status for order: {}", orderId, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Copies the API request into a workflow request and assigns a generated order id. */
    private OrderProcessingRequest convertToWorkflowRequest(CreateOrderRequest request) {
        OrderProcessingRequest workflowRequest = new OrderProcessingRequest();
        workflowRequest.setOrderId(generateOrderId());
        workflowRequest.setCustomerId(request.getCustomerId());
        workflowRequest.setItems(request.getItems());
        workflowRequest.setPaymentInfo(request.getPaymentInfo());
        workflowRequest.setShippingInfo(request.getShippingInfo());
        workflowRequest.setPaymentPending(request.isPaymentPending());
        return workflowRequest;
    }

    /** Builds an id of the form ORD-&lt;epoch millis&gt;-&lt;first 8 chars of a random UUID&gt;. */
    private String generateOrderId() {
        return "ORD-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
    }

    /** Returns {@code value.toString()}, or {@code null} when the value is absent. */
    private static String textOrNull(Object value) {
        return value != null ? value.toString() : null;
    }
}
// Health check for Cadence
@Component
public class CadenceHealthIndicator implements HealthIndicator {
private final WorkflowClient workflowClient;
public CadenceHealthIndicator(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}
@Override
public Health health() {
try {
// Try to list domains as a health check
workflowClient.getDomain("default");
return Health.up().withDetail("service", "cadence").build();
} catch (Exception e) {
return Health.down(e).withDetail("service", "cadence").build();
}
}
}
// Main application
/**
 * Spring Boot entry point of the order service. When the first command-line argument is
 * "demo", a demo order is run through the workflow client after startup.
 */
@SpringBootApplication
@Slf4j
public class OrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }

    /** Startup hook: logs the start and optionally runs the demo workflow. */
    @Bean
    public CommandLineRunner demo(OrderProcessingWorkflowClient workflowClient) {
        return args -> {
            log.info("Order Service Application started successfully");

            // Demo workflow execution if needed
            boolean demoRequested = args.length > 0 && "demo".equals(args[0]);
            if (demoRequested) {
                executeDemoWorkflow(workflowClient);
            }
        };
    }

    /** Runs the demo order and logs the result; failures are logged, not rethrown. */
    private void executeDemoWorkflow(OrderProcessingWorkflowClient workflowClient) {
        try {
            OrderProcessingResult outcome = workflowClient.startOrderProcessing(createDemoOrder());
            log.info("Demo workflow completed: {}", outcome);
        } catch (Exception e) {
            log.error("Demo workflow failed", e);
        }
    }

    /** Builds the fixed demo order: one item, empty payment and shipping info. */
    private OrderProcessingRequest createDemoOrder() {
        OrderItem demoItem = new OrderItem("demo-product", 1, new BigDecimal("99.99"));

        OrderProcessingRequest demoOrder = new OrderProcessingRequest();
        demoOrder.setOrderId("demo-order-123");
        demoOrder.setCustomerId("demo-customer");
        demoOrder.setItems(List.of(demoItem));
        demoOrder.setPaymentInfo(new PaymentInfo());
        demoOrder.setShippingInfo(new ShippingInfo());
        return demoOrder;
    }
}
This comprehensive Cadence workflow implementation provides:
- Robust order processing with compensation patterns
- Advanced error handling with retries and circuit breakers
- Event sourcing for auditability and state recovery
- Comprehensive testing strategies
- Production-ready configuration with Spring Boot integration
- REST API for workflow management
- Monitoring and health checks
The implementation demonstrates how to build scalable, reliable workflow orchestration for complex business processes using Uber Cadence in Java.