CompletableFuture for Async Programming in Java: Complete Guide

CompletableFuture is a powerful class introduced in Java 8 that represents a future result of an asynchronous computation. It provides a rich API for composing, combining, and executing asynchronous operations.


1. CompletableFuture Fundamentals

Key Concepts:

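  • Asynchronous Execution: Run a task on another thread without blocking the caller
  • Composition: Chain async operations so each step starts when the previous one finishes
  • Combination: Merge the results of several independent futures
  • Exception Handling: Recover from or propagate failures inside the chain (exceptionally, handle, whenComplete)
  • Completion Control: Complete a future yourself with complete() or completeExceptionally()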
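
The difference between composition and combination is easiest to see side by side. The following is a minimal sketch, not part of the original examples: lookupUserId and loadEmail are hypothetical stand-ins for real service calls, and the "ID" is simply the length of the name.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsCombine {

    // Hypothetical lookup: derives a fake numeric ID from the name length.
    static CompletableFuture<Integer> lookupUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second call that needs the ID produced by the first.
    static CompletableFuture<String> loadEmail(int userId) {
        return CompletableFuture.supplyAsync(() -> "user" + userId + "@example.com");
    }

    // Composition: the second call depends on the first call's result.
    static CompletableFuture<String> composed(String name) {
        return lookupUserId(name).thenCompose(id -> loadEmail(id));
    }

    // Combination: two independent calls whose results are merged.
    static CompletableFuture<String> combined(String name) {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");
        return greeting.thenCombine(lookupUserId(name), (g, id) -> g + ", user #" + id);
    }

    public static void main(String[] args) {
        System.out.println(composed("alice").join()); // user5@example.com
        System.out.println(combined("alice").join()); // Hello, user #5
    }
}
```

thenCompose is the right tool when the next call needs the previous result; thenCombine fits when both calls can start at once.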

Advantages over Future:

  • Callbacks instead of blocking: results can be consumed without calling get()
  • Functional programming style
  • Built-in exception handling
  • Manual completion control
  • Rich API for composition
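
To make the contrast with a plain Future concrete, here is a small sketch under simple assumptions (a two-thread pool and a task that just returns 21; both are illustrative choices). With Future, the caller has to block on get() before it can use the value; with CompletableFuture, the follow-up step is attached as a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Plain Future: the only way to use the value is to block on get().
    static int withPlainFuture(ExecutorService pool) {
        Future<Integer> future = pool.submit(() -> 21);
        try {
            return future.get() * 2; // the calling thread waits here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // CompletableFuture: the doubling step is registered as a callback,
    // so no thread has to wait for the first step to finish.
    static CompletableFuture<Integer> withCompletableFuture(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> 21, pool)
                .thenApply(n -> n * 2);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("Future: " + withPlainFuture(pool));
            System.out.println("CompletableFuture: " + withCompletableFuture(pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The join() in main is only there to print the result; in a real chain the value would normally be passed on to further stages instead.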

2. Creating CompletableFuture

import java.util.concurrent.*;
import java.util.*;
public class CompletableFutureCreation {
public static void main(String[] args) throws Exception {
runAsyncExample();
supplyAsyncExample();
completedFutureExample();
manualCompletionExample();
customExecutorExample();
}
// Creating CompletableFuture with runAsync (no result)
public static void runAsyncExample() throws Exception {
System.out.println("=== runAsync Example ===");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running async task in: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Async task completed");
});
// Wait for completion
future.get();
System.out.println("Main thread continues...\n");
}
// Creating CompletableFuture with supplyAsync (with result)
public static void supplyAsyncExample() throws Exception {
System.out.println("=== supplyAsync Example ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Supplying result in: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello from async task!";
});
// Get the result (blocks until available)
String result = future.get();
System.out.println("Result: " + result + "\n");
}
// Creating already completed CompletableFuture
public static void completedFutureExample() {
System.out.println("=== completedFuture Example ===");
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Immediate result");
System.out.println("Is done: " + completedFuture.isDone());
System.out.println("Result: " + completedFuture.join() + "\n");
}
// Manual completion control
public static void manualCompletionExample() throws Exception {
System.out.println("=== Manual Completion Example ===");
CompletableFuture<String> future = new CompletableFuture<>();
// Start a thread that completes the future normally after 1 second
new Thread(() -> {
try {
Thread.sleep(1000);
// Manually complete the future; the first completion wins
future.complete("Manually completed result");
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
}).start();
// A second thread tries to complete it exceptionally after 2 seconds
new Thread(() -> {
try {
Thread.sleep(2000);
// Has no effect: the future was already completed by the first thread
future.completeExceptionally(new RuntimeException("Too late!"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
System.out.println("Result: " + future.get() + "\n");
}
// Using custom Executor
public static void customExecutorExample() throws Exception {
System.out.println("=== Custom Executor Example ===");
ExecutorService customExecutor = Executors.newFixedThreadPool(2, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CustomThread-" + System.currentTimeMillis());
return t;
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
return "Result from custom executor";
}, customExecutor);
System.out.println("Result: " + future.get());
customExecutor.shutdown();
}
}
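
The manual completion example depends on the rule that only the first completion of a future takes effect. A minimal single-threaded sketch of that rule (the class and method names are illustrative) uses the boolean returned by complete() and completeExceptionally():

```java
import java.util.concurrent.CompletableFuture;

class FirstCompletionWins {

    static String demo() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Each call reports whether it was the one that completed the future.
        boolean first = future.complete("value");
        boolean second = future.completeExceptionally(new RuntimeException("too late"));
        boolean third = future.complete("another value");
        return future.join() + " " + first + " " + second + " " + third;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // value true false false
    }
}
```

Checking the return value is a simple way to detect that another thread got there first.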

3. Transformation and Chaining

import java.util.concurrent.*;
import java.util.function.*;
public class CompletableFutureTransformation {
public static void main(String[] args) throws Exception {
thenApplyExample();
thenAcceptExample();
thenRunExample();
thenComposeExample();
handleExample();
exceptionallyExample();
}
// Transform result with thenApply
public static void thenApplyExample() throws Exception {
System.out.println("=== thenApply (Transformation) ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "hello";
});
// Transform the result
CompletableFuture<String> transformed = future.thenApply(String::toUpperCase)
.thenApply(s -> s + " world!")
.thenApply(s -> s + " 🎉");
System.out.println("Transformed result: " + transformed.get() + "\n");
}
// Consume result with thenAccept
public static void thenAcceptExample() throws Exception {
System.out.println("=== thenAccept (Consumption) ===");
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "Processing completed";
}).thenAccept(result -> {
System.out.println("Consuming result: " + result);
}).get(); // Wait for completion
System.out.println();
}
// Run action after completion with thenRun
public static void thenRunExample() throws Exception {
System.out.println("=== thenRun (Action) ===");
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "Some result";
}).thenRun(() -> {
System.out.println("Cleanup action executed after completion");
}).get();
System.out.println();
}
// Chain dependent async operations with thenCompose
public static void thenComposeExample() throws Exception {
System.out.println("=== thenCompose (Chaining) ===");
// Simulate user ID lookup then user details fetch
CompletableFuture<String> userDetails = getUserById(123)
.thenCompose(userId -> getUserDetails(userId));
System.out.println("User details: " + userDetails.get() + "\n");
}
// Handle both result and exception with handle
public static void handleExample() throws Exception {
System.out.println("=== handle (Result & Exception) ===");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return 42;
});
CompletableFuture<String> handled = future.handle((result, exception) -> {
if (exception != null) {
return "Recovered from: " + exception.getMessage();
} else {
return "Success: " + result;
}
});
System.out.println("Handled result: " + handled.get() + "\n");
}
// Handle exceptions with exceptionally
public static void exceptionallyExample() throws Exception {
System.out.println("=== exceptionally (Error Recovery) ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.7) {
throw new RuntimeException("Database connection failed");
}
return "Data from database";
});
CompletableFuture<String> recovered = future.exceptionally(throwable -> {
System.err.println("Error occurred: " + throwable.getMessage());
return "Default data";
});
System.out.println("Final result: " + recovered.get() + "\n");
}
// Helper methods for thenCompose example
private static CompletableFuture<Integer> getUserById(int id) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.println("Retrieved user ID: " + id);
return id;
});
}
private static CompletableFuture<String> getUserDetails(int userId) {
return CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(400); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "Details for user " + userId;
});
}
}
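
One detail the handle and exceptionally examples gloss over: when a supplyAsync task throws, dependent stages usually receive the failure wrapped in a CompletionException, so getMessage() includes the wrapped exception's class name. The sketch below shows one way to unwrap it; rootCause is a hypothetical helper, not a JDK method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapCompletionException {

    // Hypothetical helper: peels off CompletionException wrappers.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static String describeFailure() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .handle((value, error) -> {
                    if (error == null) {
                        return "ok: " + value;
                    }
                    Throwable cause = rootCause(error);
                    // Show both the wrapper type and the original exception.
                    return error.getClass().getSimpleName() + " -> "
                            + cause.getClass().getSimpleName() + ": " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // CompletionException -> IllegalStateException: boom
    }
}
```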

4. Combining Multiple Futures

import java.util.concurrent.*;
import java.util.*;
public class CompletableFutureCombination {
public static void main(String[] args) throws Exception {
thenCombineExample();
thenAcceptBothExample();
runAfterBothExample();
allOfExample();
anyOfExample();
}
// Combine results from two independent futures
public static void thenCombineExample() throws Exception {
System.out.println("=== thenCombine (Combine Results) ===");
CompletableFuture<String> userFuture = getUserName(123);
CompletableFuture<Integer> ageFuture = getUserAge(123);
CompletableFuture<String> combined = userFuture.thenCombine(ageFuture, 
(name, age) -> name + " is " + age + " years old");
System.out.println("Combined result: " + combined.get() + "\n");
}
// Consume results from two futures
public static void thenAcceptBothExample() throws Exception {
System.out.println("=== thenAcceptBoth (Consume Both) ===");
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1 result");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2 result");
task1.thenAcceptBoth(task2, (result1, result2) -> {
System.out.println("Both completed: " + result1 + ", " + result2);
}).get();
System.out.println();
}
// Run action after both complete (without results)
public static void runAfterBothExample() throws Exception {
System.out.println("=== runAfterBoth (Action After Both) ===");
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "Task1 done";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "Task2 done";
});
task1.runAfterBoth(task2, () -> {
System.out.println("Both tasks completed! Starting cleanup...");
}).get();
System.out.println();
}
// Wait for all futures to complete
public static void allOfExample() throws Exception {
System.out.println("=== allOf (Wait for All) ===");
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> { sleep(1000); return "Result 1"; }),
CompletableFuture.supplyAsync(() -> { sleep(800); return "Result 2"; }),
CompletableFuture.supplyAsync(() -> { sleep(1200); return "Result 3"; }),
CompletableFuture.supplyAsync(() -> { sleep(600); return "Result 4"; })
);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// Wait for all to complete, then process results
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> 
futures.stream()
.map(CompletableFuture::join)
.toList() // Stream.toList() requires Java 16+; on older versions use collect(Collectors.toList())
);
List<String> results = allResults.get();
System.out.println("All results: " + results + "\n");
}
// Wait for any future to complete
public static void anyOfExample() throws Exception {
System.out.println("=== anyOf (Wait for Any) ===");
CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Fast task result";
});
CompletableFuture<String> mediumTask = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Medium task result";
});
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Slow task result";
});
CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(
fastTask, mediumTask, slowTask
);
String firstResult = (String) firstCompleted.get();
System.out.println("First completed: " + firstResult + "\n");
}
// Helper methods
private static CompletableFuture<String> getUserName(int userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(300);
return "User" + userId;
});
}
private static CompletableFuture<Integer> getUserAge(int userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(400);
return 25 + (userId % 10);
});
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
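
Because allOf returns a CompletableFuture<Void>, the "wait for all, then collect" step from allOfExample tends to be repeated wherever it is needed. One possible way to package it is a small generic helper; sequence is a name chosen here for illustration, not a JDK method, and the sketch uses Collectors.toList() so it also compiles on Java 8.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FutureSequence {

    // Turns a list of futures into one future holding the results in list order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // already complete, so this does not block
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(sequence(futures).join()); // [1, 2, 3]
    }
}
```

The results follow the order of the input list, not the order in which the tasks finish. If any input fails, the returned future fails as well.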

5. Real-World Examples

Example 1: E-commerce Order Processing

import java.util.concurrent.*;
import java.util.*;
public class ECommerceOrderProcessing {
public static class OrderService {
private final ExecutorService executor = Executors.newFixedThreadPool(5);
public CompletableFuture<OrderResult> processOrder(Order order) {
System.out.println("🎯 Processing order: " + order.getId());
// Execute all steps in parallel where possible
CompletableFuture<Customer> customerFuture = getCustomerInfo(order.getCustomerId());
CompletableFuture<Inventory> inventoryFuture = checkInventory(order.getItems());
CompletableFuture<Payment> paymentFuture = processPayment(order);
// Combine results and process order
return CompletableFuture.allOf(customerFuture, inventoryFuture, paymentFuture)
.thenCompose(v -> {
// allOf completed normally, so join() returns immediately and cannot throw here
Customer customer = customerFuture.join();
Inventory inventory = inventoryFuture.join();
Payment payment = paymentFuture.join();
return validateAndCreateOrder(customer, inventory, payment, order);
})
.exceptionally(throwable -> {
// Failures usually arrive wrapped in a CompletionException; report the underlying cause
Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
System.err.println("Order processing failed: " + cause.getMessage());
return new OrderResult(order.getId(), "FAILED", 
"Error: " + cause.getMessage());
});
}
private CompletableFuture<Customer> getCustomerInfo(String customerId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("👤 Fetching customer: " + customerId);
sleep(200);
if (customerId.startsWith("INVALID")) {
throw new RuntimeException("Invalid customer ID: " + customerId);
}
return new Customer(customerId, "Customer-" + customerId, 
"customer" + customerId + "@example.com");
}, executor);
}
private CompletableFuture<Inventory> checkInventory(List<String> items) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("📦 Checking inventory for " + items.size() + " items");
sleep(300);
boolean allAvailable = items.stream()
.allMatch(item -> !item.contains("OUT_OF_STOCK"));
if (!allAvailable) {
throw new RuntimeException("Some items are out of stock");
}
return new Inventory(items, true, "All items available");
}, executor);
}
private CompletableFuture<Payment> processPayment(Order order) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("💳 Processing payment for order: " + order.getId());
sleep(400);
if (order.getAmount() > 1000) {
throw new RuntimeException("Payment declined: Amount too high");
}
return new Payment("PAY-" + System.currentTimeMillis(), 
order.getAmount(), "COMPLETED");
}, executor);
}
private CompletableFuture<OrderResult> validateAndCreateOrder(
Customer customer, Inventory inventory, Payment payment, Order order) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("✅ Creating order: " + order.getId());
sleep(200);
// Simulate order creation
return new OrderResult(order.getId(), "SUCCESS", 
String.format("Order created for %s. Payment: %s. Items: %d",
customer.getName(), payment.getStatus(), order.getItems().size()));
}, executor);
}
public void shutdown() {
executor.shutdown();
}
}
// Domain classes
static class Order {
private final String id;
private final String customerId;
private final List<String> items;
private final double amount;
public Order(String id, String customerId, List<String> items, double amount) {
this.id = id;
this.customerId = customerId;
this.items = items;
this.amount = amount;
}
public String getId() { return id; }
public String getCustomerId() { return customerId; }
public List<String> getItems() { return items; }
public double getAmount() { return amount; }
}
static class Customer {
private final String id;
private final String name;
private final String email;
public Customer(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
public String getName() { return name; }
}
static class Inventory {
private final List<String> items;
private final boolean available;
private final String message;
public Inventory(List<String> items, boolean available, String message) {
this.items = items;
this.available = available;
this.message = message;
}
}
static class Payment {
private final String id;
private final double amount;
private final String status;
public Payment(String id, double amount, String status) {
this.id = id;
this.amount = amount;
this.status = status;
}
public String getStatus() { return status; }
}
static class OrderResult {
private final String orderId;
private final String status;
private final String message;
public OrderResult(String orderId, String status, String message) {
this.orderId = orderId;
this.status = status;
this.message = message;
}
@Override
public String toString() {
return String.format("OrderResult[orderId=%s, status=%s, message=%s]", 
orderId, status, message);
}
}
public static void main(String[] args) throws Exception {
OrderService service = new OrderService();
// Test successful order
Order successfulOrder = new Order("ORDER-001", "CUST-123", 
Arrays.asList("Laptop", "Mouse", "Keyboard"), 899.99);
CompletableFuture<OrderResult> result1 = service.processOrder(successfulOrder);
System.out.println("Result 1: " + result1.get() + "\n");
// Test failed order (out of stock); the amount stays under the payment limit so only the inventory check fails
Order failedOrder1 = new Order("ORDER-002", "CUST-456", 
Arrays.asList("Laptop", "OUT_OF_STOCK_Monitor"), 900.00);
CompletableFuture<OrderResult> result2 = service.processOrder(failedOrder1);
System.out.println("Result 2: " + result2.get() + "\n");
// Test failed order (payment declined)
Order failedOrder2 = new Order("ORDER-003", "CUST-789", 
Arrays.asList("Gaming PC", "Monitor"), 1500.00);
CompletableFuture<OrderResult> result3 = service.processOrder(failedOrder2);
System.out.println("Result 3: " + result3.get() + "\n");
service.shutdown();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
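
In the example above, payment runs in parallel with the customer and inventory checks, so a payment may already have gone through by the time a check fails. If that matters, one possible variant validates first and charges afterwards. The sketch below is a simplified illustration under that assumption; it uses plain strings instead of the domain classes, and checkCustomer, checkInventory, charge and the CHARGES counter are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class ValidateThenCharge {

    // Counts how often the (hypothetical) payment step actually ran.
    static final AtomicInteger CHARGES = new AtomicInteger();

    static CompletableFuture<String> checkCustomer(String customerId) {
        return CompletableFuture.supplyAsync(() -> "customer " + customerId + " ok");
    }

    static CompletableFuture<String> checkInventory(boolean inStock) {
        return CompletableFuture.supplyAsync(() -> {
            if (!inStock) {
                throw new IllegalStateException("out of stock");
            }
            return "stock ok";
        });
    }

    static CompletableFuture<String> charge(double amount) {
        return CompletableFuture.supplyAsync(() -> {
            CHARGES.incrementAndGet();
            return "charged " + amount;
        });
    }

    static CompletableFuture<String> placeOrder(String customerId, boolean inStock, double amount) {
        return checkCustomer(customerId)
                // The two checks are independent, so they run in parallel.
                .thenCombine(checkInventory(inStock), (customer, stock) -> customer + ", " + stock)
                // Payment starts only after both checks have succeeded.
                .thenCompose(checks -> charge(amount).thenApply(payment -> checks + ", " + payment))
                .exceptionally(error -> {
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                            ? error.getCause() : error;
                    return "FAILED: " + cause.getMessage();
                });
    }

    public static void main(String[] args) {
        System.out.println(placeOrder("C-1", true, 20.0).join());
        System.out.println(placeOrder("C-2", false, 20.0).join());
        System.out.println("charges made: " + CHARGES.get());
    }
}
```

The trade-off is latency: the payment call no longer overlaps with the checks, so the happy path takes longer in exchange for never charging an order that fails validation.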

Example 2: API Aggregation Service

import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;
public class APIAggregationService {
public static class AggregationService {
private final ExecutorService executor = Executors.newCachedThreadPool();
public CompletableFuture<AggregatedData> aggregateUserData(String userId) {
System.out.println("🔍 Aggregating data for user: " + userId);
// Fetch data from multiple services in parallel
CompletableFuture<UserProfile> profileFuture = fetchUserProfile(userId);
CompletableFuture<List<Order>> ordersFuture = fetchUserOrders(userId);
CompletableFuture<List<Payment>> paymentsFuture = fetchUserPayments(userId);
CompletableFuture<UserPreferences> preferencesFuture = fetchUserPreferences(userId);
// Combine all results
return CompletableFuture.allOf(profileFuture, ordersFuture, paymentsFuture, preferencesFuture)
.thenApply(v -> {
try {
UserProfile profile = profileFuture.join();
List<Order> orders = ordersFuture.join();
List<Payment> payments = paymentsFuture.join();
UserPreferences preferences = preferencesFuture.join();
return new AggregatedData(profile, orders, payments, preferences);
} catch (CompletionException e) {
throw new RuntimeException("Failed to aggregate data", e);
}
})
.exceptionally(throwable -> {
System.err.println("Aggregation failed: " + throwable.getMessage());
// Fall back to default values for the whole aggregate
return new AggregatedData(
new UserProfile(userId, "Unknown", "unknown@example.com"),
Collections.emptyList(),
Collections.emptyList(),
new UserPreferences(Collections.emptyMap())
);
});
}
public CompletableFuture<Map<String, AggregatedData>> aggregateMultipleUsers(List<String> userIds) {
System.out.println("👥 Aggregating data for " + userIds.size() + " users");
// Process all users in parallel
List<CompletableFuture<AggregatedData>> futures = userIds.stream()
.map(this::aggregateUserData)
.toList();
// Convert to map when all complete
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
Map<String, AggregatedData> result = new HashMap<>();
for (int i = 0; i < userIds.size(); i++) {
result.put(userIds.get(i), futures.get(i).join());
}
return result;
});
}
private CompletableFuture<UserProfile> fetchUserProfile(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(200 + new Random().nextInt(300));
System.out.println("📇 Fetched profile for: " + userId);
return new UserProfile(userId, "User-" + userId, userId + "@example.com");
}, executor);
}
private CompletableFuture<List<Order>> fetchUserOrders(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(300 + new Random().nextInt(400));
int orderCount = 2 + new Random().nextInt(5);
List<Order> orders = IntStream.range(1, orderCount + 1)
.mapToObj(i -> new Order("ORDER-" + userId + "-" + i, 
50.0 + new Random().nextDouble() * 200))
.toList();
System.out.println("📦 Fetched " + orders.size() + " orders for: " + userId);
return orders;
}, executor);
}
private CompletableFuture<List<Payment>> fetchUserPayments(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(250 + new Random().nextInt(350));
int paymentCount = 1 + new Random().nextInt(3);
List<Payment> payments = IntStream.range(1, paymentCount + 1)
.mapToObj(i -> new Payment("PAY-" + userId + "-" + i, 
30.0 + new Random().nextDouble() * 150))
.toList();
System.out.println("💳 Fetched " + payments.size() + " payments for: " + userId);
return payments;
}, executor);
}
private CompletableFuture<UserPreferences> fetchUserPreferences(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(150 + new Random().nextInt(250));
Map<String, String> prefs = Map.of(
"theme", "dark",
"language", "en",
"notifications", "enabled"
);
System.out.println("⚙️  Fetched preferences for: " + userId);
return new UserPreferences(prefs);
}, executor);
}
public void shutdown() {
executor.shutdown();
}
}
// Domain classes
static class UserProfile {
private final String userId;
private final String name;
private final String email;
public UserProfile(String userId, String name, String email) {
this.userId = userId;
this.name = name;
this.email = email;
}
@Override
public String toString() {
return String.format("UserProfile[userId=%s, name=%s]", userId, name);
}
}
static class Order {
private final String orderId;
private final double amount;
public Order(String orderId, double amount) {
this.orderId = orderId;
this.amount = amount;
}
@Override
public String toString() {
return String.format("Order[orderId=%s, amount=%.2f]", orderId, amount);
}
}
static class Payment {
private final String paymentId;
private final double amount;
public Payment(String paymentId, double amount) {
this.paymentId = paymentId;
this.amount = amount;
}
@Override
public String toString() {
return String.format("Payment[paymentId=%s, amount=%.2f]", paymentId, amount);
}
}
static class UserPreferences {
private final Map<String, String> preferences;
public UserPreferences(Map<String, String> preferences) {
this.preferences = preferences;
}
@Override
public String toString() {
return String.format("UserPreferences[prefs=%s]", preferences);
}
}
static class AggregatedData {
private final UserProfile profile;
private final List<Order> orders;
private final List<Payment> payments;
private final UserPreferences preferences;
public AggregatedData(UserProfile profile, List<Order> orders, 
List<Payment> payments, UserPreferences preferences) {
this.profile = profile;
this.orders = orders;
this.payments = payments;
this.preferences = preferences;
}
@Override
public String toString() {
return String.format(
"AggregatedData[profile=%s, orders=%d, payments=%d, preferences=%s]",
profile, orders.size(), payments.size(), preferences
);
}
}
public static void main(String[] args) throws Exception {
AggregationService service = new AggregationService();
// Single user aggregation
System.out.println("=== Single User Aggregation ===");
CompletableFuture<AggregatedData> singleUserData = service.aggregateUserData("USER-001");
System.out.println("Single user result: " + singleUserData.get() + "\n");
// Multiple users aggregation
System.out.println("=== Multiple Users Aggregation ===");
List<String> userIds = Arrays.asList("USER-001", "USER-002", "USER-003", "USER-004");
CompletableFuture<Map<String, AggregatedData>> multipleUsersData = 
service.aggregateMultipleUsers(userIds);
Map<String, AggregatedData> results = multipleUsersData.get();
results.forEach((userId, data) -> {
System.out.println(userId + ": " + data);
});
service.shutdown();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
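
The aggregation above replaces the whole result with defaults as soon as any one service fails. If partial data is preferable, one option is to give each source its own fallback before combining. The following is a reduced sketch with two hypothetical string-returning services; fetch and its fail flag exist only to simulate an outage.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class PartialAggregation {

    // Hypothetical remote call that can be told to fail.
    static CompletableFuture<String> fetch(String service, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException(service + " unavailable");
            }
            return service + "-data";
        });
    }

    static CompletableFuture<Map<String, String>> aggregate(boolean ordersFail) {
        // Each source gets its own fallback, so one failure cannot discard the others.
        CompletableFuture<String> profile = fetch("profile", false)
                .exceptionally(error -> "profile-default");
        CompletableFuture<String> orders = fetch("orders", ordersFail)
                .exceptionally(error -> "orders-default");

        return profile.thenCombine(orders, (p, o) -> {
            Map<String, String> result = new LinkedHashMap<>();
            result.put("profile", p);
            result.put("orders", o);
            return result;
        });
    }

    public static void main(String[] args) {
        System.out.println(aggregate(true).join()); // {profile=profile-data, orders=orders-default}
    }
}
```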

6. Advanced Patterns and Timeouts

import java.util.concurrent.*;
import java.util.*;
import java.util.function.*;
public class AdvancedCompletableFuturePatterns {
public static void main(String[] args) throws Exception {
timeoutPatternExample();
retryPatternExample();
fallbackPatternExample();
cancellationPatternExample();
}
// Timeout pattern
public static void timeoutPatternExample() throws Exception {
System.out.println("=== Timeout Pattern ===");
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
sleep(3000); // This takes 3 seconds
return "Slow result";
});
CompletableFuture<String> timeoutTask = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Timeout reached";
});
// Complete with timeout result if slow task doesn't complete in time
CompletableFuture<String> result = slowTask.applyToEither(timeoutTask, Function.identity())
.orTimeout(2, TimeUnit.SECONDS) // Java 9+ feature
.exceptionally(throwable -> "Fallback due to timeout: " + throwable.getMessage());
System.out.println("Result: " + result.get() + "\n");
}
// Retry pattern
public static void retryPatternExample() throws Exception {
System.out.println("=== Retry Pattern ===");
CompletableFuture<String> retryableTask = retryAsync(() -> {
double random = Math.random();
if (random < 0.7) { // 70% chance of failure
throw new RuntimeException("Temporary failure");
}
return "Success after retries";
}, 3, 1000); // Retry 3 times with 1 second delay
System.out.println("Retry result: " + retryableTask.get() + "\n");
}
// Fallback pattern (exceptionallyCompose requires Java 12+)
public static void fallbackPatternExample() throws Exception {
System.out.println("=== Fallback Pattern ===");
CompletableFuture<String> primaryService = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Primary service unavailable");
}
return "Data from primary service";
});
// Note: supplyAsync starts the fallback immediately, even when the primary succeeds
CompletableFuture<String> fallbackService = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Data from fallback service";
});
CompletableFuture<String> result = primaryService
.exceptionallyCompose(throwable -> {
System.out.println("Primary failed, using fallback: " + throwable.getMessage());
return fallbackService;
});
System.out.println("Final result: " + result.get() + "\n");
}
// Cancellation pattern
public static void cancellationPatternExample() throws Exception {
System.out.println("=== Cancellation Pattern ===");
CompletableFuture<String> cancellableFuture = new CompletableFuture<>();
// Start async work
Thread workerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Task cancelled at step " + i);
return;
}
System.out.println("Working... step " + i);
sleep(500);
}
cancellableFuture.complete("Task completed successfully");
} catch (Exception e) {
cancellableFuture.completeExceptionally(e);
}
});
workerThread.start();
// Cancel after 2 seconds
CompletableFuture.runAsync(() -> {
sleep(2000);
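// Interrupt the worker ourselves: CompletableFuture.cancel() never interrupts a running task
workerThread.interrupt();
// The mayInterruptIfRunning argument is ignored; this only completes the future with a CancellationException
cancellableFuture.cancel(true);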
});
try {
String result = cancellableFuture.get();
System.out.println("Result: " + result);
} catch (CancellationException e) {
System.out.println("Task was cancelled: " + e.getMessage());
}
System.out.println();
}
// Helper method for retry pattern (exceptionallyCompose requires Java 12+)
private static <T> CompletableFuture<T> retryAsync(Supplier<T> task, 
int maxRetries, 
long delayMs) {
return CompletableFuture.supplyAsync(task)
.exceptionallyCompose(throwable -> {
if (maxRetries > 0) {
System.out.println("Attempt failed, " + maxRetries + " retries left");
return CompletableFuture.runAsync(() -> sleep(delayMs))
.thenCompose(v -> retryAsync(task, maxRetries - 1, delayMs));
} else {
throw new CompletionException("All retry attempts failed", throwable);
}
});
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
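
The timeout example uses orTimeout, which turns a missed deadline into a failure. Java 9 also added completeOnTimeout, which supplies a default value instead. The sketch below contrasts the two on a future that is never completed; the 100 ms deadline and the class name are arbitrary choices for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutStyles {

    // orTimeout: the future fails with a TimeoutException once the deadline passes.
    static String failOnTimeout() {
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        try {
            return neverDone.orTimeout(100, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out" : "other failure";
        }
    }

    // completeOnTimeout: the future completes normally with a default value instead.
    static String defaultOnTimeout() {
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        return neverDone.completeOnTimeout("default", 100, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(failOnTimeout());    // timed out
        System.out.println(defaultOnTimeout()); // default
    }
}
```

Neither method stops the underlying task; they only decide how the future is completed when the deadline passes.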

7. Best Practices and Performance

import java.util.concurrent.*;
import java.util.*;
public class CompletableFutureBestPractices {
public static class BestPracticeExamples {
// 1. Use appropriate thread pool
private final ExecutorService ioBoundExecutor = 
Executors.newCachedThreadPool(); // For I/O operations
private final ExecutorService cpuBoundExecutor = 
Executors.newWorkStealingPool(); // For CPU-intensive operations
// 2. Keep blocking work off the common pool by running each stage on a suitable executor
public CompletableFuture<String> goodPractice(String input) {
return CompletableFuture.supplyAsync(() -> {
// Simulate I/O operation
sleep(100);
return input.toUpperCase();
}, ioBoundExecutor)
.thenApplyAsync(result -> {
// CPU-intensive transformation
return result + " processed";
}, cpuBoundExecutor)
.thenApplyAsync(result -> {
// Another I/O operation
sleep(50);
return result + " completed";
}, ioBoundExecutor);
}
// 3. Proper exception handling
public CompletableFuture<String> robustAsyncOperation(String input) {
return CompletableFuture.supplyAsync(() -> {
if (input == null) {
throw new IllegalArgumentException("Input cannot be null");
}
return processInput(input);
}, ioBoundExecutor)
.handle((result, throwable) -> {
if (throwable != null) {
System.err.println("Operation failed: " + throwable.getMessage());
return "Default value";
}
return result;
});
}
// 4. Resource cleanup
public CompletableFuture<Void> withResourceCleanup() {
return CompletableFuture.runAsync(() -> {
// Acquire resource
System.out.println("Acquiring resource...");
})
.whenComplete((result, throwable) -> {
// Always clean up resources
System.out.println("Cleaning up resources...");
if (throwable != null) {
System.err.println("Cleanup after failure: " + throwable.getMessage());
}
});
}
// 5. Combine multiple operations efficiently
public CompletableFuture<Map<String, Object>> combineMultipleOperations(String userId) {
CompletableFuture<String> profileFuture = fetchProfile(userId);
CompletableFuture<List<String>> ordersFuture = fetchOrders(userId);
CompletableFuture<Map<String, String>> prefsFuture = fetchPreferences(userId);
return CompletableFuture.allOf(profileFuture, ordersFuture, prefsFuture)
.thenApply(v -> {
Map<String, Object> result = new HashMap<>();
result.put("profile", profileFuture.join());
result.put("orders", ordersFuture.join());
result.put("preferences", prefsFuture.join());
return result;
});
}
// 6. Use timeouts appropriately
public CompletableFuture<String> withTimeout(String input) {
return CompletableFuture.supplyAsync(() -> {
sleep(5000); // Simulate long operation
return input;
})
.orTimeout(2, TimeUnit.SECONDS) // Java 9+
.exceptionally(throwable -> "Timeout: " + throwable.getMessage());
}
// Helper methods
private String processInput(String input) {
return "Processed: " + input;
}
private CompletableFuture<String> fetchProfile(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(200);
return "Profile for " + userId;
}, ioBoundExecutor);
}
private CompletableFuture<List<String>> fetchOrders(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(300);
return Arrays.asList("Order1", "Order2", "Order3");
}, ioBoundExecutor);
}
private CompletableFuture<Map<String, String>> fetchPreferences(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(150);
return Map.of("theme", "dark", "language", "en");
}, ioBoundExecutor);
}
public void shutdown() {
ioBoundExecutor.shutdown();
cpuBoundExecutor.shutdown();
}
}
public static void main(String[] args) throws Exception {
BestPracticeExamples examples = new BestPracticeExamples();
System.out.println("=== Good Practice Example ===");
CompletableFuture<String> result1 = examples.goodPractice("test input");
System.out.println("Result: " + result1.get());
System.out.println("\n=== Robust Operation Example ===");
CompletableFuture<String> result2 = examples.robustAsyncOperation(null);
System.out.println("Result: " + result2.get());
System.out.println("\n=== Combine Operations Example ===");
CompletableFuture<Map<String, Object>> result3 = examples.combineMultipleOperations("user123");
System.out.println("Combined result: " + result3.get());
System.out.println("\n=== Timeout Example ===");
CompletableFuture<String> result4 = examples.withTimeout("slow input");
System.out.println("Timeout result: " + result4.get());
examples.shutdown();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
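
Practices 3 and 4 use handle and whenComplete, which look similar but behave differently: handle replaces the outcome with whatever it returns, while whenComplete runs a side effect and passes the original result or failure through. A minimal sketch of the whenComplete behaviour follows; the released flag stands in for a real resource and is purely illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class CleanupKeepsOutcome {

    static String run(boolean fail) {
        AtomicBoolean released = new AtomicBoolean(false);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("work failed");
                    }
                    return "done";
                })
                // Runs on success and on failure, without changing the outcome.
                .whenComplete((value, error) -> released.set(true));

        String outcome;
        try {
            outcome = future.join();
        } catch (CompletionException e) {
            // The original failure is still visible to the caller.
            outcome = "failed: " + e.getCause().getMessage();
        }
        return outcome + ", released=" + released.get();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // done, released=true
        System.out.println(run(true));  // failed: work failed, released=true
    }
}
```

This makes whenComplete a reasonable fit for cleanup, and handle or exceptionally the better choice when the failure should be converted into a fallback value.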

Conclusion

Key Benefits of CompletableFuture:

  1. Non-blocking Operations: Run tasks without blocking the calling thread
  2. Functional Composition: Chain operations using functional style
  3. Exception Handling: Built-in comprehensive error handling
  4. Combination: Easily combine multiple futures
  5. Manual Control: Complete futures manually when needed

Common Patterns:

  • Service Aggregation: Combine results from multiple services
  • Pipeline Processing: Chain dependent async operations
  • Error Recovery: Fallback mechanisms and retries
  • Timeout Management: Handle slow operations gracefully
  • Resource Management: Proper cleanup with whenComplete

Best Practices:

  1. Choose Right Executor: Use appropriate thread pools for I/O vs CPU-bound tasks
  2. Avoid Blocking: Keep blocking calls such as get() or Thread.sleep() out of callbacks, or run them on a dedicated executor
  3. Handle Exceptions: Always handle exceptions in completion stages
  4. Use Timeouts: Prevent indefinite waiting with timeouts
  5. Clean Resources: Use whenComplete for resource cleanup
  6. Prefer Composition: Chain operations instead of nesting callbacks

CompletableFuture provides a powerful and flexible way to write asynchronous, non-blocking code in Java, making it essential for modern high-performance applications.
