Overview
Error handling in reactive pipelines is crucial for building resilient applications. Reactive streams use a publisher-subscriber model in which an error is a first-class signal: it travels downstream through the pipeline just like data, and it terminates the sequence unless an operator recovers from it.
Core Principles
- Non-blocking error propagation
- Errors as first-class citizens in data streams
- Multiple recovery strategies
- Declarative error handling
Key Operators for Error Handling
1. Basic Error Handling Operators
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
/**
 * Demonstrates Reactor's basic error-handling operators on a {@link Flux} whose mapping step
 * fails on the third element.
 */
public class BasicErrorHandling {

    /**
     * Assembles one pipeline per operator from the same failing source.
     *
     * <p>The pipelines are only assembled here and are never subscribed to; each "Output" comment
     * describes what a subscriber of that pipeline would receive.
     */
    public void basicErrorExamples() {
        // Source emitting 1..5 whose mapping step throws when it reaches 3.
        Flux<Integer> faultyFlux = Flux.range(1, 5)
            .map(i -> {
                if (i == 3) {
                    throw new RuntimeException("Error at " + i);
                }
                return i;
            });

        // 1. onErrorReturn - replace the error with a single default value, then complete
        Flux<Integer> withDefault = faultyFlux
            .onErrorReturn(99);
        // Output: 1, 2, 99

        // 2. onErrorResume - switch to another publisher when the error arrives
        Flux<Integer> withFallback = faultyFlux
            .onErrorResume(error -> Flux.just(10, 11, 12));
        // Output: 1, 2, 10, 11, 12

        // 3. onErrorContinue - drop the failing element and keep processing the rest.
        // The handler receives both the error and the element that caused it, so report both.
        Flux<Integer> withContinue = faultyFlux
            .onErrorContinue((error, element) ->
                System.out.println("Error with element " + element + ": " + error.getMessage()));
        // Output: 1, 2, 4, 5

        // 4. doOnError - side effect only; the error still propagates to the subscriber
        Flux<Integer> withSideEffect = faultyFlux
            .doOnError(error -> System.out.println("Error occurred: " + error.getMessage()));
        // Output: 1, 2, then the original error
    }
}
2. Retry Mechanisms
/**
 * Demonstrates the retry strategies offered by {@code Mono.retry} and {@code Mono.retryWhen}.
 */
public class RetryExamples {

    /**
     * Assembles four retrying variants of the same unreliable call.
     *
     * <p>The pipelines are only assembled here; no call (and therefore no retry) happens until one
     * of them is subscribed to.
     */
    public void retryStrategies() {
        // Simulated remote call: throws whenever Math.random() exceeds 0.3.
        Mono<String> unreliableService = Mono.fromCallable(() -> {
            if (Math.random() > 0.3) {
                throw new RuntimeException("Service unavailable");
            }
            return "Success";
        });

        // 1. Simple retry
        Mono<String> simpleRetry = unreliableService
            .retry(3); // Retry up to 3 times

        // 2. Retry with backoff
        Mono<String> retryWithBackoff = unreliableService
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(10)));

        // 3. Retry with predicate
        Mono<String> retryWithCondition = unreliableService
            .retryWhen(Retry
                .max(3)
                .filter(throwable -> throwable instanceof RuntimeException)
                // Keep the last failure as the cause so the root problem is not lost.
                .onRetryExhaustedThrow((retrySpec, retrySignal) ->
                    new ServiceUnavailableException(
                        "Service failed after retries", retrySignal.failure())));

        // 4. Custom retry logic: wait 2s, 4s, 6s between attempts, then give up.
        // The companion flux carries RetrySignal objects (not bare Throwables); each emitted
        // value triggers one re-subscription, and an error in the companion ends the retries.
        Mono<String> customRetry = unreliableService
            .retryWhen(Retry.from(retrySignals ->
                retrySignals
                    .zipWith(Flux.range(1, 4), (signal, attempt) -> {
                        if (attempt > 3) {
                            throw new RuntimeException("Max retries exceeded", signal.failure());
                        }
                        return Duration.ofSeconds(attempt * 2L);
                    })
                    // Mono.delay turns each Duration into a signal emitted after that delay.
                    .flatMap(Mono::delay)));
    }

    /** Signals that the service is still failing after all retries have been used. */
    public static class ServiceUnavailableException extends RuntimeException {

        /**
         * @param message description of the failure
         */
        public ServiceUnavailableException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause the last failure seen before giving up
         */
        public ServiceUnavailableException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
Practical Examples
Example 1: Resilient HTTP Client
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.util.retry.Retry;
public class ResilientHttpClient {
private final WebClient webClient;
public ResilientHttpClient(WebClient webClient) {
this.webClient = webClient;
}
public Mono<String> fetchDataWithResilience(String url) {
return webClient.get()
.uri(url)
.retrieve()
.onStatus(status -> status.is5xxServerError(),
response -> Mono.error(new ServerErrorException("Server error: " + response.statusCode())))
.onStatus(status -> status.is4xxClientError(),
response -> Mono.error(new ClientErrorException("Client error: " + response.statusCode())))
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof TimeoutException ||
throwable instanceof ServerErrorException)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new ServiceUnavailableException("Service unavailable after retries")))
.onErrorResume(ClientErrorException.class, error -> {
// Don't retry on client errors, return fallback
return Mono.just("Fallback data for client error");
})
.doOnError(error -> System.err.println("Final error: " + error.getMessage()))
.doOnSuccess(data -> System.out.println("Successfully fetched data"));
}
public Mono<String> fetchDataWithCircuitBreaker(String url) {
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(String.class)
.transform(this::withCircuitBreaker);
}
private <T> Mono<T> withCircuitBreaker(Mono<T> mono) {
// Simplified circuit breaker pattern
return mono
.timeout(Duration.ofSeconds(3))
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))
.onErrorResume(throwable -> {
// After failures, return cached response or fallback
return getCachedResponse()
.switchIfEmpty(Mono.error(new CircuitBreakerOpenException("Circuit breaker open")));
});
}
private <T> Mono<T> getCachedResponse() {
// Implement cache lookup
return Mono.empty(); // Simplified
}
// Exception classes
public static class ServerErrorException extends RuntimeException {
public ServerErrorException(String message) { super(message); }
}
public static class ClientErrorException extends RuntimeException {
public ClientErrorException(String message) { super(message); }
}
public static class CircuitBreakerOpenException extends RuntimeException {
public CircuitBreakerOpenException(String message) { super(message); }
}
}
Example 2: Database Operations with Error Recovery
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ResilientDatabaseService {
private final R2dbcEntityTemplate template;
private final DatabaseRetryConfig retryConfig;
public ResilientDatabaseService(R2dbcEntityTemplate template, DatabaseRetryConfig retryConfig) {
this.template = template;
this.retryConfig = retryConfig;
}
public Mono<User> findUserById(Long id) {
return template.select(User.class)
.matching(Query.query(Criteria.where("id").is(id)))
.one()
.retryWhen(Retry.backoff(
retryConfig.getMaxAttempts(),
retryConfig.getMinBackoff())
.filter(this::isRetryableError)
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new DatabaseUnavailableException("Database unavailable after retries")))
.onErrorResume(DatabaseUnavailableException.class,
error -> findUserInCache(id)) // Fallback to cache
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)))
.doOnError(error -> logError("Error finding user", error));
}
public Flux<User> saveUsersWithErrorHandling(List<User> users) {
return Flux.fromIterable(users)
.flatMap(this::saveUserSafely, 5) // Limited concurrency
.onErrorContinue((error, user) ->
logError("Failed to save user: " + user, error));
}
private Mono<User> saveUserSafely(User user) {
return template.insert(User.class)
.using(user)
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))
.onErrorResume(DataAccessException.class, error -> {
logError("Failed to save user after retries: " + user.getId(), error);
// Send to dead letter queue or alternative storage
return sendToDeadLetterQueue(user)
.then(Mono.empty()); // Don't emit the failed user
});
}
private Mono<User> findUserInCache(Long id) {
// Implement cache lookup
return Mono.empty();
}
private Mono<Void> sendToDeadLetterQueue(User user) {
// Implement dead letter queue logic
return Mono.empty();
}
private boolean isRetryableError(Throwable throwable) {
return throwable instanceof TransientDataAccessException ||
throwable instanceof TimeoutException;
}
private void logError(String message, Throwable error) {
System.err.println(message + ": " + error.getMessage());
}
// Configuration class
public static class DatabaseRetryConfig {
private int maxAttempts = 3;
private Duration minBackoff = Duration.ofSeconds(1);
// getters and setters
public int getMaxAttempts() { return maxAttempts; }
public Duration getMinBackoff() { return minBackoff; }
}
// Entity and Exception classes
public static class User {
private Long id;
private String name;
// getters and setters
}
public static class DatabaseUnavailableException extends RuntimeException {
public DatabaseUnavailableException(String message) { super(message); }
}
public static class UserNotFoundException extends RuntimeException {
public UserNotFoundException(String message) { super(message); }
}
}
Example 3: Stream Processing with Comprehensive Error Handling
public class StreamProcessingService {
public Flux<ProcessedData> processDataStream(Flux<RawData> dataStream) {
return dataStream
.window(Duration.ofSeconds(1)) // Process in time windows
.flatMap(window -> processWindowWithErrorHandling(window), 5) // Limited concurrency
.onErrorResume(this::handleCatastrophicFailure);
}
private Flux<ProcessedData> processWindowWithErrorHandling(Flux<RawData> window) {
return window
.publishOn(Schedulers.parallel()) // Process on different thread
.map(this::validateData)
.onErrorContinue((error, data) ->
handleInvalidData(data, error))
.filter(validated -> Boolean.TRUE.equals(validated.isValid()))
.map(validated -> transformData(validated.getData()))
.onErrorContinue((error, data) ->
handleTransformationError(data, error))
.flatMap(this::enrichData, 10) // Limited concurrency for enrichment
.retryWhen(Retry.backoff(3, Duration.ofMillis(100))
.onErrorResume(throwable -> handleWindowFailure(window, throwable));
}
private ValidatedData validateData(RawData data) {
try {
if (data == null || data.getId() == null) {
return new ValidatedData(data, false, "Invalid data");
}
// Additional validation logic
return new ValidatedData(data, true, null);
} catch (Exception e) {
return new ValidatedData(data, false, e.getMessage());
}
}
private ProcessedData transformData(RawData data) {
// Transformation logic that might throw exceptions
if (data.getContent().contains("invalid")) {
throw new TransformationException("Invalid content pattern");
}
return new ProcessedData(data.getId(), data.getContent().toUpperCase());
}
private Mono<ProcessedData> enrichData(ProcessedData data) {
return externalService.enrich(data)
.timeout(Duration.ofSeconds(2))
.onErrorResume(TimeoutException.class,
error -> Mono.just(data.withEnrichment("default-enrichment")))
.onErrorResume(ExternalServiceException.class,
error -> {
logError("External service failed for: " + data.getId(), error);
return Mono.just(data.withEnrichment("fallback-enrichment"));
});
}
private void handleInvalidData(RawData data, Throwable error) {
System.err.println("Invalid data skipped: " + data + ", error: " + error.getMessage());
metrics.incrementCounter("invalid_data");
}
private void handleTransformationError(RawData data, Throwable error) {
System.err.println("Transformation failed for: " + data + ", error: " + error.getMessage());
metrics.incrementCounter("transformation_errors");
}
private Flux<ProcessedData> handleWindowFailure(Flux<RawData> window, Throwable error) {
logError("Window processing failed, skipping window", error);
metrics.incrementCounter("window_processing_errors");
return Flux.empty();
}
private Flux<ProcessedData> handleCatastrophicFailure(Throwable error) {
logError("Catastrophic failure in stream processing", error);
// Maybe switch to backup processing or emergency mode
return emergencyProcessingMode()
.onErrorResume(e -> {
System.err.println("Emergency processing also failed: " + e.getMessage());
return Flux.empty();
});
}
private Flux<ProcessedData> emergencyProcessingMode() {
// Simplified emergency processing
return Flux.empty();
}
// Data classes
public static class RawData {
private String id;
private String content;
// getters and setters
}
public static class ValidatedData {
private final RawData data;
private final boolean valid;
private final String errorMessage;
// constructor and getters
}
public static class ProcessedData {
private final String id;
private final String content;
private String enrichment;
// constructor and getters
public ProcessedData withEnrichment(String enrichment) {
this.enrichment = enrichment;
return this;
}
}
// Exception classes
public static class TransformationException extends RuntimeException {
public TransformationException(String message) { super(message); }
}
public static class ExternalServiceException extends RuntimeException {
public ExternalServiceException(String message) { super(message); }
}
}
Advanced Error Handling Patterns
1. Circuit Breaker Pattern
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
/**
 * Demonstrates a Resilience4j circuit breaker and a concurrency-limited ("bulkhead") pipeline.
 */
public class CircuitBreakerExamples {

    private final CircuitBreaker circuitBreaker;

    /**
     * Creates a circuit breaker named "serviceA" with a failure rate threshold of 50, a
     * 30-second wait in the open state and a sliding window of 10.
     */
    public CircuitBreakerExamples() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .slidingWindowSize(10)
            .build();
        this.circuitBreaker = CircuitBreaker.of("serviceA", config);
    }

    /**
     * Guards the call with the circuit breaker.
     *
     * @param serviceCall the call to protect
     * @return the call's result, or a fixed fallback string when the call is not permitted
     */
    public Mono<String> withCircuitBreaker(Mono<String> serviceCall) {
        return serviceCall
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .onErrorResume(CallNotPermittedException.class,
                error -> Mono.just("Circuit breaker is open - using fallback"));
    }

    /**
     * Processes requests with at most 5 in flight.
     *
     * <p>The fallback is applied per request inside the {@code flatMap}, so one failing or
     * timed-out request yields a fallback value without ending the stream for the others.
     *
     * @param requests the incoming requests
     * @return one result or fallback string per request
     */
    public Flux<String> withBulkheadPattern(Flux<String> requests) {
        return requests
            .flatMap(request -> processWithConcurrencyLimit(request)
                .onErrorResume(throwable ->
                    Mono.just("Bulkhead limit exceeded - request rejected")),
                5); // Limited concurrency
    }

    /** Runs the expensive operation on the bounded elastic scheduler with a 5-second timeout. */
    private Mono<String> processWithConcurrencyLimit(String request) {
        return Mono.fromCallable(() -> expensiveOperation(request))
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofSeconds(5));
    }

    /** Placeholder for an expensive operation. */
    private String expensiveOperation(String request) {
        // Simulate expensive operation
        return "Processed: " + request;
    }
}
2. Timeout and Fallback Strategies
/**
 * Shows how timeouts can be combined with fallbacks, for a single call and per stream element.
 */
public class TimeoutAndFallback {

    /**
     * Applies a 3-second timeout that switches to a fast fallback, then a 5-second overall
     * timeout whose {@link TimeoutException} resolves to the emergency fallback.
     *
     * @param slowService the call to guard
     * @return the service result or one of the fallback responses
     */
    public Mono<String> withTimeoutStrategies(Mono<String> slowService) {
        // Deferred so the fallback is only built if the primary timeout actually fires.
        Mono<String> fastFallback = Mono.defer(this::getFastFallback);

        Mono<String> guarded = slowService
            .timeout(Duration.ofSeconds(3), fastFallback) // Primary timeout
            .timeout(Duration.ofSeconds(5)); // Overall timeout

        return guarded.onErrorResume(TimeoutException.class,
            timedOut -> getEmergencyFallback());
    }

    /**
     * Processes items with at most 10 in flight; each item gets its own 2-second timeout and
     * its own fallback.
     *
     * @param items the items to process
     * @return one result or fallback string per item
     */
    public Flux<String> withIndividualTimeouts(Flux<String> items) {
        return items.flatMap(this::processWithDeadline, 10); // Limited concurrency
    }

    /** Processes one item, resolving a timeout after 2 seconds to the slow-item fallback. */
    private Mono<String> processWithDeadline(String item) {
        return processItem(item)
            .timeout(Duration.ofSeconds(2))
            .onErrorResume(TimeoutException.class,
                timedOut -> handleSlowItem(item, timedOut));
    }

    /** Simulates work taking between 1 and 3 seconds on the bounded elastic scheduler. */
    private Mono<String> processItem(String item) {
        Mono<String> work = Mono.fromCallable(() -> {
            // Simulate processing
            Thread.sleep(1000 + (long) (Math.random() * 2000));
            return "Processed: " + item;
        });
        return work.subscribeOn(Schedulers.boundedElastic());
    }

    /** Logs the slow item and supplies its fallback value. */
    private Mono<String> handleSlowItem(String item, Throwable error) {
        System.err.println("Item processing too slow: " + item);
        String fallback = "Fallback for: " + item;
        return Mono.just(fallback);
    }

    /** Fallback used when the primary timeout fires. */
    private Mono<String> getFastFallback() {
        return Mono.just("Fast fallback response");
    }

    /** Fallback used when the overall timeout fires. */
    private Mono<String> getEmergencyFallback() {
        return Mono.just("Emergency fallback response");
    }
}
3. Error Classification and Handling
public class ErrorClassification {
public Mono<String> classifyAndHandleErrors(Mono<String> operation) {
return operation
.onErrorResume(throwable -> {
ErrorCategory category = classifyError(throwable);
return handleByCategory(category, throwable);
});
}
private ErrorCategory classifyError(Throwable throwable) {
if (throwable instanceof TimeoutException) {
return ErrorCategory.TIMEOUT;
} else if (throwable instanceof IOException) {
return ErrorCategory.NETWORK;
} else if (throwable instanceof IllegalArgumentException) {
return ErrorCategory.VALIDATION;
} else if (throwable instanceof RuntimeException) {
return ErrorCategory.BUSINESS;
} else {
return ErrorCategory.UNKNOWN;
}
}
private Mono<String> handleByCategory(ErrorCategory category, Throwable error) {
switch (category) {
case TIMEOUT:
return Mono.just("Timeout handled - using cached data");
case NETWORK:
return retryWithBackoff(error);
case VALIDATION:
return Mono.just("Validation error - skipping operation");
case BUSINESS:
return Mono.error(new BusinessException("Business logic error", error));
default:
return Mono.error(new UnhandledErrorException("Unhandled error", error));
}
}
private Mono<String> retryWithBackoff(Throwable error) {
return Mono.error(error) // Re-throw to enable retry
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
enum ErrorCategory {
TIMEOUT, NETWORK, VALIDATION, BUSINESS, UNKNOWN
}
public static class BusinessException extends RuntimeException {
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
}
public static class UnhandledErrorException extends RuntimeException {
public UnhandledErrorException(String message, Throwable cause) {
super(message, cause);
}
}
}
Best Practices
1. Error Handling Strategy
/**
 * Collects recommended error-handling practices for reactive pipelines.
 *
 * <p>NOTE(review): {@code externalService}, {@code log} and {@code metrics} are used below but
 * are not declared in this snippet; they are assumed to be collaborators supplied elsewhere.
 */
public class ErrorHandlingBestPractices {

    // 1. Use specific exception handling
    /**
     * Handles each exception type with its own fallback, most specific type first.
     *
     * @return the service result or the fallback for the matching exception type
     */
    public Mono<String> specificErrorHandling() {
        return externalService.call()
            .onErrorResume(TimeoutException.class, e -> handleTimeout(e))
            .onErrorResume(IOException.class, e -> handleIOError(e))
            .onErrorResume(RuntimeException.class, e -> handleRuntimeError(e));
    }

    // 2. Log errors appropriately
    /**
     * Logs timeouts at warn level and other errors at error level, and logs skipped elements
     * at debug level.
     *
     * @param stream the stream to observe
     * @return the stream with logging attached
     */
    public Flux<String> withProperLogging(Flux<String> stream) {
        return stream
            .doOnError(error -> {
                if (error instanceof TimeoutException) {
                    log.warn("Timeout occurred", error);
                } else {
                    log.error("Unexpected error", error);
                }
            })
            .onErrorContinue((error, element) ->
                log.debug("Skipped element due to error: {}", element, error));
    }

    // 3. Monitor and metrics
    /**
     * Counts started, successful and failed operations and records the duration of failures.
     *
     * <p>The pipeline is wrapped in {@code Mono.defer} so the start time is captured once per
     * subscription rather than once at assembly time.
     *
     * @param operation the operation to monitor
     * @return the operation with metrics attached
     */
    public Mono<String> withMonitoring(Mono<String> operation) {
        return Mono.defer(() -> {
            Instant startTime = Instant.now();
            return operation
                .doOnSubscribe(s -> metrics.increment("operations.started"))
                .doOnSuccess(r -> metrics.increment("operations.successful"))
                .doOnError(e -> {
                    metrics.increment("operations.failed");
                    metrics.recordTimer("operation.duration", Duration.between(startTime, Instant.now()));
                });
        });
    }

    // 4. Resource cleanup
    /**
     * Acquires a resource, processes every element of the stream with it, and runs the cleanup
     * once processing terminates. A failure is logged and replaced by an empty stream.
     *
     * <p>{@code usingWhen} is a static factory on {@code Flux}. Invoking it through the
     * {@code stream} instance compiles but silently ignores the stream, so the stream is
     * consumed inside the resource closure instead.
     *
     * @param stream the elements to process
     * @return the processing results
     */
    public Flux<String> withResourceCleanup(Flux<String> stream) {
        Flux<String> processed = Flux.usingWhen(
            Mono.fromCallable(() -> acquireResource()),
            resource -> stream.concatMap(item -> processWithResource(resource)),
            resource -> cleanupResource(resource));

        return processed.onErrorResume(e -> {
            log.error("Resource processing failed", e);
            return Flux.empty();
        });
    }

    /** Resource acquisition placeholder. */
    private String acquireResource() { return "resource"; }

    /** Processing placeholder; emits a single "processed" value. */
    private Flux<String> processWithResource(String resource) {
        return Flux.just("processed");
    }

    /** Cleanup placeholder; completes immediately. */
    private Mono<Void> cleanupResource(String resource) {
        return Mono.empty();
    }

    /** Fallback for timeouts. */
    private Mono<String> handleTimeout(TimeoutException e) {
        return Mono.just("timeout-handled");
    }

    /** Fallback for I/O errors. */
    private Mono<String> handleIOError(IOException e) {
        return Mono.just("io-error-handled");
    }

    /** Fallback for any other runtime exception. */
    private Mono<String> handleRuntimeError(RuntimeException e) {
        return Mono.just("runtime-error-handled");
    }
}
Common Pitfalls and Solutions
public class CommonPitfalls {
// 1. Don't forget to subscribe
public void forgottenSubscription() {
Mono<String> result = externalCall()
.onErrorReturn("fallback");
// Missing: result.subscribe();
}
// 2. Avoid blocking in reactive chains
public Mono<String> avoidBlockingCalls() {
// Wrong:
// return Mono.fromCallable(() -> blockingHttpCall());
// Correct:
return Mono.fromCallable(() -> blockingHttpCall())
.subscribeOn(Schedulers.boundedElastic());
}
// 3. Handle errors in flatMap
public Flux<String> handleErrorsInFlatMap(Flux<String> input) {
return input
.flatMap(item ->
processItem(item)
.onErrorResume(error -> handleItemError(item, error))
);
}
// 4. Be careful with onErrorContinue
public Flux<String> carefulWithOnErrorContinue(Flux<String> input) {
return input
.map(this::transform) // This might throw
.onErrorContinue((error, element) ->
log.error("Failed to process: {}", element, error));
// Note: onErrorContinue only works with operators that support it
}
private String transform(String input) {
if (input.equals("invalid")) {
throw new IllegalArgumentException("Invalid input");
}
return input.toUpperCase();
}
private Mono<String> processItem(String item) {
return Mono.just("processed-" + item);
}
private Mono<String> handleItemError(String item, Throwable error) {
return Mono.just("error-handled-" + item);
}
private String blockingHttpCall() {
return "result";
}
private Mono<String> externalCall() {
return Mono.just("response");
}
}
Effective error handling in reactive pipelines requires understanding how errors propagate through a sequence and choosing the appropriate operator for each scenario. The key is to design for resilience while keeping the pipeline non-blocking and declarative.