Chaining CompletableFuture Operations in Java

Introduction to CompletableFuture Chaining

CompletableFuture provides a powerful way to chain asynchronous operations, creating complex workflows while maintaining readability and handling errors gracefully. Chaining allows you to build pipelines where each stage executes when the previous one completes.


1. Basic Chaining Patterns

Simple ThenApply Chain

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BasicChaining {
public static void main(String[] args) throws Exception {
// Basic thenApply chaining - synchronous transformations
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("Stage 1: Fetching user ID - " + Thread.currentThread().getName());
return "user123";
})
.thenApply(userId -> {
System.out.println("Stage 2: Fetching profile for " + userId + " - " + Thread.currentThread().getName());
return "Profile of " + userId;
})
.thenApply(profile -> {
System.out.println("Stage 3: Processing profile - " + Thread.currentThread().getName());
return profile.toUpperCase();
})
.thenApply(finalResult -> {
System.out.println("Stage 4: Final formatting - " + Thread.currentThread().getName());
return "RESULT: " + finalResult;
});
System.out.println("Final result: " + future.get());
}
}

Asynchronous Chaining with ThenApplyAsync

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AsyncChaining {
public static void main(String[] args) throws Exception {
// Using thenApplyAsync for true asynchronous execution
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("Stage 1 - SupplyAsync: " + Thread.currentThread().getName());
sleep(100);
return "initial-data";
})
.thenApplyAsync(result -> {
System.out.println("Stage 2 - ThenApplyAsync: " + Thread.currentThread().getName());
sleep(100);
return result + "-processed";
})
.thenApplyAsync(result -> {
System.out.println("Stage 3 - ThenApplyAsync: " + Thread.currentThread().getName());
sleep(100);
return result + "-final";
}, Executors.newFixedThreadPool(2)); // Custom executor
System.out.println("Result: " + future.get());
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

ThenCompose for Dependent Futures

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
public class ThenComposeExample {
// Simulate service calls
public static CompletableFuture<String> getUserById(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting user: " + userId);
sleep(200);
return "User:" + userId;
});
}
public static CompletableFuture<String> getUserProfile(String user) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting profile for: " + user);
sleep(200);
return user + "-Profile";
});
}
public static CompletableFuture<String> getProfileDetails(String profile) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Getting details for: " + profile);
sleep(200);
return profile + "-Details";
});
}
public static void main(String[] args) throws Exception {
// WRONG: This creates CompletableFuture<CompletableFuture<String>>
CompletableFuture<CompletableFuture<String>> wrongChain = 
getUserById("123").thenApply(user -> getUserProfile(user));
// CORRECT: Use thenCompose to flatten the result
CompletableFuture<String> correctChain = 
getUserById("123")
.thenCompose(user -> getUserProfile(user))
.thenCompose(profile -> getProfileDetails(profile));
System.out.println("Correct chain result: " + correctChain.get());
// More concise with method references
CompletableFuture<String> conciseChain = 
getUserById("123")
.thenCompose(ThenComposeExample::getUserProfile)
.thenCompose(ThenComposeExample::getProfileDetails);
System.out.println("Concise chain result: " + conciseChain.get());
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

2. Combining Multiple Futures

Combining Independent Futures

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CombiningFutures {
public static CompletableFuture<String> fetchUserData(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching user data for: " + userId);
sleep(300);
return "UserData-" + userId;
});
}
public static CompletableFuture<String> fetchOrderData(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching order data for: " + userId);
sleep(400);
return "OrderData-" + userId;
});
}
public static CompletableFuture<String> fetchPaymentData(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching payment data for: " + userId);
sleep(500);
return "PaymentData-" + userId;
});
}
public static void main(String[] args) throws Exception {
String userId = "user123";
// thenCombine - combine two independent futures
CompletableFuture<String> userAndOrder = 
fetchUserData(userId)
.thenCombine(fetchOrderData(userId), (userData, orderData) -> {
System.out.println("Combining user and order data");
return userData + " + " + orderData;
});
System.out.println("User + Order: " + userAndOrder.get());
// thenCombine with three futures
CompletableFuture<String> allData = 
fetchUserData(userId)
.thenCombine(fetchOrderData(userId), (user, order) -> user + " | " + order)
.thenCombine(fetchPaymentData(userId), (userOrder, payment) -> userOrder + " | " + payment);
System.out.println("All data: " + allData.get());
// Using allOf to wait for multiple futures
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
fetchUserData(userId),
fetchOrderData(userId),
fetchPaymentData(userId)
);
CompletableFuture<String> combinedResult = allFutures.thenApply(v -> {
// At this point, all futures are complete
try {
String user = fetchUserData(userId).get();
String order = fetchOrderData(userId).get();
String payment = fetchPaymentData(userId).get();
return user + " :: " + order + " :: " + payment;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
System.out.println("AllOf result: " + combinedResult.get());
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Complex Combination Patterns

import java.util.concurrent.CompletableFuture;
import java.util.*;
import java.util.stream.Collectors;
public class ComplexCombination {
public static CompletableFuture<Integer> processNumber(int number) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Processing number: " + number);
sleep(100);
return number * 2;
});
}
public static CompletableFuture<String> convertToString(int number) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Converting to string: " + number);
sleep(50);
return "Number-" + number;
});
}
public static CompletableFuture<Boolean> validateResult(String result) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Validating: " + result);
sleep(80);
return result.length() > 5;
});
}
public static void main(String[] args) throws Exception {
// Complex chain with multiple combinations
CompletableFuture<String> complexResult = 
processNumber(10)
.thenCompose(number -> 
convertToString(number)
.thenCombine(validateResult("Number-" + number), 
(str, valid) -> valid ? str : "INVALID: " + str)
)
.thenApply(result -> {
System.out.println("Final processing: " + result);
return "FINAL: " + result;
});
System.out.println("Complex result: " + complexResult.get());
// Processing a list of items with CompletableFuture
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// Process all numbers in parallel and combine results
List<CompletableFuture<String>> futures = numbers.stream()
.map(number -> 
processNumber(number)
.thenCompose(ComplexCombination::convertToString)
.thenApply(str -> "Processed: " + str)
)
.collect(Collectors.toList());
// Wait for all to complete and collect results
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
CompletableFuture<List<String>> allResults = allDone.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
System.out.println("All results: " + allResults.get());
// Using thenCombine with multiple dependencies
CompletableFuture<Integer> future1 = processNumber(5);
CompletableFuture<Integer> future2 = processNumber(10);
CompletableFuture<Integer> future3 = processNumber(15);
CompletableFuture<Integer> sumResult = future1
.thenCombine(future2, (a, b) -> a + b)
.thenCombine(future3, (ab, c) -> ab + c);
System.out.println("Sum result: " + sumResult.get());
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

3. Error Handling in Chains

Exception Handling Patterns

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ErrorHandlingChains {
public static CompletableFuture<String> fetchData(boolean shouldFail) {
return CompletableFuture.supplyAsync(() -> {
if (shouldFail) {
throw new RuntimeException("Data fetch failed!");
}
return "Success Data";
});
}
public static CompletableFuture<String> processData(String data) {
return CompletableFuture.supplyAsync(() -> {
if (data.contains("Invalid")) {
throw new IllegalArgumentException("Invalid data format");
}
return "Processed: " + data;
});
}
public static void main(String[] args) {
System.out.println("=== Example 1: Handle with exceptionally ===");
CompletableFuture<String> withExceptionHandler = 
fetchData(true)
.thenApply(data -> {
System.out.println("This won't execute due to exception");
return data.toUpperCase();
})
.exceptionally(throwable -> {
System.out.println("Exception caught: " + throwable.getMessage());
return "Fallback Data";
})
.thenApply(data -> {
System.out.println("Processing fallback: " + data);
return "Final: " + data;
});
System.out.println("Result: " + withExceptionHandler.join());
System.out.println("\n=== Example 2: Handle with handle() ===");
CompletableFuture<String> withHandle = 
fetchData(false)
.thenApply(data -> {
if (data.length() > 10) {
throw new RuntimeException("Data too long");
}
return data;
})
.handle((result, throwable) -> {
if (throwable != null) {
System.out.println("Handled exception: " + throwable.getMessage());
return "Recovered from error";
}
return "Success: " + result;
});
System.out.println("Handle result: " + withHandle.join());
System.out.println("\n=== Example 3: WhenComplete for side effects ===");
CompletableFuture<String> withWhenComplete = 
fetchData(true)
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("Logging error: " + throwable.getMessage());
// Can't recover here, just side effects
} else {
System.out.println("Operation successful: " + result);
}
})
.exceptionally(throwable -> "Exceptionally handled");
System.out.println("WhenComplete result: " + withWhenComplete.join());
System.out.println("\n=== Example 4: Complex error recovery chain ===");
CompletableFuture<String> recoveryChain = 
fetchData(true)  // This will fail
.thenApply(String::toUpperCase)
.exceptionally(throwable -> {
System.out.println("First recovery: " + throwable.getMessage());
return "Recovery Data 1";
})
.thenCompose(data -> processData(data))  // This might also fail
.exceptionally(throwable -> {
System.out.println("Second recovery: " + throwable.getMessage());
return "Recovery Data 2";
})
.thenApply(finalData -> "Final: " + finalData);
System.out.println("Recovery chain result: " + recoveryChain.join());
}
}

Advanced Error Recovery

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class AdvancedErrorRecovery {
// Simulate different data sources
public static CompletableFuture<String> fetchFromPrimarySource(String key) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching from primary source: " + key);
sleep(100);
if (Math.random() > 0.7) {
throw new RuntimeException("Primary source unavailable");
}
return "Data from primary: " + key;
});
}
public static CompletableFuture<String> fetchFromSecondarySource(String key) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching from secondary source: " + key);
sleep(150);
if (Math.random() > 0.9) {
throw new RuntimeException("Secondary source unavailable");
}
return "Data from secondary: " + key;
});
}
public static CompletableFuture<String> fetchFromCache(String key) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching from cache: " + key);
sleep(50);
return "Cached data: " + key;
});
}
public static CompletableFuture<String> processWithRetry(String key, int maxRetries) {
return fetchFromPrimarySource(key)
.exceptionally(throwable -> {
System.out.println("Primary failed, trying secondary");
return null; // Trigger fallback
})
.thenCompose(result -> {
if (result != null) {
return CompletableFuture.completedFuture(result);
}
return fetchFromSecondarySource(key);
})
.exceptionally(throwable -> {
System.out.println("Secondary failed, using cache");
return null; // Trigger cache fallback
})
.thenCompose(result -> {
if (result != null) {
return CompletableFuture.completedFuture(result);
}
return fetchFromCache(key);
})
.exceptionally(throwable -> {
System.out.println("All sources failed, returning default");
return "Default data for: " + key;
});
}
// Generic retry mechanism
public static <T> CompletableFuture<T> retryAsync(
Function<Void, CompletableFuture<T>> task, 
int maxRetries) {
return task.apply(null)
.exceptionally(throwable -> {
if (maxRetries > 0) {
System.out.println("Retry attempt " + maxRetries);
return retryAsync(task, maxRetries - 1).join();
}
throw new RuntimeException("Max retries exceeded", throwable);
});
}
public static void main(String[] args) throws Exception {
System.out.println("=== Advanced Error Recovery ===");
// Test the recovery chain
for (int i = 0; i < 5; i++) {
CompletableFuture<String> result = processWithRetry("key-" + i, 3);
System.out.println("Result " + i + ": " + result.get());
System.out.println("---");
}
System.out.println("\n=== Generic Retry Mechanism ===");
CompletableFuture<String> withRetry = retryAsync(v -> 
CompletableFuture.supplyAsync(() -> {
double rand = Math.random();
System.out.println("Attempt with random: " + rand);
if (rand > 0.3) {
throw new RuntimeException("Simulated failure");
}
return "Success after retries";
}), 3);
System.out.println("Retry result: " + withRetry.get());
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

4. Real-World Use Cases

E-Commerce Order Processing

import java.util.concurrent.CompletableFuture;
import java.util.*;
public class ECommerceOrderProcessing {
static class OrderService {
public CompletableFuture<String> validateOrder(String orderId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Validating order: " + orderId);
sleep(100);
if (orderId.startsWith("INV")) {
return orderId;
}
throw new IllegalArgumentException("Invalid order ID: " + orderId);
});
}
public CompletableFuture<Double> calculateTotal(String orderId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Calculating total for: " + orderId);
sleep(150);
return 99.99 + (orderId.hashCode() % 100); // Simulate calculation
});
}
public CompletableFuture<Boolean> checkInventory(String orderId, double amount) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Checking inventory for: " + orderId);
sleep(200);
return amount < 150.0; // Simulate inventory check
});
}
public CompletableFuture<String> processPayment(String orderId, double amount) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Processing payment for: " + orderId + ", amount: " + amount);
sleep(300);
if (amount > 200.0) {
throw new RuntimeException("Payment declined: Amount too high");
}
return "Payment processed for " + orderId;
});
}
public CompletableFuture<String> updateInventory(String orderId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Updating inventory for: " + orderId);
sleep(100);
return "Inventory updated for " + orderId;
});
}
public CompletableFuture<String> sendConfirmation(String orderId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Sending confirmation for: " + orderId);
sleep(50);
return "Confirmation sent for " + orderId;
});
}
public CompletableFuture<String> processOrder(String orderId) {
return validateOrder(orderId)
.thenCompose(validatedOrderId -> 
calculateTotal(validatedOrderId)
.thenCompose(amount -> 
checkInventory(validatedOrderId, amount)
.thenCompose(inStock -> {
if (!inStock) {
throw new RuntimeException("Out of stock for: " + validatedOrderId);
}
return processPayment(validatedOrderId, amount);
})
)
)
.thenCompose(paymentResult -> 
updateInventory(orderId)
.thenCombine(sendConfirmation(orderId), 
(inventory, confirmation) -> 
paymentResult + " | " + inventory + " | " + confirmation)
)
.exceptionally(throwable -> {
System.out.println("Order processing failed: " + throwable.getMessage());
return "Failed to process order: " + throwable.getMessage();
});
}
}
public static void main(String[] args) throws Exception {
OrderService service = new OrderService();
List<String> orderIds = Arrays.asList("INV001", "INV002", "BAD003", "INV004");
// Process orders concurrently
List<CompletableFuture<String>> orderFutures = orderIds.stream()
.map(service::processOrder)
.collect(Collectors.toList());
// Wait for all orders to complete
CompletableFuture<Void> allOrders = CompletableFuture.allOf(
orderFutures.toArray(new CompletableFuture[0])
);
CompletableFuture<List<String>> allResults = allOrders.thenApply(v ->
orderFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
System.out.println("\n=== Order Processing Results ===");
List<String> results = allResults.get();
results.forEach(System.out::println);
// Statistics
long successCount = results.stream()
.filter(result -> result.startsWith("Payment processed"))
.count();
System.out.printf("\nSuccess rate: %d/%d (%.1f%%)%n", 
successCount, orderIds.size(), 
(successCount * 100.0 / orderIds.size()));
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

API Gateway Pattern

import java.util.concurrent.CompletableFuture;
import java.util.*;
import java.util.stream.Collectors;
public class ApiGatewayPattern {
static class ApiGateway {
public CompletableFuture<UserProfile> getUserProfile(String userId) {
CompletableFuture<UserInfo> userInfo = getUserInfo(userId);
CompletableFuture<List<Order>> userOrders = getUserOrders(userId);
CompletableFuture<List<Payment>> userPayments = getUserPayments(userId);
CompletableFuture<UserPreferences> userPrefs = getUserPreferences(userId);
return CompletableFuture.allOf(userInfo, userOrders, userPayments, userPrefs)
.thenApply(v -> {
try {
return new UserProfile(
userInfo.get(),
userOrders.get(),
userPayments.get(),
userPrefs.get()
);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
public CompletableFuture<DashboardData> getDashboardData(String userId) {
return getUserProfile(userId)
.thenCompose(profile -> 
getRecommendations(userId)
.thenCombine(getNotifications(userId), 
(recommendations, notifications) -> 
new DashboardData(profile, recommendations, notifications))
)
.exceptionally(throwable -> {
System.out.println("Dashboard data incomplete: " + throwable.getMessage());
return DashboardData.createPartial(throwable.getMessage());
});
}
// Simulated microservices
private CompletableFuture<UserInfo> getUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(100);
System.out.println("Fetched user info for: " + userId);
return new UserInfo(userId, "User " + userId, "user" + userId + "@example.com");
});
}
private CompletableFuture<List<Order>> getUserOrders(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(150);
System.out.println("Fetched orders for: " + userId);
return Arrays.asList(
new Order("ORD001", 99.99),
new Order("ORD002", 149.99)
);
});
}
private CompletableFuture<List<Payment>> getUserPayments(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(120);
System.out.println("Fetched payments for: " + userId);
return Arrays.asList(
new Payment("PAY001", 99.99, "COMPLETED"),
new Payment("PAY002", 149.99, "PENDING")
);
});
}
private CompletableFuture<UserPreferences> getUserPreferences(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(80);
System.out.println("Fetched preferences for: " + userId);
return new UserPreferences(true, "EN", "USD");
});
}
private CompletableFuture<List<String>> getRecommendations(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(200);
System.out.println("Fetched recommendations for: " + userId);
return Arrays.asList("Product A", "Product B", "Product C");
});
}
private CompletableFuture<List<String>> getNotifications(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(50);
System.out.println("Fetched notifications for: " + userId);
return Arrays.asList("Welcome bonus!", "New feature available");
});
}
}
// Data classes
static class UserProfile {
final UserInfo userInfo;
final List<Order> orders;
final List<Payment> payments;
final UserPreferences preferences;
UserProfile(UserInfo userInfo, List<Order> orders, List<Payment> payments, UserPreferences preferences) {
this.userInfo = userInfo;
this.orders = orders;
this.payments = payments;
this.preferences = preferences;
}
@Override
public String toString() {
return String.format("UserProfile{user=%s, orders=%d, payments=%d, prefs=%s}",
userInfo.name, orders.size(), payments.size(), preferences.language);
}
}
static class DashboardData {
final UserProfile profile;
final List<String> recommendations;
final List<String> notifications;
final String error;
DashboardData(UserProfile profile, List<String> recommendations, List<String> notifications) {
this.profile = profile;
this.recommendations = recommendations;
this.notifications = notifications;
this.error = null;
}
static DashboardData createPartial(String error) {
DashboardData data = new DashboardData(null, List.of(), List.of());
data.error = error;
return data;
}
@Override
public String toString() {
if (error != null) {
return "DashboardData{error='" + error + "'}";
}
return String.format("DashboardData{profile=%s, recommendations=%d, notifications=%d}",
profile, recommendations.size(), notifications.size());
}
}
static class UserInfo {
final String id;
final String name;
final String email;
UserInfo(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
}
static class Order {
final String id;
final double amount;
Order(String id, double amount) {
this.id = id;
this.amount = amount;
}
}
static class Payment {
final String id;
final double amount;
final String status;
Payment(String id, double amount, String status) {
this.id = id;
this.amount = amount;
this.status = status;
}
}
static class UserPreferences {
final boolean emailNotifications;
final String language;
final String currency;
UserPreferences(boolean emailNotifications, String language, String currency) {
this.emailNotifications = emailNotifications;
this.language = language;
this.currency = currency;
}
}
public static void main(String[] args) throws Exception {
ApiGateway gateway = new ApiGateway();
System.out.println("=== Testing API Gateway Pattern ===\n");
// Test single user profile
CompletableFuture<UserProfile> profileFuture = gateway.getUserProfile("user123");
System.out.println("User Profile: " + profileFuture.get());
System.out.println("\n=== Testing Dashboard Data ===\n");
// Test dashboard data
CompletableFuture<DashboardData> dashboardFuture = gateway.getDashboardData("user456");
System.out.println("Dashboard Data: " + dashboardFuture.get());
System.out.println("\n=== Testing Multiple Users Concurrently ===\n");
// Process multiple users concurrently
List<String> userIds = Arrays.asList("user1", "user2", "user3", "user4");
List<CompletableFuture<DashboardData>> userDashboards = userIds.stream()
.map(gateway::getDashboardData)
.collect(Collectors.toList());
CompletableFuture<Void> allDashboards = CompletableFuture.allOf(
userDashboards.toArray(new CompletableFuture[0])
);
CompletableFuture<List<DashboardData>> allResults = allDashboards.thenApply(v ->
userDashboards.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
List<DashboardData> results = allResults.get();
results.forEach(System.out::println);
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

5. Performance Optimization

Optimized Chaining Patterns

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.*;
public class OptimizedChaining {
// Custom executor for CPU-intensive tasks
private static final Executor cpuBoundExecutor = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// Custom executor for I/O-intensive tasks  
private static final Executor ioBoundExecutor =
Executors.newFixedThreadPool(20);
public static CompletableFuture<String> fetchData(String source) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching from " + source + " on " + Thread.currentThread().getName());
sleep(200); // Simulate I/O
return "Data from " + source;
}, ioBoundExecutor);
}
public static CompletableFuture<String> processData(String data) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Processing " + data + " on " + Thread.currentThread().getName());
// Simulate CPU-intensive work
long result = 0;
for (int i = 0; i < 1000000; i++) {
result += i * i;
}
return data.toUpperCase() + "-PROCESSED";
}, cpuBoundExecutor);
}
public static CompletableFuture<String> saveData(String data) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Saving " + data + " on " + Thread.currentThread().getName());
sleep(150); // Simulate I/O
return data + "-SAVED";
}, ioBoundExecutor);
}
public static CompletableFuture<String> optimizedPipeline(String source) {
return fetchData(source)
.thenApplyAsync(data -> {
System.out.println("Quick transformation on " + Thread.currentThread().getName());
return data + "-transformed";
}) // Uses same executor as fetchData
.thenComposeAsync(OptimizedChaining::processData, cpuBoundExecutor) // Switch to CPU executor
.thenComposeAsync(OptimizedChaining::saveData, ioBoundExecutor); // Switch back to I/O executor
}
// Batch processing optimization
public static CompletableFuture<List<String>> processBatch(List<String> items) {
// Process items in parallel but limit concurrency
List<CompletableFuture<String>> futures = items.stream()
.map(item -> processData(item).exceptionally(e -> "Failed: " + e.getMessage()))
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
// Memory optimization: process stream without collecting all results
public static CompletableFuture<Void> processStream(List<String> items) {
CompletableFuture<Void> lastFuture = CompletableFuture.completedFuture(null);
for (String item : items) {
lastFuture = lastFuture.thenCompose(v -> 
processData(item)
.thenAccept(result -> 
System.out.println("Processed: " + result)
)
);
}
return lastFuture;
}
public static void main(String[] args) throws Exception {
System.out.println("=== Optimized Chaining Examples ===\n");
// Test optimized pipeline
CompletableFuture<String> pipelineResult = optimizedPipeline("database");
System.out.println("Pipeline result: " + pipelineResult.get());
System.out.println("\n=== Batch Processing ===\n");
// Test batch processing
List<String> batchItems = Arrays.asList("item1", "item2", "item3", "item4", "item5");
CompletableFuture<List<String>> batchResult = processBatch(batchItems);
System.out.println("Batch results: " + batchResult.get());
System.out.println("\n=== Stream Processing ===\n");
// Test stream processing
CompletableFuture<Void> streamResult = processStream(batchItems);
streamResult.get();
System.out.println("Stream processing completed");
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Summary

Key Chaining Methods:

MethodPurposeUse Case
thenApply()Transform result synchronouslySimple data transformation
thenApplyAsync()Transform result asynchronouslyOffload to different thread
thenCompose()Chain dependent futuresSequential async operations
thenCombine()Combine two independent futuresParallel execution
thenAccept()Consume result without returnSide effects
exceptionally()Handle exceptionsError recovery
handle()Process result or exceptionUnified success/error handling

Best Practices:

  1. Use thenCompose for dependent async operations
  2. Use thenCombine for independent async operations
  3. Choose appropriate executors for different workload types
  4. Handle exceptions at appropriate levels in the chain
  5. Use thenApplyAsync to prevent blocking caller thread
  6. Consider memory usage for large numbers of futures

Performance Tips:

  • Use custom executors for different types of work (I/O vs CPU)
  • Limit parallelism for resource-intensive operations
  • Use batching for large datasets
  • Avoid deep nesting - flatten chains when possible
  • Monitor thread pool usage in production

CompletableFuture chaining provides a powerful and expressive way to build complex asynchronous workflows while maintaining readability and proper error handling.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper