CompletableFuture in Java

CompletableFuture is a powerful class introduced in Java 8 that represents a future result of an asynchronous computation. It provides a rich API for composing, combining, and executing asynchronous tasks.

1. Basic CompletableFuture Operations

Creating CompletableFuture

import java.util.concurrent.*;
import java.util.function.*;
public class CompletableFutureBasics {
public static void main(String[] args) throws Exception {
// 1. Creating completed CompletableFuture
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World");
System.out.println("Completed future: " + completedFuture.get());
// 2. Running async task without return value
CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
sleep(1000);
System.out.println("Run async completed");
});
// 3. Running async task with return value
CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Supplying in: " + Thread.currentThread().getName());
sleep(1000);
return "Supply Async Result";
});
System.out.println("Supply async result: " + supplyAsyncFuture.get());
// 4. Using custom executor
ExecutorService customExecutor = Executors.newFixedThreadPool(3);
CompletableFuture<String> customFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Using custom executor: " + Thread.currentThread().getName());
return "Custom executor result";
}, customExecutor);
System.out.println("Custom executor result: " + customFuture.get());
customExecutor.shutdown();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Basic Transformation and Chaining

public class TransformationExamples {
public static void main(String[] args) throws Exception {
// 1. thenApply - transform result
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
System.out.println("thenApply result: " + future.get());
// 2. thenAccept - consume result without returning
CompletableFuture<Void> acceptFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println("Consuming: " + s));
acceptFuture.get();
// 3. thenRun - run after completion, no access to result
CompletableFuture<Void> runFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task completed"));
runFuture.get();
// 4. thenCompose - chain dependent futures (flatMap equivalent)
CompletableFuture<String> composeFuture = getUser(1L)
.thenCompose(TransformationExamples::getUserProfile);
System.out.println("Composed result: " + composeFuture.get());
// 5. Async variants - execute on different threads
CompletableFuture<String> asyncFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> {
System.out.println("Async transformation in: " + Thread.currentThread().getName());
return s + " Async World";
});
System.out.println("Async result: " + asyncFuture.get());
}
private static CompletableFuture<User> getUser(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(500);
return new User(userId, "user" + userId);
});
}
private static CompletableFuture<String> getUserProfile(User user) {
return CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Profile for " + user.getName();
});
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class User {
private Long id;
private String name;
public User(Long id, String name) {
this.id = id;
this.name = name;
}
public Long getId() { return id; }
public String getName() { return name; }
}

2. Combining Multiple Futures

Combining Independent Futures

public class CombiningFutures {
public static void main(String[] args) throws Exception {
// 1. thenCombine - combine two independent futures
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "World";
});
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println("Combined: " + combined.get());
// 2. thenAcceptBoth - consume both results
CompletableFuture<Void> acceptBoth = future1.thenAcceptBoth(future2, 
(s1, s2) -> System.out.println("Both: " + s1 + " and " + s2));
acceptBoth.get();
// 3. runAfterBoth - run after both complete
CompletableFuture<Void> runAfterBoth = future1.runAfterBoth(future2,
() -> System.out.println("Both futures completed"));
runAfterBoth.get();
// 4. allOf - wait for all futures to complete
CompletableFuture<String>[] futures = new CompletableFuture[] {
CompletableFuture.supplyAsync(() -> { sleep(1000); return "Task1"; }),
CompletableFuture.supplyAsync(() -> { sleep(1500); return "Task2"; }),
CompletableFuture.supplyAsync(() -> { sleep(500); return "Task3"; })
};
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures);
allOf.thenRun(() -> {
System.out.println("All tasks completed");
for (CompletableFuture<String> f : futures) {
System.out.println("Result: " + f.join()); // join() doesn't throw checked exception
}
}).get();
// 5. anyOf - wait for any future to complete
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futures);
System.out.println("First completed: " + anyOf.get());
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Complex Combination Examples

public class AdvancedCombination {
public static void main(String[] args) throws Exception {
// Combine multiple API calls
CompletableFuture<User> userFuture = getUser(1L);
CompletableFuture<List<Order>> ordersFuture = getOrders(1L);
CompletableFuture<List<Address>> addressesFuture = getAddresses(1L);
// Combine all results
CompletableFuture<UserProfile> profileFuture = userFuture
.thenCombine(ordersFuture, (user, orders) -> new UserProfile(user, orders, null))
.thenCombine(addressesFuture, (profile, addresses) -> {
profile.setAddresses(addresses);
return profile;
});
UserProfile profile = profileFuture.get();
System.out.println("User profile: " + profile);
// Process results as they complete
processResultsAsTheyComplete();
}
private static void processResultsAsTheyComplete() throws Exception {
CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Fast task completed";
});
CompletableFuture<String> mediumTask = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Medium task completed";
});
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Slow task completed";
});
// Apply to either - process whichever completes first
fastTask.acceptEither(mediumTask, result -> 
System.out.println("First completed: " + result));
// Chain with the faster one
CompletableFuture<String> result = fastTask.applyToEither(slowTask, 
firstResult -> "Processing: " + firstResult);
System.out.println("Apply to either result: " + result.get());
}
private static CompletableFuture<User> getUser(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return new User(userId, "John Doe");
});
}
private static CompletableFuture<List<Order>> getOrders(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1500);
return List.of(new Order(1L, "Order1"), new Order(2L, "Order2"));
});
}
private static CompletableFuture<List<Address>> getAddresses(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1200);
return List.of(new Address("123 Main St"), new Address("456 Oak Ave"));
});
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class UserProfile {
private User user;
private List<Order> orders;
private List<Address> addresses;
public UserProfile(User user, List<Order> orders, List<Address> addresses) {
this.user = user;
this.orders = orders;
this.addresses = addresses;
}
// getters and setters
public void setAddresses(List<Address> addresses) { this.addresses = addresses; }
@Override
public String toString() {
return String.format("UserProfile{user=%s, orders=%d, addresses=%d}", 
user.getName(), orders.size(), addresses.size());
}
}
class Order {
private Long id;
private String name;
public Order(Long id, String name) {
this.id = id;
this.name = name;
}
public String getName() { return name; }
}
class Address {
private String street;
public Address(String street) {
this.street = street;
}
public String getStreet() { return street; }
}

3. Error Handling and Recovery

public class ErrorHandlingExamples {
public static void main(String[] args) throws Exception {
// 1. exceptionally - handle exceptions
CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("Something went wrong!");
return "Success";
});
CompletableFuture<String> recovered = futureWithException.exceptionally(ex -> {
System.out.println("Exception caught: " + ex.getMessage());
return "Default Value";
});
System.out.println("Recovered result: " + recovered.get());
// 2. handle - handle both success and failure
CompletableFuture<String> handled = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return "Success";
}).handle((result, exception) -> {
if (exception != null) {
return "Recovered from: " + exception.getMessage();
}
return result;
});
System.out.println("Handled result: " + handled.get());
// 3. whenComplete - always execute, similar to finally
CompletableFuture<String> withCompletion = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.7) {
throw new RuntimeException("Operation failed");
}
return "Operation succeeded";
}).whenComplete((result, exception) -> {
if (exception != null) {
System.out.println("Task failed with: " + exception.getMessage());
} else {
System.out.println("Task succeeded with: " + result);
}
});
try {
System.out.println("Final result: " + withCompletion.get());
} catch (Exception e) {
System.out.println("Exception in get: " + e.getCause().getMessage());
}
// 4. Complex recovery chain
complexRecoveryExample();
}
private static void complexRecoveryExample() throws Exception {
CompletableFuture<String> complexFuture = fetchDataFromPrimary()
.exceptionally(ex -> {
System.out.println("Primary failed, trying secondary: " + ex.getMessage());
return fetchDataFromSecondary().join(); // join since we're in exceptionally
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println("Both primary and secondary failed, using cache");
return getCachedData();
}
return result;
})
.thenApply(data -> "Processed: " + data);
System.out.println("Complex recovery result: " + complexFuture.get());
}
private static CompletableFuture<String> fetchDataFromPrimary() {
return CompletableFuture.supplyAsync(() -> {
sleep(500);
if (Math.random() > 0.3) {
throw new RuntimeException("Primary service unavailable");
}
return "Data from primary";
});
}
private static CompletableFuture<String> fetchDataFromSecondary() {
return CompletableFuture.supplyAsync(() -> {
sleep(300);
if (Math.random() > 0.2) {
throw new RuntimeException("Secondary service unavailable");
}
return "Data from secondary";
});
}
private static String getCachedData() {
return "Cached data";
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

4. Timeout and Control Operations

public class TimeoutControlExamples {
public static void main(String[] args) throws Exception {
// 1. Complete manually
CompletableFuture<String> manualFuture = new CompletableFuture<>();
new Thread(() -> {
sleep(2000);
manualFuture.complete("Manually completed");
}).start();
// Complete exceptionally
CompletableFuture<String> exceptionalFuture = new CompletableFuture<>();
new Thread(() -> {
sleep(1000);
exceptionalFuture.completeExceptionally(new RuntimeException("Forced failure"));
}).start();
// 2. Timeout with completeOnTimeout (Java 9+)
CompletableFuture<String> timeoutFuture = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "Slow result";
}).completeOnTimeout("Timeout default", 1, TimeUnit.SECONDS);
System.out.println("Timeout result: " + timeoutFuture.get());
// 3. Timeout with orTimeout (Java 9+)
CompletableFuture<String> orTimeoutFuture = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "Slow result";
}).orTimeout(1, TimeUnit.SECONDS);
try {
System.out.println("OrTimeout result: " + orTimeoutFuture.get());
} catch (ExecutionException e) {
System.out.println("Timeout occurred: " + e.getCause().getMessage());
}
// 4. Cancel operation
CompletableFuture<String> cancelableFuture = new CompletableFuture<>();
new Thread(() -> {
sleep(1000);
cancelableFuture.cancel(true);
}).start();
try {
cancelableFuture.get();
} catch (CancellationException e) {
System.out.println("Future was cancelled");
}
// 5. Check completion status
CompletableFuture<String> statusFuture = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Done";
});
System.out.println("Is done: " + statusFuture.isDone());
System.out.println("Is cancelled: " + statusFuture.isCancelled());
System.out.println("Is completed exceptionally: " + statusFuture.isCompletedExceptionally());
statusFuture.get();
System.out.println("Is done after get: " + statusFuture.isDone());
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

5. Real-World Use Cases

Parallel API Calls

public class ParallelApiCalls {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// Make multiple API calls in parallel
CompletableFuture<UserInfo> userInfoFuture = getUserInfo(1L);
CompletableFuture<List<Product>> productsFuture = getRecommendedProducts(1L);
CompletableFuture<List<Review>> reviewsFuture = getUserReviews(1L);
// Combine all results
CompletableFuture<UserDashboard> dashboardFuture = userInfoFuture
.thenCombine(productsFuture, (userInfo, products) -> 
new UserDashboard(userInfo, products, null))
.thenCombine(reviewsFuture, (dashboard, reviews) -> {
dashboard.setReviews(reviews);
return dashboard;
});
UserDashboard dashboard = dashboardFuture.get();
long endTime = System.currentTimeMillis();
System.out.println("Dashboard: " + dashboard);
System.out.println("Total time: " + (endTime - startTime) + "ms");
// Compare with sequential execution
sequentialExecution();
}
private static void sequentialExecution() throws Exception {
long startTime = System.currentTimeMillis();
UserInfo userInfo = getUserInfo(1L).get();
List<Product> products = getRecommendedProducts(1L).get();
List<Review> reviews = getUserReviews(1L).get();
UserDashboard dashboard = new UserDashboard(userInfo, products, reviews);
long endTime = System.currentTimeMillis();
System.out.println("Sequential time: " + (endTime - startTime) + "ms");
}
private static CompletableFuture<UserInfo> getUserInfo(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000); // Simulate API call
return new UserInfo(userId, "John Doe", "[email protected]");
});
}
private static CompletableFuture<List<Product>> getRecommendedProducts(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1200); // Simulate API call
return List.of(
new Product(1L, "Laptop"),
new Product(2L, "Mouse"),
new Product(3L, "Keyboard")
);
});
}
private static CompletableFuture<List<Review>> getUserReviews(Long userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(800); // Simulate API call
return List.of(
new Review(1L, "Great product!"),
new Review(2L, "Fast delivery")
);
});
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class UserInfo {
private Long id;
private String name;
private String email;
public UserInfo(Long id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
public String getName() { return name; }
}
class Product {
private Long id;
private String name;
public Product(Long id, String name) {
this.id = id;
this.name = name;
}
public String getName() { return name; }
}
class Review {
private Long id;
private String comment;
public Review(Long id, String comment) {
this.id = id;
this.comment = comment;
}
public String getComment() { return comment; }
}
class UserDashboard {
private UserInfo userInfo;
private List<Product> products;
private List<Review> reviews;
public UserDashboard(UserInfo userInfo, List<Product> products, List<Review> reviews) {
this.userInfo = userInfo;
this.products = products;
this.reviews = reviews;
}
public void setReviews(List<Review> reviews) { this.reviews = reviews; }
@Override
public String toString() {
return String.format("UserDashboard{user=%s, products=%d, reviews=%d}", 
userInfo.getName(), products.size(), reviews.size());
}
}

Batch Processing

public class BatchProcessing {
public static void main(String[] args) throws Exception {
// Process items in batches asynchronously
List<Long> itemIds = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
// Process all items in parallel
List<CompletableFuture<ProcessedItem>> futures = itemIds.stream()
.map(BatchProcessing::processItemAsync)
.toList();
// Wait for all to complete
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
// Collect results
CompletableFuture<List<ProcessedItem>> resultsFuture = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.toList());
List<ProcessedItem> results = resultsFuture.get();
System.out.println("Processed " + results.size() + " items");
// Process with limited parallelism
processWithLimitedParallelism(itemIds);
}
private static void processWithLimitedParallelism(List<Long> itemIds) throws Exception {
ExecutorService limitedExecutor = Executors.newFixedThreadPool(3);
List<CompletableFuture<ProcessedItem>> futures = itemIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> processItem(id), limitedExecutor))
.toList();
CompletableFuture<List<ProcessedItem>> allResults = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
List<ProcessedItem> results = allResults.get();
System.out.println("Limited parallelism processed: " + results.size() + " items");
limitedExecutor.shutdown();
}
private static CompletableFuture<ProcessedItem> processItemAsync(Long itemId) {
return CompletableFuture.supplyAsync(() -> processItem(itemId));
}
private static ProcessedItem processItem(Long itemId) {
sleep(500); // Simulate processing time
System.out.println("Processing item " + itemId + " in " + Thread.currentThread().getName());
return new ProcessedItem(itemId, "Processed_" + itemId);
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class ProcessedItem {
private Long id;
private String data;
public ProcessedItem(Long id, String data) {
this.id = id;
this.data = data;
}
public Long getId() { return id; }
public String getData() { return data; }
}

6. Best Practices and Common Patterns

public class BestPractices {
public static void main(String[] args) {
// 1. Always handle exceptions
safeAsyncOperation();
// 2. Use custom executors for resource control
useCustomExecutor();
// 3. Avoid blocking in async operations
nonBlockingComposition();
// 4. Proper resource cleanup
resourceCleanupExample();
}
private static void safeAsyncOperation() {
CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
// Some operation that might fail
return riskyOperation();
}).exceptionally(ex -> {
// Always handle exceptions
System.err.println("Operation failed: " + ex.getMessage());
return "default-value";
});
// Don't forget to handle the result
safeFuture.thenAccept(result -> {
System.out.println("Safe result: " + result);
});
}
private static void useCustomExecutor() {
ExecutorService ioBoundExecutor = Executors.newFixedThreadPool(10);
ExecutorService cpuBoundExecutor = Executors.newWorkStealingPool();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// I/O bound operation
return fetchFromDatabase();
}, ioBoundExecutor).thenApplyAsync(data -> {
// CPU intensive processing
return processData(data);
}, cpuBoundExecutor);
future.thenRunAsync(() -> {
System.out.println("Completed with custom executors");
}, ioBoundExecutor);
// Always shutdown custom executors
ioBoundExecutor.shutdown();
cpuBoundExecutor.shutdown();
}
private static void nonBlockingComposition() {
// Good: Non-blocking composition
CompletableFuture<String> goodFuture = CompletableFuture.supplyAsync(() -> "data")
.thenApplyAsync(data -> transform(data))
.thenComposeAsync(transformed -> furtherProcessing(transformed));
// Bad: Blocking in async chain
CompletableFuture<String> badFuture = CompletableFuture.supplyAsync(() -> "data")
.thenApply(data -> {
try {
// Blocking call in transformation!
return blockingOperation(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
private static void resourceCleanupExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Acquire resource
Resource resource = acquireResource();
try {
return useResource(resource);
} finally {
// Always release resource
releaseResource(resource);
}
});
// Handle completion
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Operation failed: " + ex.getMessage());
} else {
System.out.println("Operation succeeded: " + result);
}
});
}
// Mock methods for demonstration
private static String riskyOperation() { return "result"; }
private static String fetchFromDatabase() { return "data"; }
private static String processData(String data) { return "processed"; }
private static String transform(String data) { return "transformed"; }
private static CompletableFuture<String> furtherProcessing(String data) {
return CompletableFuture.completedFuture("processed");
}
private static String blockingOperation(String data) throws InterruptedException {
Thread.sleep(1000);
return "blocking-result";
}
private static Resource acquireResource() { return new Resource(); }
private static String useResource(Resource resource) { return "used"; }
private static void releaseResource(Resource resource) { /* cleanup */ }
}
class Resource {
// Mock resource class
}

Key Features Summary:

  1. Async Execution: Run tasks asynchronously with supplyAsync() and runAsync()
  2. Transformation: Transform results with thenApply(), thenCompose()
  3. Combination: Combine multiple futures with thenCombine(), allOf(), anyOf()
  4. Error Handling: Recover from exceptions with exceptionally(), handle()
  5. Control: Manually complete, cancel, or timeout futures
  6. Chaining: Create complex async workflows with method chaining
  7. Executor Control: Use custom executors for resource management

CompletableFuture provides a powerful and flexible way to write asynchronous, non-blocking code in Java, making it essential for modern high-performance applications.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper