Handling Exceptions in CompletableFuture in Java

CompletableFuture provides several ways to handle exceptions gracefully. Here's a comprehensive guide:

Basic Exception Handling Methods

1. exceptionally() - Basic Exception Recovery

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * Shows how {@code exceptionally()} turns a failed asynchronous computation
 * into a normal result by supplying a replacement value.
 */
public class BasicExceptionHandling {

    /**
     * Starts a task that fails when {@code Math.random()} exceeds 0.5, attaches a
     * recovery stage, and prints whichever value the recovered future ends up with.
     *
     * @param args unused
     * @throws ExecutionException   declared because {@code get()} is used to wait for the result
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> recovered = CompletableFuture
                .supplyAsync(BasicExceptionHandling::flakyTask)
                // Handle exceptions with exceptionally()
                .exceptionally(BasicExceptionHandling::fallback);

        String outcome = recovered.get();
        System.out.println("Result: " + outcome);
    }

    /** Returns the success value, or throws when the random draw is above 0.5. */
    private static String flakyTask() {
        boolean succeeded = !(Math.random() > 0.5);
        if (succeeded) {
            return "Success!";
        }
        throw new RuntimeException("Something went wrong!");
    }

    /** Logs the failure message and returns the replacement value. */
    private static String fallback(Throwable failure) {
        System.out.println("Exception caught: " + failure.getMessage());
        return "Default Value";
    }
}

2. handle() - Handle Both Success and Exception

/**
 * Shows how {@code handle()} receives both the value and the failure of the
 * previous stage and maps either one to a new result.
 */
public class HandleMethodExample {

    /**
     * Runs a task that rejects even digits and prints a description of the outcome.
     *
     * @param args unused
     * @throws ExecutionException   declared because {@code get()} is used to wait for the result
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> oddDigit =
                CompletableFuture.supplyAsync(HandleMethodExample::pickOddDigit);

        // Exactly one of (value, failure) is meaningful: failure is null on success.
        CompletableFuture<String> described = oddDigit.handle((value, failure) ->
                failure == null
                        ? "Success: " + value
                        : "Error: " + failure.getMessage());

        String line = described.get();
        System.out.println(line);
    }

    /** Draws a digit in the range 0-9 and throws when it is even. */
    private static Integer pickOddDigit() {
        int candidate = (int) (Math.random() * 10);
        boolean odd = candidate % 2 != 0;
        if (odd) {
            return candidate;
        }
        throw new RuntimeException("Even numbers are not allowed: " + candidate);
    }
}

3. whenComplete() - Side Effects Without Recovery

/**
 * Shows that {@code whenComplete()} is for side effects only: the callback can
 * observe the outcome, but the original failure still reaches whoever waits on
 * the returned future.
 */
public class WhenCompleteExample {

    /**
     * Runs a task that always fails with a {@code NumberFormatException}, logs the
     * failure from a {@code whenComplete()} callback, and then demonstrates that
     * {@code get()} still throws.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int num = Integer.parseInt("NOT_A_NUMBER"); // This will throw NumberFormatException
            return num * 2;
        });

        // whenComplete() keeps the result type of the stage it is attached to,
        // so the returned future is CompletableFuture<Integer>, not <Void>.
        CompletableFuture<Integer> logged = future.whenComplete((result, ex) -> {
            if (ex != null) {
                // The callback receives a CompletionException wrapper; report the real cause.
                Throwable cause = unwrap(ex);
                System.err.println("Task failed with: " + cause.getClass().getSimpleName());
                // Can't recover here, just log
            } else {
                System.out.println("Task completed with: " + result);
            }
        });

        try {
            logged.get();
        } catch (ExecutionException e) {
            System.out.println("Exception still propagates: " + e.getCause());
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for the task");
        }
    }

    /** Returns the cause of a {@code CompletionException}, or the argument itself otherwise. */
    private static Throwable unwrap(Throwable ex) {
        boolean wrapped = ex instanceof java.util.concurrent.CompletionException
                && ex.getCause() != null;
        return wrapped ? ex.getCause() : ex;
    }
}

Advanced Exception Handling Patterns

4. Chaining Exception Handling

/**
 * Shows a multi-stage pipeline in which a single {@code exceptionally()} stage
 * recovers from a failure in any of the stages placed before it.
 */
public class ChainedExceptionHandling {

    /**
     * Builds the pipeline extract -> process -> transform -> recover -> finish and
     * prints the final value.
     *
     * @param args unused
     * @throws ExecutionException   declared because {@code get()} is used to wait for the result
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> extracted =
                CompletableFuture.supplyAsync(ChainedExceptionHandling::extract);
        CompletableFuture<String> processed =
                extracted.thenApplyAsync(ChainedExceptionHandling::process);
        CompletableFuture<String> transformed =
                processed.thenApplyAsync(ChainedExceptionHandling::transform);
        // Stages after this point always see a value: either the real one or the fallback.
        CompletableFuture<String> recovered =
                transformed.exceptionally(ChainedExceptionHandling::recover);
        CompletableFuture<String> pipeline =
                recovered.thenApply(ChainedExceptionHandling::finish);

        System.out.println("Final result: " + pipeline.get());
    }

    /** Step 1: produces the raw data, or fails at random. */
    private static String extract() {
        System.out.println("Step 1: Data extraction");
        failSometimes("Extraction failed");
        return "Raw Data";
    }

    /** Step 2: upper-cases the data, or fails at random. */
    private static String process(String data) {
        System.out.println("Step 2: Data processing - " + data);
        failSometimes("Processing failed");
        return data.toUpperCase();
    }

    /** Step 3: appends the transformation marker, or fails at random. */
    private static String transform(String data) {
        System.out.println("Step 3: Data transformation - " + data);
        failSometimes("Transformation failed");
        return data + " - TRANSFORMED";
    }

    /** Logs the failure of any earlier stage and substitutes the fallback value. */
    private static String recover(Throwable failure) {
        System.err.println("Pipeline failed at some stage: " + failure.getMessage());
        return "Fallback Result";
    }

    /** Final step, applied to the real or the fallback value. */
    private static String finish(String result) {
        System.out.println("Final step with: " + result);
        return result + " - COMPLETED";
    }

    /** Throws a {@code RuntimeException} with the given message when the random draw exceeds 0.7. */
    private static void failSometimes(String message) {
        if (Math.random() > 0.7) {
            throw new RuntimeException(message);
        }
    }
}

5. Retry Mechanism with CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Retries an asynchronous task a bounded number of times, waiting a fixed
 * delay between attempts.
 */
public class RetryMechanism {

    /**
     * Runs {@code task} and, if the future it produces fails, runs it again up to
     * {@code maxRetries} more times.
     *
     * <p>A task that throws instead of returning a future, or that returns
     * {@code null}, is counted as a failed attempt rather than escaping to the
     * caller or leaving the returned future incomplete.
     *
     * @param task       supplies a fresh future for every attempt
     * @param maxRetries number of additional attempts after the first one; values
     *                   of zero or less mean the task is tried exactly once
     * @param delayMs    pause before each retry, in milliseconds
     * @param <T>        result type of the task
     * @return a future completed with the first successful value, or completed
     *         exceptionally with the failure of the last attempt
     */
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> task,
            int maxRetries,
            long delayMs) {
        AtomicInteger retries = new AtomicInteger(0);
        CompletableFuture<T> result = new CompletableFuture<>();
        retry(task, maxRetries, delayMs, retries, result);
        return result;
    }

    /** Performs one attempt and either completes {@code result} or schedules the next attempt. */
    private static <T> void retry(
            Supplier<CompletableFuture<T>> task,
            int maxRetries,
            long delayMs,
            AtomicInteger retries,
            CompletableFuture<T> result) {
        // The caller may have cancelled or completed the result in the meantime;
        // there is no point in starting further attempts then.
        if (result.isDone()) {
            return;
        }
        startAttempt(task).whenComplete((success, error) -> {
            if (error == null) {
                result.complete(success);
                return;
            }
            int currentRetry = retries.incrementAndGet();
            if (currentRetry > maxRetries) {
                result.completeExceptionally(error);
                return;
            }
            System.out.println("Retry attempt " + currentRetry + " after error: " + describe(error));
            CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
                    .execute(() -> retry(task, maxRetries, delayMs, retries, result));
        });
    }

    /** Invokes the supplier, converting a thrown exception or a null future into a failed future. */
    private static <T> CompletableFuture<T> startAttempt(Supplier<CompletableFuture<T>> task) {
        try {
            CompletableFuture<T> attempt = task.get();
            if (attempt == null) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("task supplied a null future"));
            }
            return attempt;
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    /** Returns the message of the underlying failure, looking through a CompletionException wrapper. */
    private static String describe(Throwable error) {
        boolean wrapped = error instanceof java.util.concurrent.CompletionException
                && error.getCause() != null;
        Throwable cause = wrapped ? error.getCause() : error;
        return cause.getMessage();
    }

    /**
     * Demo: the task fails on its first two attempts and succeeds on the third.
     *
     * @param args unused
     * @throws Exception if waiting for the result fails
     */
    public static void main(String[] args) throws Exception {
        AtomicInteger attempt = new AtomicInteger(0);
        CompletableFuture<String> retryFuture = withRetry(() -> {
            return CompletableFuture.supplyAsync(() -> {
                int currentAttempt = attempt.incrementAndGet();
                if (currentAttempt < 3) {
                    throw new RuntimeException("Simulated failure on attempt " + currentAttempt);
                }
                return "Success on attempt " + currentAttempt;
            });
        }, 3, 1000); // 3 retries, 1 second delay
        System.out.println("Final result: " + retryFuture.get());
    }
}

6. Timeout Handling

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TimeoutHandling {
private static final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future, 
long timeout, 
TimeUnit unit) {
return future.orTimeout(timeout, unit)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
System.err.println("Task timed out after " + timeout + " " + unit);
// Return default value or throw custom exception
return null; // or throw new CustomTimeoutException();
}
throw new CompletionException(ex);
});
}
// Alternative manual timeout implementation
public static <T> CompletableFuture<T> withTimeoutManual(
CompletableFuture<T> future, 
long timeout, 
TimeUnit unit,
T defaultValue) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
if (!future.isDone()) {
timeoutFuture.complete(defaultValue);
future.cancel(true);
}
}, timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity())
.exceptionally(ex -> {
if (ex instanceof CancellationException) {
System.out.println("Task was cancelled due to timeout");
return defaultValue;
}
throw new CompletionException(ex);
});
}
public static void main(String[] args) throws Exception {
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000); // Simulate slow task
return "Slow Result";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// Using orTimeout (Java 9+)
CompletableFuture<String> withTimeout = withTimeout(slowTask, 2, TimeUnit.SECONDS);
try {
System.out.println("Result: " + withTimeout.get());
} catch (ExecutionException e) {
System.out.println("Exception: " + e.getCause());
}
}
}

Real-World Examples

7. API Call with Fallback

import java.util.concurrent.CompletableFuture;
/**
 * Calls a primary API and, when that fails, a fallback API; when both fail a
 * fixed default response is returned.
 */
public class ApiCallWithFallback {

    /**
     * Simulated primary API call; fails when the random draw exceeds 0.3.
     *
     * @param request text echoed in the response
     * @return a future with the primary API's response
     */
    public CompletableFuture<String> callPrimaryApi(String request) {
        // Simulate API call
        return CompletableFuture.supplyAsync(() ->
                simulate(0.3, "Primary API unavailable", "Response from Primary API for: " + request));
    }

    /**
     * Simulated fallback API call; fails when the random draw exceeds 0.2.
     *
     * @param request text echoed in the response
     * @return a future with the fallback API's response
     */
    public CompletableFuture<String> callFallbackApi(String request) {
        // Simulate fallback API call
        return CompletableFuture.supplyAsync(() ->
                simulate(0.2, "Fallback API also unavailable", "Response from Fallback API for: " + request));
    }

    /**
     * Tries the primary API, then the fallback API, then gives up with a default.
     *
     * @param request text echoed in the response
     * @return a future that always completes normally with one of the three responses
     */
    public CompletableFuture<String> callApiWithFallback(String request) {
        CompletableFuture<String> primary = callPrimaryApi(request);

        // exceptionallyCompose starts a second asynchronous call only when the first one failed.
        CompletableFuture<String> primaryOrFallback = primary.exceptionallyCompose(primaryEx -> {
            System.out.println("Primary API failed: " + primaryEx.getMessage());
            System.out.println("Trying fallback API...");
            return callFallbackApi(request);
        });

        return primaryOrFallback.exceptionally(ex -> {
            System.err.println("All APIs failed: " + ex.getMessage());
            return "Default Response";
        });
    }

    /** Returns {@code response}, or throws with {@code failureMessage} when the random draw exceeds {@code threshold}. */
    private static String simulate(double threshold, String failureMessage, String response) {
        if (Math.random() > threshold) {
            throw new RuntimeException(failureMessage);
        }
        return response;
    }

    /**
     * Demo: issues one request and prints whichever response comes back.
     *
     * @param args unused
     * @throws Exception if waiting for the response fails
     */
    public static void main(String[] args) throws Exception {
        ApiCallWithFallback client = new ApiCallWithFallback();
        String finalResponse = client.callApiWithFallback("test-request").get();
        System.out.println("Final response: " + finalResponse);
    }
}

8. Bulk Operations with Individual Exception Handling

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
/**
 * Processes a batch of items concurrently while keeping the failure of one
 * item from failing the whole batch.
 */
public class BulkOperations {

    /**
     * Starts one asynchronous task per item. A task that fails is turned into a
     * descriptive string, so every returned future completes normally.
     *
     * @param items the items to process
     * @return one future per item, in the same order as {@code items}
     */
    public static List<CompletableFuture<String>> processItems(List<String> items) {
        return items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> processItem(item))
                        // Report the task's own message, not the CompletionException wrapper's.
                        .exceptionally(ex -> "Failed for " + item + ": " + rootCause(ex).getMessage()))
                .collect(Collectors.toList());
    }

    /** Upper-cases {@code item} after a short pause; items starting with "fail" throw instead. */
    private static String processItem(String item) {
        if (item.startsWith("fail")) {
            throw new RuntimeException("Intentional failure for: " + item);
        }
        try {
            Thread.sleep(100); // Simulate work
            return item.toUpperCase();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Strips {@code CompletionException} wrappers to reach the underlying failure. */
    private static Throwable rootCause(Throwable ex) {
        Throwable current = ex;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Processes all items via {@link #processItems(List)} and prints every result
     * (including the per-item failure descriptions) once all tasks are done.
     *
     * @param items the items to process
     * @return a future that completes after the results have been printed
     */
    public static CompletableFuture<Void> processAllWithIndividualHandling(List<String> items) {
        List<CompletableFuture<String>> futures = processItems(items);
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenRun(() -> {
                    // Every future is complete here, so join() returns immediately.
                    List<String> results = futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
                    System.out.println("All processing completed:");
                    results.forEach(System.out::println);
                });
    }

    /**
     * Continue processing despite some failures: failed items are logged and
     * left out of the result.
     *
     * @param items the items to process
     * @return a future with the results of the items that succeeded, in input order
     */
    public static CompletableFuture<List<String>> processWithTolerance(List<String> items) {
        List<CompletableFuture<Optional<String>>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> tryProcess(item)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .flatMap(Optional::stream)
                        .collect(Collectors.toList()));
    }

    /** Processes one item, logging a failure and mapping it to an empty Optional. */
    private static Optional<String> tryProcess(String item) {
        try {
            return Optional.of(processItem(item));
        } catch (RuntimeException e) {
            System.err.println("Item failed: " + item + " - " + e.getMessage());
            return Optional.empty();
        }
    }

    /**
     * Demo: runs both strategies on a batch containing two failing items.
     *
     * @param args unused
     * @throws Exception if waiting for a result fails
     */
    public static void main(String[] args) throws Exception {
        List<String> items = List.of("apple", "banana", "fail1", "cherry", "fail2");
        processAllWithIndividualHandling(items).get();
        CompletableFuture<List<String>> tolerant = processWithTolerance(items);
        System.out.println("Successful results: " + tolerant.get());
    }
}

9. Composition with Exception Propagation

/**
 * Composes several asynchronous steps with {@code thenCompose()} and shows two
 * ways of dealing with a failure in any of them.
 */
public class CompositionWithExceptions {

    /**
     * Parses {@code input} as an integer asynchronously.
     *
     * @param input text to parse
     * @return a future with the parsed value; it fails with a {@code RuntimeException}
     *         (cause: the {@code NumberFormatException}) when the text is not a number
     */
    public static CompletableFuture<Integer> parseNumber(String input) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Integer.parseInt(input);
            } catch (NumberFormatException e) {
                throw new RuntimeException("Invalid number: " + input, e);
            }
        });
    }

    /**
     * Doubles {@code number} asynchronously.
     *
     * @param number value to double
     * @return a future with the doubled value; it fails when {@code number} exceeds 1000
     */
    public static CompletableFuture<Integer> doubleNumber(int number) {
        return CompletableFuture.supplyAsync(() -> {
            if (number > 1000) {
                throw new RuntimeException("Number too large: " + number);
            }
            return number * 2;
        });
    }

    /**
     * Formats {@code number} asynchronously.
     *
     * @param number value to format
     * @return a future with the text {@code "Result: <number>"}
     */
    public static CompletableFuture<String> processResult(int number) {
        return CompletableFuture.supplyAsync(() -> "Result: " + number);
    }

    /**
     * Parses, doubles and formats {@code input}, mapping a failure of any step to
     * a descriptive string.
     *
     * @param input text to process
     * @return a future that always completes normally, with the formatted result
     *         or a failure description
     */
    public static CompletableFuture<String> processNumber(String input) {
        return parseNumber(input)
                .thenCompose(CompositionWithExceptions::doubleNumber)
                .thenCompose(CompositionWithExceptions::processResult)
                .exceptionally(ex -> {
                    // Handle any exception in the chain. The handler receives a
                    // CompletionException wrapper, so classify the underlying cause;
                    // testing the wrapper would always match RuntimeException.
                    Throwable cause = unwrap(ex);
                    if (cause instanceof RuntimeException) {
                        return "Processing failed: " + cause.getMessage();
                    }
                    return "Unexpected error: " + cause.getMessage();
                });
    }

    /**
     * Using handle() at each stage for fine-grained control: a parsing failure is
     * logged and passed on, and any failure reaching the second handler is
     * replaced by {@code -1}.
     *
     * @param input text to process
     * @return a future with the formatted result, or {@code "Result: -1"} when
     *         parsing or doubling failed
     */
    public static CompletableFuture<String> processNumberWithStagedHandling(String input) {
        return parseNumber(input)
                .handle((num, ex) -> {
                    if (ex != null) {
                        System.err.println("Parsing failed");
                        // Pass the failure on as-is instead of adding a second wrapper.
                        throw asCompletionException(ex);
                    }
                    return num;
                })
                .thenCompose(CompositionWithExceptions::doubleNumber)
                .handle((num, ex) -> {
                    // This stage also sees a parsing failure passed on from above,
                    // because thenCompose() is skipped when its input stage failed.
                    if (ex != null) {
                        System.err.println("Doubling failed");
                        return -1; // Recover with default value
                    }
                    return num;
                })
                .thenCompose(CompositionWithExceptions::processResult);
    }

    /** Strips {@code CompletionException} wrappers to reach the underlying failure. */
    private static Throwable unwrap(Throwable ex) {
        Throwable current = ex;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns {@code ex} itself when it is already a CompletionException, avoiding a second wrapper. */
    private static CompletionException asCompletionException(Throwable ex) {
        return (ex instanceof CompletionException)
                ? (CompletionException) ex
                : new CompletionException(ex);
    }

    /**
     * Demo: a valid number, an unparsable input, and a number that is too large.
     *
     * @param args unused
     * @throws Exception if waiting for a result fails
     */
    public static void main(String[] args) throws Exception {
        // Test valid input
        CompletableFuture<String> result1 = processNumber("42");
        System.out.println(result1.get());
        // Test invalid input
        CompletableFuture<String> result2 = processNumber("abc");
        System.out.println(result2.get());
        // Test large number
        CompletableFuture<String> result3 = processNumber("2000");
        System.out.println(result3.get());
    }
}

Best Practices

  1. Always handle exceptions - A failure in a future that nobody inspects is silently lost
  2. Use exceptionally() for recovery - When you want to provide a fallback value
  3. Use handle() for inspection - When you need to examine both result and exception
  4. Use whenComplete() for side effects - When you need to log but not recover
  5. Chain exception handling appropriately - Place exceptionally() where you want recovery to happen
  6. Consider timeouts - Always set reasonable timeouts for async operations
  7. Log exceptions appropriately - Use proper logging in exception handlers

These patterns will help you build robust asynchronous applications with proper error handling and recovery mechanisms.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper