Cadence Workflow in Java: Complete Implementation Guide

Learn how to build robust, scalable workflow orchestration using Uber Cadence for complex business processes in Java.

Table of Contents

  1. Cadence Fundamentals
  2. Workflow Implementation
  3. Activity Patterns
  4. Error Handling & Retries
  5. Signals & Queries
  6. Event Sourcing
  7. Testing Strategies
  8. Production Deployment

Cadence Fundamentals

What is Cadence?

Cadence is a distributed, scalable, durable, and highly available orchestration engine that executes business logic expressed as workflows.

Key Concepts:

  • Workflows: Orchestrate activities and manage state
  • Activities: Execute business logic (can be non-deterministic)
  • Signals: External events to workflows
  • Queries: Inspect workflow state
  • Deciders: Determine next steps based on events

Workflow Implementation

1. Core Dependencies & Configuration

<!-- pom.xml -->
<properties>
<cadence-client.version>3.7.5</cadence-client.version>
</properties>
<dependencies>
<!-- Cadence Client -->
<dependency>
<groupId>com.uber.cadence</groupId>
<artifactId>cadence-client</artifactId>
<version>${cadence-client.version}</version>
</dependency>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>com.uber.cadence</groupId>
<artifactId>cadence-testing</artifactId>
<version>${cadence-client.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

2. Cadence Configuration

@Configuration
@EnableConfigurationProperties(CadenceProperties.class)
public class CadenceConfig {
private static final Logger logger = LoggerFactory.getLogger(CadenceConfig.class);
@Bean
public WorkflowServiceTls workflowServiceTls(CadenceProperties properties) {
if (!properties.getTls().isEnabled()) {
return null;
}
return new WorkflowServiceTls.Builder()
.setCert(properties.getTls().getCert())
.setKey(properties.getTls().getKey())
.setCa(properties.getTls().getCa())
.setEnableHostNameVerification(properties.getTls().isEnableHostNameVerification())
.build();
}
@Bean
public WorkflowClient workflowClient(CadenceProperties properties, 
WorkflowServiceTls workflowServiceTls) {
WorkflowClientOptions.Builder clientOptionsBuilder = new WorkflowClientOptions.Builder()
.setDomain(properties.getDomain())
.setIdentity(properties.getIdentity());
if (workflowServiceTls != null) {
clientOptionsBuilder.setTls(workflowServiceTls);
}
WorkflowClient client = WorkflowClient.newInstance(
properties.getHost(),
properties.getPort(),
clientOptionsBuilder.build()
);
logger.info("Cadence WorkflowClient initialized for domain: {}", properties.getDomain());
return client;
}
@Bean
public WorkerFactory workerFactory(WorkflowClient workflowClient, CadenceProperties properties) {
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
factory.start();
logger.info("Cadence WorkerFactory started");
return factory;
}
@Bean
public WorkflowClientExternal workflowClientExternal(WorkflowClient workflowClient) {
return workflowClient.newWorkflowClientExternal();
}
}
@ConfigurationProperties(prefix = "cadence")
@Data
public class CadenceProperties {
private String host = "localhost";
private int port = 7933;
private String domain = "default";
private String identity = "java-worker";
private Tls tls = new Tls();
@Data
public static class Tls {
private boolean enabled = false;
private String cert;
private String key;
private String ca;
private boolean enableHostNameVerification = false;
}
}

3. E-Commerce Order Processing Workflow

// Workflow Interface
@WorkflowInterface
public interface OrderProcessingWorkflow {
@WorkflowMethod
OrderProcessingResult processOrder(OrderProcessingRequest request);
@SignalMethod
void updatePaymentStatus(PaymentStatusSignal signal);
@SignalMethod
void updateShippingStatus(ShippingStatusSignal signal);
@SignalMethod
void cancelOrder(CancelOrderSignal signal);
@QueryMethod
OrderProcessingState getOrderState();
}
// Workflow Implementation
public class OrderProcessingWorkflowImpl implements OrderProcessingWorkflow {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingWorkflowImpl.class);
private final OrderProcessingActivities activities = 
Workflow.newActivityStub(OrderProcessingActivities.class);
private OrderProcessingState state = new OrderProcessingState();
private CompletablePromise<Void> paymentCompleted = Workflow.newPromise();
private CompletablePromise<Void> inventoryReserved = Workflow.newPromise();
private CompletablePromise<Void> shippingScheduled = Workflow.newPromise();
private boolean orderCancelled = false;
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
logger.info("Starting order processing workflow for order: {}", request.getOrderId());
try {
// 1. Validate order
state.setStatus(OrderStatus.VALIDATING);
activities.validateOrder(request);
state.setStatus(OrderStatus.VALIDATED);
// 2. Process payment (wait for signal or process synchronously)
if (request.isPaymentPending()) {
state.setStatus(OrderStatus.AWAITING_PAYMENT);
// Wait for payment signal with timeout
Workflow.await(Duration.ofMinutes(30), () -> 
paymentCompleted.isCompleted() || orderCancelled);
if (orderCancelled) {
throw new OrderCancelledException("Order was cancelled during payment");
}
if (!paymentCompleted.isCompleted()) {
throw new PaymentTimeoutException("Payment timeout after 30 minutes");
}
} else {
state.setStatus(OrderStatus.PROCESSING_PAYMENT);
activities.processPayment(request);
state.setStatus(OrderStatus.PAYMENT_COMPLETED);
}
// 3. Reserve inventory
state.setStatus(OrderStatus.RESERVING_INVENTORY);
activities.reserveInventory(request);
inventoryReserved.complete(null);
state.setStatus(OrderStatus.INVENTORY_RESERVED);
// 4. Schedule shipping
state.setStatus(OrderStatus.SCHEDULING_SHIPPING);
activities.scheduleShipping(request);
shippingScheduled.complete(null);
state.setStatus(OrderStatus.SHIPPING_SCHEDULED);
// 5. Finalize order
state.setStatus(OrderStatus.FINALIZING);
activities.finalizeOrder(request);
state.setStatus(OrderStatus.COMPLETED);
logger.info("Order processing completed successfully for order: {}", request.getOrderId());
return OrderProcessingResult.success(request.getOrderId());
} catch (OrderCancelledException e) {
logger.info("Order processing cancelled for order: {}", request.getOrderId());
handleOrderCancellation(request);
return OrderProcessingResult.cancelled(request.getOrderId(), e.getMessage());
} catch (Exception e) {
logger.error("Order processing failed for order: {}", request.getOrderId(), e);
handleOrderFailure(request, e);
return OrderProcessingResult.failed(request.getOrderId(), e.getMessage());
}
}
@Override
public void updatePaymentStatus(PaymentStatusSignal signal) {
logger.info("Received payment status update for order {}: {}", 
state.getOrderId(), signal.getStatus());
state.setPaymentStatus(signal.getStatus());
if (signal.getStatus() == PaymentStatus.COMPLETED) {
paymentCompleted.complete(null);
state.setStatus(OrderStatus.PAYMENT_COMPLETED);
} else if (signal.getStatus() == PaymentStatus.FAILED) {
paymentCompleted.completeExceptionally(new PaymentFailedException("Payment failed"));
}
}
@Override
public void updateShippingStatus(ShippingStatusSignal signal) {
logger.info("Received shipping status update for order {}: {}", 
state.getOrderId(), signal.getStatus());
state.setShippingStatus(signal.getStatus());
if (signal.getStatus() == ShippingStatus.DELIVERED) {
state.setStatus(OrderStatus.DELIVERED);
}
}
@Override
public void cancelOrder(CancelOrderSignal signal) {
logger.info("Received cancellation request for order: {}", state.getOrderId());
orderCancelled = true;
state.setStatus(OrderStatus.CANCELLED);
state.setCancellationReason(signal.getReason());
}
@Override
public OrderProcessingState getOrderState() {
return state;
}
private void handleOrderCancellation(OrderProcessingRequest request) {
try {
// Compensating actions for cancellation
if (state.getStatus().ordinal() >= OrderStatus.PAYMENT_COMPLETED.ordinal()) {
activities.refundPayment(request);
}
if (state.getStatus().ordinal() >= OrderStatus.INVENTORY_RESERVED.ordinal()) {
activities.releaseInventory(request);
}
if (state.getStatus().ordinal() >= OrderStatus.SHIPPING_SCHEDULED.ordinal()) {
activities.cancelShipping(request);
}
activities.notifyOrderCancelled(request, state.getCancellationReason());
} catch (Exception e) {
logger.error("Error during order cancellation compensation for order: {}", 
request.getOrderId(), e);
}
}
private void handleOrderFailure(OrderProcessingRequest request, Exception failure) {
try {
// Compensating actions for failure
if (state.getStatus().ordinal() >= OrderStatus.PAYMENT_COMPLETED.ordinal()) {
activities.refundPayment(request);
}
if (state.getStatus().ordinal() >= OrderStatus.INVENTORY_RESERVED.ordinal()) {
activities.releaseInventory(request);
}
activities.notifyOrderFailed(request, failure.getMessage());
} catch (Exception e) {
logger.error("Error during order failure compensation for order: {}", 
request.getOrderId(), e);
}
}
}
// Data Models
@Data
public class OrderProcessingRequest {
private String orderId;
private String customerId;
private List<OrderItem> items;
private PaymentInfo paymentInfo;
private ShippingInfo shippingInfo;
private boolean paymentPending = false;
}
@Data
public class OrderProcessingState {
private String orderId;
private OrderStatus status = OrderStatus.CREATED;
private PaymentStatus paymentStatus = PaymentStatus.PENDING;
private ShippingStatus shippingStatus = ShippingStatus.PENDING;
private String cancellationReason;
private Instant createdAt = Instant.now();
private Instant updatedAt = Instant.now();
}
@Data
public class OrderProcessingResult {
private String orderId;
private boolean success;
private String status;
private String errorMessage;
public static OrderProcessingResult success(String orderId) {
OrderProcessingResult result = new OrderProcessingResult();
result.setOrderId(orderId);
result.setSuccess(true);
result.setStatus("COMPLETED");
return result;
}
public static OrderProcessingResult failed(String orderId, String errorMessage) {
OrderProcessingResult result = new OrderProcessingResult();
result.setOrderId(orderId);
result.setSuccess(false);
result.setStatus("FAILED");
result.setErrorMessage(errorMessage);
return result;
}
public static OrderProcessingResult cancelled(String orderId, String reason) {
OrderProcessingResult result = new OrderProcessingResult();
result.setOrderId(orderId);
result.setSuccess(false);
result.setStatus("CANCELLED");
result.setErrorMessage(reason);
return result;
}
}
// Enums
public enum OrderStatus {
CREATED,
VALIDATING,
VALIDATED,
AWAITING_PAYMENT,
PROCESSING_PAYMENT,
PAYMENT_COMPLETED,
RESERVING_INVENTORY,
INVENTORY_RESERVED,
SCHEDULING_SHIPPING,
SHIPPING_SCHEDULED,
FINALIZING,
COMPLETED,
CANCELLED,
DELIVERED
}
public enum PaymentStatus {
PENDING,
PROCESSING,
COMPLETED,
FAILED,
REFUNDED
}
public enum ShippingStatus {
PENDING,
SCHEDULED,
IN_TRANSIT,
DELIVERED,
CANCELLED
}

Activity Patterns

4. Activity Implementations

// Activity Interface
@ActivityInterface
public interface OrderProcessingActivities {
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void validateOrder(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 600)
void processPayment(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void reserveInventory(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void releaseInventory(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void scheduleShipping(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void cancelShipping(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void finalizeOrder(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void refundPayment(OrderProcessingRequest request);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void notifyOrderCancelled(OrderProcessingRequest request, String reason);
@ActivityMethod(scheduleToCloseTimeoutSeconds = 300)
void notifyOrderFailed(OrderProcessingRequest request, String error);
}
// Activity Implementation
@Component
@Slf4j
public class OrderProcessingActivitiesImpl implements OrderProcessingActivities {
private final OrderService orderService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final ShippingService shippingService;
private final NotificationService notificationService;
public OrderProcessingActivitiesImpl(OrderService orderService,
PaymentService paymentService,
InventoryService inventoryService,
ShippingService shippingService,
NotificationService notificationService) {
this.orderService = orderService;
this.paymentService = paymentService;
this.inventoryService = inventoryService;
this.shippingService = shippingService;
this.notificationService = notificationService;
}
@Override
public void validateOrder(OrderProcessingRequest request) {
log.info("Validating order: {}", request.getOrderId());
try {
// Validate order items
for (OrderItem item : request.getItems()) {
if (item.getQuantity() <= 0) {
throw new ValidationException("Invalid quantity for product: " + item.getProductId());
}
}
// Validate customer
if (!orderService.isCustomerValid(request.getCustomerId())) {
throw new ValidationException("Invalid customer: " + request.getCustomerId());
}
// Validate shipping address
if (!shippingService.isAddressValid(request.getShippingInfo().getAddress())) {
throw new ValidationException("Invalid shipping address");
}
log.info("Order validation completed successfully: {}", request.getOrderId());
} catch (Exception e) {
log.error("Order validation failed: {}", request.getOrderId(), e);
throw e;
}
}
@Override
public void processPayment(OrderProcessingRequest request) {
log.info("Processing payment for order: {}", request.getOrderId());
try {
PaymentResult result = paymentService.processPayment(
request.getPaymentInfo(),
calculateTotalAmount(request.getItems())
);
if (!result.isSuccess()) {
throw new PaymentFailedException("Payment processing failed: " + result.getErrorMessage());
}
log.info("Payment processed successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Payment processing failed for order: {}", request.getOrderId(), e);
throw e;
}
}
@Override
public void reserveInventory(OrderProcessingRequest request) {
log.info("Reserving inventory for order: {}", request.getOrderId());
try {
for (OrderItem item : request.getItems()) {
boolean reserved = inventoryService.reserveItem(
item.getProductId(),
item.getQuantity(),
request.getOrderId()
);
if (!reserved) {
throw new InventoryException("Insufficient inventory for product: " + item.getProductId());
}
}
log.info("Inventory reserved successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Inventory reservation failed for order: {}", request.getOrderId(), e);
throw e;
}
}
@Override
public void releaseInventory(OrderProcessingRequest request) {
log.info("Releasing inventory for order: {}", request.getOrderId());
try {
for (OrderItem item : request.getItems()) {
inventoryService.releaseItem(
item.getProductId(),
item.getQuantity(),
request.getOrderId()
);
}
log.info("Inventory released successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Inventory release failed for order: {}", request.getOrderId(), e);
// Don't throw exception in compensation activities to avoid masking original error
}
}
@Override
public void scheduleShipping(OrderProcessingRequest request) {
log.info("Scheduling shipping for order: {}", request.getOrderId());
try {
ShippingResult result = shippingService.scheduleShipping(
request.getOrderId(),
request.getShippingInfo(),
request.getItems()
);
if (!result.isSuccess()) {
throw new ShippingException("Shipping scheduling failed: " + result.getErrorMessage());
}
log.info("Shipping scheduled successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Shipping scheduling failed for order: {}", request.getOrderId(), e);
throw e;
}
}
@Override
public void cancelShipping(OrderProcessingRequest request) {
log.info("Cancelling shipping for order: {}", request.getOrderId());
try {
shippingService.cancelShipping(request.getOrderId());
log.info("Shipping cancelled successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Shipping cancellation failed for order: {}", request.getOrderId(), e);
// Don't throw exception in compensation activities
}
}
@Override
public void finalizeOrder(OrderProcessingRequest request) {
log.info("Finalizing order: {}", request.getOrderId());
try {
orderService.markOrderAsCompleted(request.getOrderId());
notificationService.sendOrderConfirmation(request.getCustomerId(), request.getOrderId());
log.info("Order finalized successfully: {}", request.getOrderId());
} catch (Exception e) {
log.error("Order finalization failed: {}", request.getOrderId(), e);
throw e;
}
}
@Override
public void refundPayment(OrderProcessingRequest request) {
log.info("Processing refund for order: {}", request.getOrderId());
try {
paymentService.processRefund(request.getOrderId());
log.info("Refund processed successfully for order: {}", request.getOrderId());
} catch (Exception e) {
log.error("Refund processing failed for order: {}", request.getOrderId(), e);
// Don't throw exception in compensation activities
}
}
@Override
public void notifyOrderCancelled(OrderProcessingRequest request, String reason) {
log.info("Notifying order cancellation for order: {}", request.getOrderId());
try {
notificationService.sendOrderCancellation(
request.getCustomerId(),
request.getOrderId(),
reason
);
} catch (Exception e) {
log.error("Order cancellation notification failed for order: {}", request.getOrderId(), e);
// Don't throw exception in notification activities
}
}
@Override
public void notifyOrderFailed(OrderProcessingRequest request, String error) {
log.info("Notifying order failure for order: {}", request.getOrderId());
try {
notificationService.sendOrderFailure(
request.getCustomerId(),
request.getOrderId(),
error
);
} catch (Exception e) {
log.error("Order failure notification failed for order: {}", request.getOrderId(), e);
// Don't throw exception in notification activities
}
}
private BigDecimal calculateTotalAmount(List<OrderItem> items) {
return items.stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
}
// Signal Classes
@Data
public class PaymentStatusSignal {
private String orderId;
private PaymentStatus status;
private String transactionId;
private String message;
}
@Data
public class ShippingStatusSignal {
private String orderId;
private ShippingStatus status;
private String trackingNumber;
private String message;
}
@Data
public class CancelOrderSignal {
private String orderId;
private String reason;
private String initiatedBy;
}

Error Handling & Retries

5. Advanced Retry and Error Handling

// Custom retry options
@Component
public class ActivityRetryOptions {
public RetryOptions getDefaultRetryOptions() {
return new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofMinutes(1))
.setBackoffCoefficient(2.0)
.setMaximumAttempts(3)
.setDoNotRetry(ValidationException.class, PaymentFailedException.class)
.build();
}
public RetryOptions getPaymentRetryOptions() {
return new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(5))
.setMaximumInterval(Duration.ofMinutes(5))
.setBackoffCoefficient(1.5)
.setMaximumAttempts(5)
.setDoNotRetry(ValidationException.class)
.build();
}
public RetryOptions getInventoryRetryOptions() {
return new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(2))
.setMaximumInterval(Duration.ofMinutes(2))
.setBackoffCoefficient(2.0)
.setMaximumAttempts(10) // More retries for inventory
.build();
}
}
// Enhanced workflow with retry configuration
public class RobustOrderProcessingWorkflowImpl implements OrderProcessingWorkflow {
private final OrderProcessingActivities activities;
private OrderProcessingState state = new OrderProcessingState();
private boolean orderCancelled = false;
public RobustOrderProcessingWorkflowImpl() {
// Create activity stubs with retry options
ActivityOptions activityOptions = new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(new ActivityRetryOptions().getDefaultRetryOptions())
.build();
this.activities = Workflow.newActivityStub(OrderProcessingActivities.class, activityOptions);
}
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
logger.info("Starting robust order processing for order: {}", request.getOrderId());
state.setOrderId(request.getOrderId());
try {
// Execute with compensation pattern
return executeWithCompensation(request);
} catch (Exception e) {
logger.error("Workflow execution failed for order: {}", request.getOrderId(), e);
return handleWorkflowFailure(request, e);
}
}
private OrderProcessingResult executeWithCompensation(OrderProcessingRequest request) {
List<Runnable> compensationActions = new ArrayList<>();
try {
// 1. Validate order (no compensation needed)
state.setStatus(OrderStatus.VALIDATING);
activities.validateOrder(request);
state.setStatus(OrderStatus.VALIDATED);
// 2. Process payment (compensation: refund)
state.setStatus(OrderStatus.PROCESSING_PAYMENT);
activities.processPayment(request);
state.setStatus(OrderStatus.PAYMENT_COMPLETED);
compensationActions.add(() -> activities.refundPayment(request));
// 3. Reserve inventory (compensation: release)
state.setStatus(OrderStatus.RESERVING_INVENTORY);
activities.reserveInventory(request);
state.setStatus(OrderStatus.INVENTORY_RESERVED);
compensationActions.add(() -> activities.releaseInventory(request));
// 4. Schedule shipping (compensation: cancel)
state.setStatus(OrderStatus.SCHEDULING_SHIPPING);
activities.scheduleShipping(request);
state.setStatus(OrderStatus.SHIPPING_SCHEDULED);
compensationActions.add(() -> activities.cancelShipping(request));
// 5. Finalize order
state.setStatus(OrderStatus.FINALIZING);
activities.finalizeOrder(request);
state.setStatus(OrderStatus.COMPLETED);
return OrderProcessingResult.success(request.getOrderId());
} catch (Exception e) {
// Execute compensation actions in reverse order
executeCompensationActions(compensationActions);
throw e;
}
}
private void executeCompensationActions(List<Runnable> compensationActions) {
// Execute in reverse order (LIFO)
for (int i = compensationActions.size() - 1; i >= 0; i--) {
try {
compensationActions.get(i).run();
} catch (Exception e) {
logger.error("Compensation action failed", e);
// Continue with other compensations
}
}
}
private OrderProcessingResult handleWorkflowFailure(OrderProcessingRequest request, Exception failure) {
if (orderCancelled) {
activities.notifyOrderCancelled(request, state.getCancellationReason());
return OrderProcessingResult.cancelled(request.getOrderId(), state.getCancellationReason());
} else {
activities.notifyOrderFailed(request, failure.getMessage());
return OrderProcessingResult.failed(request.getOrderId(), failure.getMessage());
}
}
// Signal and query methods same as before
@Override
public void updatePaymentStatus(PaymentStatusSignal signal) {
state.setPaymentStatus(signal.getStatus());
}
@Override
public void updateShippingStatus(ShippingStatusSignal signal) {
state.setShippingStatus(signal.getStatus());
}
@Override
public void cancelOrder(CancelOrderSignal signal) {
orderCancelled = true;
state.setStatus(OrderStatus.CANCELLED);
state.setCancellationReason(signal.getReason());
}
@Override
public OrderProcessingState getOrderState() {
return state;
}
}
// Circuit breaker for activities
@Component
@Slf4j
public class ActivityCircuitBreaker {
private final Map<String, CircuitBreaker> breakers;
public ActivityCircuitBreaker() {
this.breakers = new ConcurrentHashMap<>();
}
public <T> T executeWithCircuitBreaker(String activityName, Supplier<T> supplier) {
CircuitBreaker breaker = breakers.computeIfAbsent(activityName, this::createCircuitBreaker);
return breaker.executeSupplier(supplier);
}
public void executeWithCircuitBreaker(String activityName, Runnable runnable) {
CircuitBreaker breaker = breakers.computeIfAbsent(activityName, this::createCircuitBreaker);
breaker.executeRunnable(runnable);
}
private CircuitBreaker createCircuitBreaker(String activityName) {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofMinutes(5))
.slowCallDurationThreshold(Duration.ofSeconds(30))
.permittedNumberOfCallsInHalfOpenState(5)
.minimumNumberOfCalls(10)
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(20)
.build();
return CircuitBreaker.of(activityName, config);
}
}
// Enhanced activities with circuit breaker
@Component
@Slf4j
public class ResilientOrderProcessingActivitiesImpl implements OrderProcessingActivities {
private final OrderProcessingActivitiesImpl delegate;
private final ActivityCircuitBreaker circuitBreaker;
public ResilientOrderProcessingActivitiesImpl(OrderProcessingActivitiesImpl delegate,
ActivityCircuitBreaker circuitBreaker) {
this.delegate = delegate;
this.circuitBreaker = circuitBreaker;
}
@Override
public void validateOrder(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("validateOrder", 
() -> delegate.validateOrder(request));
}
@Override
public void processPayment(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("processPayment", 
() -> delegate.processPayment(request));
}
@Override
public void reserveInventory(OrderProcessingRequest request) {
circuitBreaker.executeWithCircuitBreaker("reserveInventory", 
() -> delegate.reserveInventory(request));
}
// ... other methods with circuit breaker
}

Signals & Queries

6. Advanced Signal and Query Patterns

// Workflow with enhanced signals and queries
@WorkflowInterface
public interface EnhancedOrderProcessingWorkflow {
@WorkflowMethod
OrderProcessingResult processOrder(OrderProcessingRequest request);
// Payment signals
@SignalMethod
void updatePaymentStatus(PaymentStatusSignal signal);
@SignalMethod
void requestPaymentRetry(PaymentRetrySignal signal);
// Shipping signals
@SignalMethod
void updateShippingStatus(ShippingStatusSignal signal);
@SignalMethod
void updateTrackingInfo(TrackingInfoSignal signal);
// Order management signals
@SignalMethod
void cancelOrder(CancelOrderSignal signal);
@SignalMethod
void updateOrderItems(UpdateItemsSignal signal);
// Customer communication signals
@SignalMethod
void customerApproval(CustomerApprovalSignal signal);
// Queries
@QueryMethod
OrderProcessingState getOrderState();
@QueryMethod
OrderProcessingTimeline getOrderTimeline();
@QueryMethod
List<ProcessingStep> getPendingSteps();
@QueryMethod
String getCurrentStatus();
}
// Enhanced workflow implementation
public class EnhancedOrderProcessingWorkflowImpl implements EnhancedOrderProcessingWorkflow {
private final OrderProcessingActivities activities;
private OrderProcessingState state = new OrderProcessingState();
private OrderProcessingTimeline timeline = new OrderProcessingTimeline();
private CompletablePromise<Void> paymentCompleted = Workflow.newPromise();
private CompletablePromise<Void> customerApprovalReceived = Workflow.newPromise();
private CompletablePromise<Void> shippingScheduled = Workflow.newPromise();
private boolean orderCancelled = false;
private String cancellationReason;
public EnhancedOrderProcessingWorkflowImpl() {
ActivityOptions activityOptions = new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofHours(2))
.setRetryOptions(new ActivityRetryOptions().getDefaultRetryOptions())
.build();
this.activities = Workflow.newActivityStub(OrderProcessingActivities.class, activityOptions);
}
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
logWorkflowEvent("Workflow started", request.getOrderId());
state.setOrderId(request.getOrderId());
timeline.setWorkflowStart(Instant.now());
try {
return executeOrderProcessing(request);
} catch (Exception e) {
return handleProcessingFailure(request, e);
}
}
private OrderProcessingResult executeOrderProcessing(OrderProcessingRequest request) {
// 1. Order validation
executeStep("VALIDATION", () -> activities.validateOrder(request));
// 2. Payment processing (with potential external approval)
if (requiresCustomerApproval(request)) {
executeStep("AWAITING_APPROVAL", () -> awaitCustomerApproval());
}
executeStep("PAYMENT_PROCESSING", () -> processPaymentWithRetries(request));
// 3. Inventory management
executeStep("INVENTORY_RESERVATION", () -> activities.reserveInventory(request));
// 4. Shipping coordination
executeStep("SHIPPING_SCHEDULING", () -> activities.scheduleShipping(request));
// 5. Order finalization
executeStep("ORDER_FINALIZATION", () -> activities.finalizeOrder(request));
logWorkflowEvent("Workflow completed successfully", request.getOrderId());
timeline.setWorkflowEnd(Instant.now());
return OrderProcessingResult.success(request.getOrderId());
}
private void executeStep(String stepName, Runnable stepAction) {
if (orderCancelled) {
throw new OrderCancelledException("Order cancelled during step: " + stepName);
}
logWorkflowEvent("Starting step: " + stepName, state.getOrderId());
timeline.addStep(stepName, Instant.now(), null);
try {
stepAction.run();
timeline.markStepCompleted(stepName, Instant.now());
logWorkflowEvent("Step completed: " + stepName, state.getOrderId());
} catch (Exception e) {
timeline.markStepFailed(stepName, Instant.now(), e.getMessage());
logWorkflowEvent("Step failed: " + stepName + " - " + e.getMessage(), state.getOrderId());
throw e;
}
}
private void processPaymentWithRetries(OrderProcessingRequest request) {
int attempt = 1;
int maxAttempts = 3;
while (attempt <= maxAttempts) {
try {
activities.processPayment(request);
return;
} catch (PaymentFailedException e) {
if (attempt == maxAttempts) {
throw e;
}
logWorkflowEvent("Payment attempt " + attempt + " failed, waiting for retry", 
state.getOrderId());
// Wait before retry
Workflow.sleep(Duration.ofMinutes(5));
attempt++;
}
}
}
private void awaitCustomerApproval() {
logWorkflowEvent("Waiting for customer approval", state.getOrderId());
// Wait for approval with timeout
boolean approved = Workflow.await(Duration.ofHours(24), () -> 
customerApprovalReceived.isCompleted() || orderCancelled);
if (!approved) {
throw new CustomerApprovalTimeoutException("Customer approval timeout after 24 hours");
}
}
// Signal handlers
@Override
public void updatePaymentStatus(PaymentStatusSignal signal) {
logWorkflowEvent("Payment status updated: " + signal.getStatus(), state.getOrderId());
state.setPaymentStatus(signal.getStatus());
if (signal.getStatus() == PaymentStatus.COMPLETED) {
paymentCompleted.complete(null);
}
}
@Override
public void requestPaymentRetry(PaymentRetrySignal signal) {
logWorkflowEvent("Payment retry requested", state.getOrderId());
// Implementation for manual payment retry
}
@Override
public void updateShippingStatus(ShippingStatusSignal signal) {
logWorkflowEvent("Shipping status updated: " + signal.getStatus(), state.getOrderId());
state.setShippingStatus(signal.getStatus());
}
@Override
public void updateTrackingInfo(TrackingInfoSignal signal) {
logWorkflowEvent("Tracking info updated: " + signal.getTrackingNumber(), state.getOrderId());
state.setTrackingNumber(signal.getTrackingNumber());
}
@Override
public void cancelOrder(CancelOrderSignal signal) {
logWorkflowEvent("Order cancellation requested: " + signal.getReason(), state.getOrderId());
orderCancelled = true;
cancellationReason = signal.getReason();
state.setStatus(OrderStatus.CANCELLED);
}
@Override
public void updateOrderItems(UpdateItemsSignal signal) {
logWorkflowEvent("Order items updated", state.getOrderId());
// Implementation for updating order items
}
@Override
public void customerApproval(CustomerApprovalSignal signal) {
logWorkflowEvent("Customer approval received: " + signal.isApproved(), state.getOrderId());
if (signal.isApproved()) {
customerApprovalReceived.complete(null);
} else {
customerApprovalReceived.completeExceptionally(
new CustomerApprovalDeniedException("Customer denied approval")
);
}
}
// Query handlers
@Override
public OrderProcessingState getOrderState() {
return state;
}
@Override
public OrderProcessingTimeline getOrderTimeline() {
return timeline;
}
@Override
public List<ProcessingStep> getPendingSteps() {
return timeline.getPendingSteps();
}
@Override
public String getCurrentStatus() {
return state.getStatus().toString();
}
private void logWorkflowEvent(String message, String orderId) {
logger.info("[Workflow {}] {}", orderId, message);
timeline.addEvent(message, Instant.now());
}
private boolean requiresCustomerApproval(OrderProcessingRequest request) {
// Logic to determine if order requires customer approval
return calculateTotalAmount(request.getItems()).compareTo(new BigDecimal("1000")) > 0;
}
private OrderProcessingResult handleProcessingFailure(OrderProcessingRequest request, Exception failure) {
logWorkflowEvent("Workflow failed: " + failure.getMessage(), request.getOrderId());
timeline.setWorkflowEnd(Instant.now());
if (orderCancelled) {
activities.notifyOrderCancelled(request, cancellationReason);
return OrderProcessingResult.cancelled(request.getOrderId(), cancellationReason);
} else {
activities.notifyOrderFailed(request, failure.getMessage());
return OrderProcessingResult.failed(request.getOrderId(), failure.getMessage());
}
}
}
// Enhanced data models
@Data
public class OrderProcessingTimeline {
private Instant workflowStart;
private Instant workflowEnd;
private List<ProcessingStep> steps = new ArrayList<>();
private List<TimelineEvent> events = new ArrayList<>();
public void addStep(String name, Instant startTime, Instant endTime) {
steps.add(new ProcessingStep(name, startTime, endTime, null));
}
public void markStepCompleted(String name, Instant endTime) {
steps.stream()
.filter(step -> step.getName().equals(name) && step.getEndTime() == null)
.findFirst()
.ifPresent(step -> step.setEndTime(endTime));
}
public void markStepFailed(String name, Instant endTime, String error) {
steps.stream()
.filter(step -> step.getName().equals(name) && step.getEndTime() == null)
.findFirst()
.ifPresent(step -> {
step.setEndTime(endTime);
step.setError(error);
});
}
public void addEvent(String description, Instant timestamp) {
events.add(new TimelineEvent(description, timestamp));
}
public List<ProcessingStep> getPendingSteps() {
return steps.stream()
.filter(step -> step.getEndTime() == null)
.collect(Collectors.toList());
}
}
@Data
public class ProcessingStep {
private String name;
private Instant startTime;
private Instant endTime;
private String error;
public ProcessingStep(String name, Instant startTime, Instant endTime, String error) {
this.name = name;
this.startTime = startTime;
this.endTime = endTime;
this.error = error;
}
public boolean isCompleted() {
return endTime != null && error == null;
}
public boolean isFailed() {
return endTime != null && error != null;
}
public boolean isInProgress() {
return endTime == null;
}
}
@Data
public class TimelineEvent {
private String description;
private Instant timestamp;
public TimelineEvent(String description, Instant timestamp) {
this.description = description;
this.timestamp = timestamp;
}
}
// Additional signal classes
@Data
public class PaymentRetrySignal {
private String orderId;
private String paymentMethod;
private String reason;
}
@Data
public class TrackingInfoSignal {
private String orderId;
private String trackingNumber;
private String carrier;
private String trackingUrl;
}
@Data
public class UpdateItemsSignal {
private String orderId;
private List<OrderItem> addedItems;
private List<OrderItem> removedItems;
private String reason;
}
@Data
public class CustomerApprovalSignal {
private String orderId;
private boolean approved;
private String comments;
private Instant approvedAt;
}

Event Sourcing

7. Event Sourcing with Cadence

// Event-sourced workflow
@WorkflowInterface
public interface EventSourcedOrderWorkflow {
@WorkflowMethod
OrderProcessingResult processOrder(OrderProcessingRequest request);
@SignalMethod
void applyEvent(OrderEvent event);
@QueryMethod
OrderProjection getOrderProjection();
@QueryMethod
List<OrderEvent> getEventHistory();
}
// Event-sourced workflow implementation
public class EventSourcedOrderWorkflowImpl implements EventSourcedOrderWorkflow {
private final List<OrderEvent> events = new ArrayList<>();
private OrderProjection projection = new OrderProjection();
private boolean workflowCompleted = false;
@Override
public OrderProcessingResult processOrder(OrderProcessingRequest request) {
// Start by applying the order created event
applyEvent(new OrderCreatedEvent(request));
try {
// Process the order using events
return processOrderWithEvents(request);
} catch (Exception e) {
applyEvent(new OrderFailedEvent(request.getOrderId(), e.getMessage()));
return OrderProcessingResult.failed(request.getOrderId(), e.getMessage());
}
}
@Override
public void applyEvent(OrderEvent event) {
events.add(event);
projection.apply(event);
logger.info("Applied event: {} for order: {}", event.getType(), event.getOrderId());
}
@Override
public OrderProjection getOrderProjection() {
return projection;
}
@Override
public List<OrderEvent> getEventHistory() {
return new ArrayList<>(events);
}
private OrderProcessingResult processOrderWithEvents(OrderProcessingRequest request) {
// Replay events to rebuild state
replayEvents();
// Check if workflow is already completed
if (workflowCompleted) {
return OrderProcessingResult.success(request.getOrderId());
}
// Process based on current state
if (projection.getStatus() == OrderStatus.CREATED) {
processNewOrder(request);
}
// Wait for completion
Workflow.await(Duration.ofHours(24), () -> workflowCompleted);
if (workflowCompleted) {
return OrderProcessingResult.success(request.getOrderId());
} else {
return OrderProcessingResult.failed(request.getOrderId(), "Workflow timeout");
}
}
private void replayEvents() {
OrderProjection newProjection = new OrderProjection();
for (OrderEvent event : events) {
newProjection.apply(event);
}
this.projection = newProjection;
this.workflowCompleted = projection.getStatus() == OrderStatus.COMPLETED;
}
private void processNewOrder(OrderProcessingRequest request) {
try {
// Validate order
applyEvent(new OrderValidatedEvent(request.getOrderId()));
// Process payment
applyEvent(new PaymentProcessedEvent(request.getOrderId(), "txn_123"));
// Reserve inventory
applyEvent(new InventoryReservedEvent(request.getOrderId()));
// Schedule shipping
applyEvent(new ShippingScheduledEvent(request.getOrderId(), "tracking_123"));
// Complete order
applyEvent(new OrderCompletedEvent(request.getOrderId()));
workflowCompleted = true;
} catch (Exception e) {
applyEvent(new OrderFailedEvent(request.getOrderId(), e.getMessage()));
throw e;
}
}
}
// Event classes
public abstract class OrderEvent {
private String eventId;
private String orderId;
private String type;
private Instant timestamp;
private int version;
public OrderEvent(String orderId, String type) {
this.eventId = UUID.randomUUID().toString();
this.orderId = orderId;
this.type = type;
this.timestamp = Instant.now();
this.version = 1;
}
// Getters and setters
public String getEventId() { return eventId; }
public String getOrderId() { return orderId; }
public String getType() { return type; }
public Instant getTimestamp() { return timestamp; }
public int getVersion() { return version; }
}
public class OrderCreatedEvent extends OrderEvent {
private OrderProcessingRequest orderRequest;
public OrderCreatedEvent(OrderProcessingRequest orderRequest) {
super(orderRequest.getOrderId(), "ORDER_CREATED");
this.orderRequest = orderRequest;
}
public OrderProcessingRequest getOrderRequest() { return orderRequest; }
}
public class OrderValidatedEvent extends OrderEvent {
public OrderValidatedEvent(String orderId) {
super(orderId, "ORDER_VALIDATED");
}
}
public class PaymentProcessedEvent extends OrderEvent {
private String transactionId;
private BigDecimal amount;
public PaymentProcessedEvent(String orderId, String transactionId) {
super(orderId, "PAYMENT_PROCESSED");
this.transactionId = transactionId;
}
public String getTransactionId() { return transactionId; }
public BigDecimal getAmount() { return amount; }
}
public class InventoryReservedEvent extends OrderEvent {
public InventoryReservedEvent(String orderId) {
super(orderId, "INVENTORY_RESERVED");
}
}
public class ShippingScheduledEvent extends OrderEvent {
private String trackingNumber;
public ShippingScheduledEvent(String orderId, String trackingNumber) {
super(orderId, "SHIPPING_SCHEDULED");
this.trackingNumber = trackingNumber;
}
public String getTrackingNumber() { return trackingNumber; }
}
public class OrderCompletedEvent extends OrderEvent {
public OrderCompletedEvent(String orderId) {
super(orderId, "ORDER_COMPLETED");
}
}
public class OrderFailedEvent extends OrderEvent {
private String errorMessage;
public OrderFailedEvent(String orderId, String errorMessage) {
super(orderId, "ORDER_FAILED");
this.errorMessage = errorMessage;
}
public String getErrorMessage() { return errorMessage; }
}
// Projection for event sourcing
@Data
public class OrderProjection {
private String orderId;
private OrderStatus status = OrderStatus.CREATED;
private List<OrderEvent> appliedEvents = new ArrayList<>();
private Instant createdAt;
private Instant updatedAt;
public void apply(OrderEvent event) {
appliedEvents.add(event);
this.updatedAt = event.getTimestamp();
switch (event.getType()) {
case "ORDER_CREATED":
this.orderId = event.getOrderId();
this.createdAt = event.getTimestamp();
this.status = OrderStatus.CREATED;
break;
case "ORDER_VALIDATED":
this.status = OrderStatus.VALIDATED;
break;
case "PAYMENT_PROCESSED":
this.status = OrderStatus.PAYMENT_COMPLETED;
break;
case "INVENTORY_RESERVED":
this.status = OrderStatus.INVENTORY_RESERVED;
break;
case "SHIPPING_SCHEDULED":
this.status = OrderStatus.SHIPPING_SCHEDULED;
break;
case "ORDER_COMPLETED":
this.status = OrderStatus.COMPLETED;
break;
case "ORDER_FAILED":
this.status = OrderStatus.FAILED;
break;
}
}
public boolean canAcceptPayment() {
return status == OrderStatus.VALIDATED || status == OrderStatus.CREATED;
}
public boolean canCancel() {
return status != OrderStatus.COMPLETED && status != OrderStatus.CANCELLED;
}
}

Testing Strategies

8. Comprehensive Testing

// Workflow test base class
@ExtendWith(MockitoExtension.class)
public class WorkflowTestBase {
protected TestWorkflowEnvironment testEnvironment;
protected Worker worker;
protected WorkflowClient workflowClient;
@BeforeEach
void setUp() {
testEnvironment = TestWorkflowEnvironment.newInstance();
workflowClient = testEnvironment.newWorkflowClient();
// Create worker
WorkerFactory factory = testEnvironment.newWorkerFactory();
worker = factory.newWorker("ORDER_TASK_LIST");
worker.registerWorkflowImplementationTypes(
OrderProcessingWorkflowImpl.class,
EnhancedOrderProcessingWorkflowImpl.class
);
factory.start();
}
@AfterEach
void tearDown() {
testEnvironment.close();
}
}
// Workflow unit tests
class OrderProcessingWorkflowTest extends WorkflowTestBase {
@Test
void testSuccessfulOrderProcessing() {
// Given
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class,
new WorkflowOptions.Builder()
.setTaskList("ORDER_TASK_LIST")
.setExecutionStartToCloseTimeout(Duration.ofHours(1))
.build()
);
OrderProcessingRequest request = createTestOrderRequest();
// When
WorkflowClient.execute(workflow::processOrder, request);
// Then - Workflow should complete successfully
// In real test, you would mock activities and verify calls
}
@Test
void testOrderCancellation() {
// Given
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class,
new WorkflowOptions.Builder()
.setTaskList("ORDER_TASK_LIST")
.setExecutionStartToCloseTimeout(Duration.ofHours(1))
.build()
);
OrderProcessingRequest request = createTestOrderRequest();
// When
WorkflowExecution execution = WorkflowClient.start(workflow::processOrder, request);
// Send cancellation signal
workflow.cancelOrder(new CancelOrderSignal(request.getOrderId(), "Customer requested"));
// Then
OrderProcessingResult result = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class, execution.getWorkflowId()).getResult();
assertThat(result.isSuccess()).isFalse();
assertThat(result.getStatus()).isEqualTo("CANCELLED");
}
@Test
void testPaymentTimeout() {
// Given
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class,
new WorkflowOptions.Builder()
.setTaskList("ORDER_TASK_LIST")
.setExecutionStartToCloseTimeout(Duration.ofHours(1))
.build()
);
OrderProcessingRequest request = createTestOrderRequest();
request.setPaymentPending(true); // Requires external payment
// When & Then
assertThatThrownBy(() -> WorkflowClient.execute(workflow::processOrder, request))
.isInstanceOf(PaymentTimeoutException.class);
}
private OrderProcessingRequest createTestOrderRequest() {
OrderProcessingRequest request = new OrderProcessingRequest();
request.setOrderId("test-order-123");
request.setCustomerId("customer-456");
request.setItems(List.of(
new OrderItem("product-1", 2, new BigDecimal("29.99")),
new OrderItem("product-2", 1, new BigDecimal("49.99"))
));
request.setPaymentInfo(new PaymentInfo());
request.setShippingInfo(new ShippingInfo());
return request;
}
}
// Activity unit tests
@ExtendWith(MockitoExtension.class)
class OrderProcessingActivitiesTest {
@Mock
private OrderService orderService;
@Mock
private PaymentService paymentService;
@Mock
private InventoryService inventoryService;
@Mock
private ShippingService shippingService;
@Mock
private NotificationService notificationService;
private OrderProcessingActivities activities;
@BeforeEach
void setUp() {
activities = new OrderProcessingActivitiesImpl(
orderService, paymentService, inventoryService, 
shippingService, notificationService
);
}
@Test
void testValidateOrder_Success() {
// Given
OrderProcessingRequest request = createTestOrderRequest();
when(orderService.isCustomerValid(anyString())).thenReturn(true);
// When & Then
assertThatNoException().isThrownBy(() -> activities.validateOrder(request));
}
@Test
void testValidateOrder_InvalidCustomer() {
// Given
OrderProcessingRequest request = createTestOrderRequest();
when(orderService.isCustomerValid(anyString())).thenReturn(false);
// When & Then
assertThatThrownBy(() -> activities.validateOrder(request))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Invalid customer");
}
@Test
void testProcessPayment_Success() {
// Given
OrderProcessingRequest request = createTestOrderRequest();
PaymentResult paymentResult = new PaymentResult(true, "txn_123", null);
when(paymentService.processPayment(any(), any())).thenReturn(paymentResult);
// When & Then
assertThatNoException().isThrownBy(() -> activities.processPayment(request));
}
@Test
void testProcessPayment_Failure() {
// Given
OrderProcessingRequest request = createTestOrderRequest();
PaymentResult paymentResult = new PaymentResult(false, null, "Insufficient funds");
when(paymentService.processPayment(any(), any())).thenReturn(paymentResult);
// When & Then
assertThatThrownBy(() -> activities.processPayment(request))
.isInstanceOf(PaymentFailedException.class)
.hasMessageContaining("Payment processing failed");
}
private OrderProcessingRequest createTestOrderRequest() {
OrderProcessingRequest request = new OrderProcessingRequest();
request.setOrderId("test-order-123");
request.setCustomerId("customer-456");
request.setItems(List.of(new OrderItem("product-1", 1, new BigDecimal("29.99"))));
request.setPaymentInfo(new PaymentInfo());
ShippingInfo shippingInfo = new ShippingInfo();
shippingInfo.setAddress(new Address("123 Main St", "City", "State", "12345", "US"));
request.setShippingInfo(shippingInfo);
return request;
}
}
// Integration test
@SpringBootTest
@TestPropertySource(properties = {
"cadence.host=localhost",
"cadence.port=7933",
"cadence.domain=test-domain"
})
class OrderProcessingIntegrationTest {
@Autowired
private WorkflowClient workflowClient;
@Autowired
private OrderProcessingWorkflowClient orderWorkflowClient;
@Test
void testEndToEndOrderProcessing() {
// Given
OrderProcessingRequest request = createTestOrderRequest();
// When
OrderProcessingResult result = orderWorkflowClient.startOrderProcessing(request);
// Then
assertThat(result).isNotNull();
assertThat(result.isSuccess()).isTrue();
assertThat(result.getOrderId()).isEqualTo(request.getOrderId());
}
private OrderProcessingRequest createTestOrderRequest() {
OrderProcessingRequest request = new OrderProcessingRequest();
request.setOrderId("integration-test-order");
request.setCustomerId("test-customer");
request.setItems(List.of(
new OrderItem("test-product-1", 1, new BigDecimal("19.99"))
));
request.setPaymentInfo(createTestPaymentInfo());
request.setShippingInfo(createTestShippingInfo());
return request;
}
private PaymentInfo createTestPaymentInfo() {
PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setMethod("CREDIT_CARD");
paymentInfo.setCardNumber("4111111111111111");
paymentInfo.setExpiryDate("12/25");
paymentInfo.setCvv("123");
return paymentInfo;
}
private ShippingInfo createTestShippingInfo() {
ShippingInfo shippingInfo = new ShippingInfo();
shippingInfo.setAddress(new Address(
"123 Test St", "Test City", "TS", "12345", "US"
));
shippingInfo.setMethod("STANDARD");
return shippingInfo;
}
}

Production Deployment

9. Spring Boot Integration & Production Setup

# application.yml
spring:
application:
name: order-service
datasource:
url: jdbc:postgresql://localhost:5432/orders
username: order_user
password: order_pass
hikari:
maximum-pool-size: 20
cadence:
host: ${CADENCE_HOST:localhost}
port: ${CADENCE_PORT:7933}
domain: ${CADENCE_DOMAIN:production}
identity: ${HOSTNAME:order-service}
tls:
enabled: ${CADENCE_TLS_ENABLED:false}
cert: ${CADENCE_TLS_CERT:}
key: ${CADENCE_TLS_KEY:}
ca: ${CADENCE_TLS_CA:}
management:
endpoints:
web:
exposure:
include: health,metrics,info,prometheus
endpoint:
health:
show-details: always
metrics:
export:
prometheus:
enabled: true
logging:
level:
com.uber.cadence: INFO
com.example.workflow: DEBUG
// Worker configuration and startup
@Component
@Slf4j
public class CadenceWorkerManager {
private final WorkerFactory workerFactory;
private final List<Worker> workers = new ArrayList<>();
public CadenceWorkerManager(WorkerFactory workerFactory,
OrderProcessingActivities orderActivities,
PaymentProcessingActivities paymentActivities) {
this.workerFactory = workerFactory;
initializeWorkers(orderActivities, paymentActivities);
}
@PostConstruct
public void startWorkers() {
workerFactory.start();
log.info("Cadence workers started successfully");
}
@PreDestroy
public void stopWorkers() {
workerFactory.shutdown();
log.info("Cadence workers stopped");
}
private void initializeWorkers(OrderProcessingActivities orderActivities,
PaymentProcessingActivities paymentActivities) {
// Order processing worker
Worker orderWorker = workerFactory.newWorker("ORDER_TASK_LIST");
orderWorker.registerWorkflowImplementationTypes(
OrderProcessingWorkflowImpl.class,
EnhancedOrderProcessingWorkflowImpl.class,
EventSourcedOrderWorkflowImpl.class
);
orderWorker.registerActivitiesImplementations(orderActivities);
workers.add(orderWorker);
// Payment processing worker
Worker paymentWorker = workerFactory.newWorker("PAYMENT_TASK_LIST");
paymentWorker.registerWorkflowImplementationTypes(PaymentProcessingWorkflowImpl.class);
paymentWorker.registerActivitiesImplementations(paymentActivities);
workers.add(paymentWorker);
log.info("Initialized {} Cadence workers", workers.size());
}
}
// Workflow client service
@Service
@Slf4j
public class OrderProcessingWorkflowClient {
private final WorkflowClient workflowClient;
public OrderProcessingWorkflowClient(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}
public OrderProcessingResult startOrderProcessing(OrderProcessingRequest request) {
try {
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class,
new WorkflowOptions.Builder()
.setTaskList("ORDER_TASK_LIST")
.setExecutionStartToCloseTimeout(Duration.ofHours(24))
.setWorkflowId("order-" + request.getOrderId())
.build()
);
return workflow.processOrder(request);
} catch (Exception e) {
log.error("Failed to start order processing workflow for order: {}", request.getOrderId(), e);
throw new WorkflowException("Failed to start order processing", e);
}
}
public void signalPaymentStatus(String orderId, PaymentStatus status) {
try {
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class, "order-" + orderId);
PaymentStatusSignal signal = new PaymentStatusSignal();
signal.setOrderId(orderId);
signal.setStatus(status);
workflow.updatePaymentStatus(signal);
} catch (Exception e) {
log.error("Failed to send payment status signal for order: {}", orderId, e);
throw new WorkflowException("Failed to send payment signal", e);
}
}
public void cancelOrder(String orderId, String reason) {
try {
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class, "order-" + orderId);
CancelOrderSignal signal = new CancelOrderSignal();
signal.setOrderId(orderId);
signal.setReason(reason);
workflow.cancelOrder(signal);
} catch (Exception e) {
log.error("Failed to cancel order: {}", orderId, e);
throw new WorkflowException("Failed to cancel order", e);
}
}
public OrderProcessingState queryOrderState(String orderId) {
try {
OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
OrderProcessingWorkflow.class, "order-" + orderId);
return workflow.getOrderState();
} catch (Exception e) {
log.error("Failed to query order state for order: {}", orderId, e);
throw new WorkflowException("Failed to query order state", e);
}
}
}
// REST controller for workflow management
@RestController
@RequestMapping("/api/orders")
@Slf4j
public class OrderController {
private final OrderProcessingWorkflowClient workflowClient;
private final OrderService orderService;
public OrderController(OrderProcessingWorkflowClient workflowClient, OrderService orderService) {
this.workflowClient = workflowClient;
this.orderService = orderService;
}
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
try {
OrderProcessingRequest workflowRequest = convertToWorkflowRequest(request);
OrderProcessingResult result = workflowClient.startOrderProcessing(workflowRequest);
OrderResponse response = new OrderResponse();
response.setOrderId(result.getOrderId());
response.setStatus(result.getStatus());
response.setSuccess(result.isSuccess());
if (!result.isSuccess()) {
response.setErrorMessage(result.getErrorMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
return ResponseEntity.status(HttpStatus.ACCEPTED).body(response);
} catch (Exception e) {
log.error("Failed to create order", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
@PostMapping("/{orderId}/cancel")
public ResponseEntity<Void> cancelOrder(@PathVariable String orderId, 
@RequestBody CancelOrderRequest request) {
try {
workflowClient.cancelOrder(orderId, request.getReason());
return ResponseEntity.accepted().build();
} catch (Exception e) {
log.error("Failed to cancel order: {}", orderId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
@GetMapping("/{orderId}/status")
public ResponseEntity<OrderStatusResponse> getOrderStatus(@PathVariable String orderId) {
try {
OrderProcessingState state = workflowClient.queryOrderState(orderId);
OrderStatusResponse response = new OrderStatusResponse();
response.setOrderId(orderId);
response.setStatus(state.getStatus().toString());
response.setPaymentStatus(state.getPaymentStatus().toString());
response.setShippingStatus(state.getShippingStatus().toString());
response.setLastUpdated(state.getUpdatedAt());
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to get order status: {}", orderId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
@PostMapping("/{orderId}/payment-status")
public ResponseEntity<Void> updatePaymentStatus(@PathVariable String orderId,
@RequestBody PaymentStatusRequest request) {
try {
workflowClient.signalPaymentStatus(orderId, request.getStatus());
return ResponseEntity.accepted().build();
} catch (Exception e) {
log.error("Failed to update payment status for order: {}", orderId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
private OrderProcessingRequest convertToWorkflowRequest(CreateOrderRequest request) {
OrderProcessingRequest workflowRequest = new OrderProcessingRequest();
workflowRequest.setOrderId(generateOrderId());
workflowRequest.setCustomerId(request.getCustomerId());
workflowRequest.setItems(request.getItems());
workflowRequest.setPaymentInfo(request.getPaymentInfo());
workflowRequest.setShippingInfo(request.getShippingInfo());
workflowRequest.setPaymentPending(request.isPaymentPending());
return workflowRequest;
}
private String generateOrderId() {
return "ORD-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
}
}
// Health check for Cadence
@Component
public class CadenceHealthIndicator implements HealthIndicator {
private final WorkflowClient workflowClient;
public CadenceHealthIndicator(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}
@Override
public Health health() {
try {
// Try to list domains as a health check
workflowClient.getDomain("default");
return Health.up().withDetail("service", "cadence").build();
} catch (Exception e) {
return Health.down(e).withDetail("service", "cadence").build();
}
}
}
// Main application
@SpringBootApplication
@Slf4j
public class OrderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class, args);
}
@Bean
public CommandLineRunner demo(OrderProcessingWorkflowClient workflowClient) {
return args -> {
log.info("Order Service Application started successfully");
// Demo workflow execution if needed
if (args.length > 0 && "demo".equals(args[0])) {
executeDemoWorkflow(workflowClient);
}
};
}
private void executeDemoWorkflow(OrderProcessingWorkflowClient workflowClient) {
try {
OrderProcessingRequest request = createDemoOrder();
OrderProcessingResult result = workflowClient.startOrderProcessing(request);
log.info("Demo workflow completed: {}", result);
} catch (Exception e) {
log.error("Demo workflow failed", e);
}
}
private OrderProcessingRequest createDemoOrder() {
OrderProcessingRequest request = new OrderProcessingRequest();
request.setOrderId("demo-order-123");
request.setCustomerId("demo-customer");
request.setItems(List.of(
new OrderItem("demo-product", 1, new BigDecimal("99.99"))
));
request.setPaymentInfo(new PaymentInfo());
request.setShippingInfo(new ShippingInfo());
return request;
}
}

This comprehensive Cadence workflow implementation provides:

  • Robust order processing with compensation patterns
  • Advanced error handling with retries and circuit breakers
  • Event sourcing for auditability and state recovery
  • Comprehensive testing strategies
  • Production-ready configuration with Spring Boot integration
  • REST API for workflow management
  • Monitoring and health checks

The implementation demonstrates how to build scalable, reliable workflow orchestration for complex business processes using Uber Cadence in Java.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper