Java 21 Virtual Threads (Project Loom)

This is a comprehensive guide to virtual threads, which were finalized in Java 21 (JEP 444) as part of Project Loom and make it practical to run very large numbers of blocking, I/O-bound tasks concurrently.

1. Virtual Threads Overview

Traditional vs Virtual Threads

/**
 * Contrasts a traditional platform thread with a virtual thread by running a
 * small task on each and printing which kind of thread executed it.
 */
public class VirtualThreadsOverview {

    /**
     * Runs one task on a platform thread and one on a virtual thread, then
     * waits for both to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("=== Virtual Threads vs Platform Threads ===");

        // Platform Thread (traditional)
        Thread platformThread = new Thread(() -> {
            System.out.println("Platform Thread: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
        });
        // new Thread(...) only constructs the thread. Without start() the task never
        // runs, and join() on a thread that was never started returns immediately.
        platformThread.start();

        // Virtual Thread (new in Java 21): ofVirtual().start(...) creates and starts it.
        Thread virtualThread = Thread.ofVirtual().start(() -> {
            System.out.println("Virtual Thread: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
        });

        try {
            platformThread.join();
            virtualThread.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever called us.
            Thread.currentThread().interrupt();
        }
    }
}

Key Characteristics

/**
 * Prints basic properties (identity, virtual flag, thread id, daemon flag) of
 * several virtual threads.
 */
public class VirtualThreadCharacteristics {

    /**
     * Starts five virtual threads plus one created in the unstarted state, and
     * waits for all of them so their output is not lost.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns early.
     */
    public static void demonstrateCharacteristics() {
        System.out.println("=== Virtual Threads Characteristics ===");

        // 1. Lightweight - can create millions
        Runnable task = () -> {
            System.out.println("Executing in: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
            System.out.println("Thread ID: " + Thread.currentThread().threadId());
            System.out.println("Is Daemon: " + Thread.currentThread().isDaemon());
        };

        // Create multiple virtual threads and keep the handles so they can be joined.
        Thread[] workers = new Thread[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = Thread.ofVirtual().start(task);
        }

        // Virtual threads are always daemon threads
        Thread virtualThread = Thread.ofVirtual().unstarted(task);
        System.out.println("Before start - Is Daemon: " + virtualThread.isDaemon());
        virtualThread.start();

        // Daemon threads do not keep the JVM alive, so without these joins the
        // program could exit before any of the tasks has printed.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
            virtualThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2. Creating Virtual Threads

Different Creation Methods

/**
 * Walks through the four ways of launching a virtual thread: the builder's
 * {@code start}, the builder's {@code unstarted} followed by {@code start},
 * {@code Thread.startVirtualThread}, and a virtual-thread-per-task executor.
 */
public class VirtualThreadCreation {

    /**
     * Launches one task with each creation method and waits for all of them.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== Virtual Thread Creation Methods ===");

        // Method 1: Thread.ofVirtual().start()
        Runnable directTask = () ->
            System.out.println("Method 1 - Direct start: " + Thread.currentThread().getName());
        Thread direct = Thread.ofVirtual().start(directTask);

        // Method 2: Thread.ofVirtual().unstarted() + start()
        Runnable namedTask = () ->
            System.out.println("Method 2 - Unstarted: " + Thread.currentThread().getName());
        Thread named = Thread.ofVirtual().name("custom-virtual-thread-1").unstarted(namedTask);
        named.start();

        // Method 3: Thread.startVirtualThread() (convenience method)
        Runnable convenienceTask = () ->
            System.out.println("Method 3 - Start virtual: " + Thread.currentThread().getName());
        Thread convenience = Thread.startVirtualThread(convenienceTask);

        // Method 4: Using Executors; leaving the try block closes the executor,
        // which waits for the submitted task.
        Runnable executorTask = () ->
            System.out.println("Method 4 - Executor: " + Thread.currentThread().getName());
        try (var perTaskExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
            perTaskExecutor.submit(executorTask);
        }

        for (Thread launched : new Thread[] {direct, named, convenience}) {
            launched.join();
        }
    }
}

Builder Pattern for Configuration

/**
 * Shows how {@code Thread.ofVirtual()} can be configured once and reused to
 * create several virtual threads with sequential names.
 */
public class VirtualThreadBuilder {

    /**
     * Starts three threads named {@code worker-0}..{@code worker-2} from one
     * builder, plus one explicitly started thread, and waits for all of them.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns early.
     */
    public static void demonstrateBuilder() {
        System.out.println("=== Virtual Thread Builder ===");

        // Configure virtual thread with builder
        Thread.Builder builder = Thread.ofVirtual()
            .name("worker-", 0)  // name pattern and start index
            .inheritInheritableThreadLocals(false); // control inheritance

        // Create multiple threads with sequential names; keep them so they can be joined.
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = builder.start(() -> {
                System.out.println("Thread: " + Thread.currentThread().getName() +
                    ", Virtual: " + Thread.currentThread().isVirtual());
            });
        }

        // Factory method for unstarted threads
        Thread unstarted = Thread.ofVirtual()
            .name("unstarted-worker")
            .unstarted(() -> {
                System.out.println("This won't print until started");
            });
        System.out.println("Created unstarted thread: " + unstarted.getName());
        unstarted.start(); // Now it will execute

        // Virtual threads are daemon threads: without joining, the caller (or the
        // JVM) may finish before these tasks have produced their output.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
            unstarted.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3. Executors with Virtual Threads

Virtual Thread Executors

/**
 * Demonstrates the two executor factories that run every task on its own
 * virtual thread.
 */
public class VirtualThreadExecutors {

    /**
     * Runs both executor demonstrations in sequence.
     *
     * @param args unused
     * @throws Exception if a submitted task fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== Virtual Thread Executors ===");
        runPerTaskExecutorDemo();
        runCustomFactoryDemo();
    }

    /** Method 1: newVirtualThreadPerTaskExecutor (recommended). */
    private static void runPerTaskExecutorDemo() throws Exception {
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> pending = new ArrayList<>();
            int next = 0;
            while (next < 10) {
                final int taskId = next++;
                pending.add(pool.submit(() -> simulateWork(taskId)));
            }
            // Collect results in submission order.
            for (Future<String> result : pending) {
                System.out.println(result.get());
            }
        }
    }

    /** Sleeps briefly to simulate work, then reports which thread ran the task. */
    private static String simulateWork(int taskId) throws InterruptedException {
        Thread.sleep(100); // Simulate work
        return "Task-" + taskId + " completed by " +
            Thread.currentThread().getName();
    }

    /** Method 2: newThreadPerTaskExecutor with a virtual thread factory. */
    private static void runCustomFactoryDemo() {
        try (var pool = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())) {
            Runnable report = () ->
                System.out.println("Using custom factory: " + Thread.currentThread().getName());
            pool.submit(report);
        }
    }
}

ExecutorService Patterns

/**
 * Combines {@code CompletableFuture} with a virtual-thread-per-task executor
 * to fan work out and gather the results.
 */
public class ExecutorServicePatterns {

    /**
     * Fetches a profile for every user id concurrently and prints how many
     * were processed.
     *
     * @param userIds ids to look up
     * @throws Exception declared for callers; failures of individual lookups
     *     surface as unchecked exceptions from {@code join()}
     */
    public void processUsers(List<String> userIds) throws Exception {
        System.out.println("=== Processing with Virtual Threads ===");
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            // One asynchronous lookup per user id.
            List<CompletableFuture<UserProfile>> lookups = userIds.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchUserProfile(id), pool))
                .toList();

            // Block until every lookup has finished, then read the results.
            CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0])).join();
            List<UserProfile> profiles = lookups.stream()
                .map(CompletableFuture::join)
                .toList();

            System.out.println("Processed " + profiles.size() + " users");
        }
    }

    /**
     * Processes all orders concurrently, printing the first result that
     * becomes available, and returns once every order is done.
     *
     * @param orders orders to process
     * @throws Exception declared for callers; task failures surface as
     *     unchecked exceptions from {@code join()}
     */
    public void batchProcessing(List<Order> orders) throws Exception {
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<OrderResult>> inFlight = orders.stream()
                .map(order -> CompletableFuture.supplyAsync(() -> processOrder(order), pool))
                .toList();
            CompletableFuture<?>[] all = inFlight.toArray(new CompletableFuture[0]);

            // Report whichever order finishes first.
            CompletableFuture<Object> first = CompletableFuture.anyOf(all);
            first.thenAccept(result -> System.out.println("First order completed: " + result));

            // Wait for all
            CompletableFuture.allOf(all).join();
        }
    }

    /** Simulates a remote profile lookup with a short sleep. */
    private UserProfile fetchUserProfile(String userId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new UserProfile(userId, "User-" + userId);
    }

    /** Simulates order processing with a short sleep. */
    private OrderResult processOrder(Order order) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new OrderResult(order.id(), "PROCESSED");
    }

    record UserProfile(String id, String name) {}

    record Order(String id, double amount) {}

    record OrderResult(String orderId, String status) {}
}

4. I/O Operations with Virtual Threads

Non-blocking I/O Patterns

/**
 * Runs simulated blocking I/O (HTTP requests, database calls) on virtual
 * threads, one task per operation.
 */
public class VirtualThreadsIO {

    /**
     * Fetches every URL concurrently and returns the responses in the same
     * order as {@code urls}.
     *
     * <p>The executor is closed by try-with-resources before this method
     * returns, and closing waits for submitted tasks, so the returned future
     * is already complete when the caller receives it.
     *
     * @param urls URLs to fetch
     * @return a future holding one response string per URL
     */
    public CompletableFuture<List<String>> fetchMultipleUrls(List<String> urls) {
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> requests = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> fetchUrl(url), pool))
                .toList();
            CompletableFuture<Void> allDone =
                CompletableFuture.allOf(requests.toArray(new CompletableFuture[0]));
            return allDone.thenApply(ignored ->
                requests.stream().map(CompletableFuture::join).toList());
        }
    }

    /** Simulates an HTTP request that takes between 100 and 200 ms. */
    private String fetchUrl(String url) {
        // Simulate HTTP request
        try {
            Thread.sleep(100 + (long) (Math.random() * 100));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "Response from " + url;
    }

    /**
     * Runs 1000 simulated database operations concurrently and waits for all
     * of them.
     *
     * @throws Exception declared for callers; task failures surface as
     *     unchecked exceptions from {@code join()}
     */
    public void databaseOperations() throws Exception {
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            // Simulate multiple database operations
            for (int id = 0; id < 1000; id++) {
                final int recordId = id;
                pending.add(CompletableFuture.runAsync(() -> processDatabaseRecord(recordId), pool));
            }
            // Wait for all operations to complete
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            System.out.println("All database operations completed");
        }
    }

    /** Simulates one database call; on interrupt, restores the flag and skips the print. */
    private void processDatabaseRecord(int recordId) {
        // Simulate database operation
        try {
            Thread.sleep(50);
            System.out.println("Processed record: " + recordId + " on " +
                Thread.currentThread().getName());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

File Processing Example

/**
 * Processes and reads files concurrently, one virtual thread per file.
 */
public class FileProcessingWithVirtualThreads {

    /**
     * Processes every file concurrently and prints how many were handled.
     * Per-file I/O errors are captured in the result status rather than thrown.
     *
     * @param files files to process
     * @throws Exception declared for callers; unexpected task failures surface
     *     as unchecked exceptions from {@code join()}
     */
    public void processFilesConcurrently(List<java.nio.file.Path> files) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<FileProcessResult>> futures = files.stream()
                .map(file -> CompletableFuture.supplyAsync(() ->
                    processFile(file), executor))
                .toList();
            List<FileProcessResult> results = futures.stream()
                .map(CompletableFuture::join)
                .toList();
            System.out.println("Processed " + results.size() + " files");
        }
    }

    /**
     * Looks up the size of one file after a simulated delay.
     *
     * @return a SUCCESS result with the size, or an ERROR result with size 0
     */
    private FileProcessResult processFile(java.nio.file.Path file) {
        String name = displayName(file);
        try {
            // Simulate file processing
            Thread.sleep(100);
            long size = Files.size(file);
            return new FileProcessResult(name, size, "SUCCESS");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the executor instead of swallowing it.
            Thread.currentThread().interrupt();
            return new FileProcessResult(name, 0, "ERROR: " + e.getMessage());
        } catch (Exception e) {
            return new FileProcessResult(name, 0, "ERROR: " + e.getMessage());
        }
    }

    /**
     * Returns the last path element, or the whole path when there is none
     * (e.g. a root directory, for which {@code getFileName()} is null).
     */
    private static String displayName(java.nio.file.Path file) {
        java.nio.file.Path fileName = file.getFileName();
        return (fileName != null ? fileName : file).toString();
    }

    /**
     * Reads three fixed files concurrently and prints each line count.
     * Files that cannot be read count as zero lines.
     *
     * @throws Exception if a read task fails unexpectedly or the wait is interrupted
     */
    public void batchFileOperations() throws Exception {
        List<java.nio.file.Path> files = List.of(
            java.nio.file.Path.of("file1.txt"),
            java.nio.file.Path.of("file2.txt"),
            java.nio.file.Path.of("file3.txt")
        );
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Read all files concurrently
            List<CompletableFuture<List<String>>> readFutures = files.stream()
                .map(file -> CompletableFuture.supplyAsync(() ->
                    readFileLines(file), executor))
                .toList();
            // Results are consumed in submission order, not completion order.
            for (CompletableFuture<List<String>> future : readFutures) {
                List<String> lines = future.get();
                System.out.println("Read " + lines.size() + " lines");
            }
        }
    }

    /** Reads all lines of a file; returns an empty list if reading fails or is interrupted. */
    private List<String> readFileLines(java.nio.file.Path file) {
        try {
            Thread.sleep(50); // Simulate I/O
            return Files.readAllLines(file);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        } catch (Exception e) {
            // Best effort: an unreadable file is reported as having no lines.
            return List.of();
        }
    }

    record FileProcessResult(String filename, long size, String status) {}
}

5. Error Handling and Cancellation

Proper Error Handling

/**
 * Error handling, timeouts and resource cleanup for tasks running on virtual
 * threads.
 */
public class VirtualThreadsErrorHandling {

    /**
     * Runs five tasks, one of which fails, and prints either the result or the
     * failure message of each.
     *
     * @throws Exception if waiting for a result is interrupted
     */
    public void demonstrateErrorHandling() throws Exception {
        System.out.println("=== Error Handling in Virtual Threads ===");
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                int taskId = i;
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    if (taskId == 2) {
                        throw new RuntimeException("Simulated failure in task " + taskId);
                    }
                    return "Task-" + taskId + " succeeded";
                }, executor)
                .exceptionally(ex -> {
                    // The exception reaches a dependent stage wrapped in a
                    // CompletionException; report the underlying cause's message.
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    return "Task failed: " + cause.getMessage();
                });
                futures.add(future);
            }
            // Collect results including failures
            for (CompletableFuture<String> future : futures) {
                System.out.println(future.get());
            }
        }
    }

    /**
     * Runs a 5-second task with a 1-second timeout.
     *
     * @throws Exception if the task fails or the wait is interrupted
     */
    public void handleTimeouts() throws Exception {
        handleTimeouts(5000, 1000);
    }

    /**
     * Runs a task that sleeps for {@code taskMillis} and waits at most
     * {@code timeoutMillis} for it; on timeout the task is cancelled.
     *
     * @param taskMillis how long the simulated operation takes
     * @param timeoutMillis how long to wait before giving up
     * @throws Exception if the task fails or the wait is interrupted
     */
    public void handleTimeouts(long taskMillis, long timeoutMillis) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submit directly to the executor: cancel(true) on the Future it returns
            // interrupts the running thread. CompletableFuture.cancel() ignores its
            // argument and never interrupts, so the sleep would continue and the
            // executor's close() would block until the task finished on its own.
            var future = executor.submit(() -> {
                Thread.sleep(taskMillis); // Long operation
                return "Result";
            });
            try {
                String result = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                System.out.println("Got result: " + result);
            } catch (TimeoutException e) {
                System.out.println("Task timed out");
                future.cancel(true); // Interrupt and cancel the task
            }
        }
    }

    /**
     * Uses an {@link AutoCloseable} resource inside a task; try-with-resources
     * closes it on the task's own thread.
     *
     * @throws Exception if the task fails or the wait is interrupted
     */
    public void resourceCleanup() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
                try (var resource = new AutoCloseableResource()) {
                    resource.use();
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Only an actual interrupt should set the interrupt flag.
                    Thread.currentThread().interrupt();
                }
            }, executor);
            task.get(); // Wait for completion with proper resource cleanup
        }
    }

    /** Minimal resource that reports which thread uses and closes it. */
    static class AutoCloseableResource implements AutoCloseable {
        public void use() {
            System.out.println("Using resource in: " + Thread.currentThread().getName());
        }

        @Override
        public void close() {
            System.out.println("Closing resource from: " + Thread.currentThread().getName());
        }
    }
}

6. Synchronization and Coordination

Structured Concurrency

/**
 * Fetches related pieces of user data as subtasks of one structured task scope.
 *
 * <p>{@code StructuredTaskScope} is a preview API in Java 21 (JEP 453): compile
 * and run this class with {@code --enable-preview}.
 */
public class StructuredConcurrencyExample {

    /**
     * Fetches details, orders and preferences concurrently and combines them.
     *
     * @param userId the user to load
     * @return the combined profile
     * @throws Exception if any subtask fails or the wait is interrupted
     */
    public UserProfile fetchUserData(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork subtasks
            StructuredTaskScope.Subtask<String> userSubtask = scope.fork(() ->
                fetchUserDetails(userId));
            StructuredTaskScope.Subtask<List<String>> ordersSubtask = scope.fork(() ->
                fetchUserOrders(userId));
            StructuredTaskScope.Subtask<List<String>> preferencesSubtask = scope.fork(() ->
                fetchUserPreferences(userId));

            // Wait for all subtasks to complete or any to fail
            scope.join();
            scope.throwIfFailed(); // Propagate any exception

            // Combine results
            return new UserProfile(
                userSubtask.get(),
                ordersSubtask.get(),
                preferencesSubtask.get()
            );
        }
    }

    /**
     * Fetches details and orders concurrently, waiting at most five seconds.
     * Preferences are not fetched and are returned as an empty list.
     *
     * @param userId the user to load
     * @return the combined profile
     * @throws Exception a {@code TimeoutException} if the deadline passes, an
     *     {@code ExecutionException} if a subtask fails, or an
     *     {@code InterruptedException} if the wait is interrupted
     */
    public UserProfile fetchUserDataWithTimeout(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            StructuredTaskScope.Subtask<String> userSubtask = scope.fork(() ->
                fetchUserDetails(userId));
            StructuredTaskScope.Subtask<List<String>> ordersSubtask = scope.fork(() ->
                fetchUserOrders(userId));

            // Wait with timeout; joinUntil itself throws TimeoutException at the deadline.
            scope.joinUntil(Instant.now().plusSeconds(5));
            // A subtask failure is not a timeout: surface it with its real cause
            // instead of reporting it as one.
            scope.throwIfFailed();

            return new UserProfile(userSubtask.get(), ordersSubtask.get(), List.of());
        }
    }

    /** Simulates loading user details. */
    private String fetchUserDetails(String userId) {
        try {
            Thread.sleep(100);
            return "User details for " + userId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates loading the user's orders. */
    private List<String> fetchUserOrders(String userId) {
        try {
            Thread.sleep(200);
            return List.of("Order1", "Order2");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates loading the user's preferences. */
    private List<String> fetchUserPreferences(String userId) {
        try {
            Thread.sleep(150);
            return List.of("Pref1", "Pref2");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    record UserProfile(String details, List<String> orders, List<String> preferences) {}
}

7. Performance and Best Practices

Performance Comparison

/**
 * Rough timing comparison of a fixed platform-thread pool and a
 * virtual-thread-per-task executor running the same sleep-based workload.
 *
 * <p>This is a single unwarmed run, so treat the numbers as an illustration
 * rather than a benchmark.
 */
public class VirtualThreadsPerformance {

    /**
     * Times 10,000 simulated I/O tasks on each executor and prints the ratio.
     *
     * @param args unused
     * @throws Exception if a measurement fails
     */
    public static void main(String[] args) throws Exception {
        int taskCount = 10_000;
        System.out.println("=== Performance Comparison ===");

        // Platform threads
        long platformTime = measurePlatformThreads(taskCount);
        System.out.printf("Platform threads: %d ms%n", platformTime);

        // Virtual threads
        long virtualTime = measureVirtualThreads(taskCount);
        System.out.printf("Virtual threads: %d ms%n", virtualTime);

        // Guard against a 0 ms measurement so the ratio stays finite.
        System.out.printf("Improvement: %.2fx faster%n",
            (double) platformTime / Math.max(1, virtualTime));
    }

    /** Returns the milliseconds taken to run the workload on a 200-thread fixed pool. */
    private static long measurePlatformThreads(int taskCount) throws Exception {
        long startNanos = System.nanoTime();
        runSleepTasks(Executors.newFixedThreadPool(200), taskCount);
        return elapsedMillis(startNanos);
    }

    /** Returns the milliseconds taken to run the workload with one virtual thread per task. */
    private static long measureVirtualThreads(int taskCount) throws Exception {
        long startNanos = System.nanoTime();
        runSleepTasks(Executors.newVirtualThreadPerTaskExecutor(), taskCount);
        return elapsedMillis(startNanos);
    }

    /**
     * Submits {@code taskCount} tasks that each sleep 10 ms, waits for all of
     * them, and closes the executor.
     */
    private static void runSleepTasks(java.util.concurrent.ExecutorService executor, int taskCount) {
        try (executor) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(10); // Simulate I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, executor));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }

    /**
     * Converts a {@code System.nanoTime()} start stamp to elapsed milliseconds.
     * nanoTime is monotonic, unlike currentTimeMillis, which can jump when the
     * wall clock is adjusted.
     */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000;
    }
}

Best Practices

/**
 * Do's and don'ts for virtual threads, with placeholder data-access methods.
 *
 * <p>{@code StructuredTaskScope} is a preview API in Java 21 (JEP 453): compile
 * and run this class with {@code --enable-preview}.
 */
public class VirtualThreadsBestPractices {

    // ✅ DO: Use for I/O-bound tasks
    /**
     * Runs three simulated blocking calls concurrently and waits for all of them.
     *
     * @throws Exception declared for callers; task failures surface as
     *     unchecked exceptions from {@code join()}
     */
    public void ioBoundOperations() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> fetchFromDatabase("query1"), executor),
                CompletableFuture.supplyAsync(() -> callExternalApi("api1"), executor),
                CompletableFuture.supplyAsync(() -> readFromFile("file1"), executor)
            );
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }

    // ❌ DON'T: Use for CPU-bound tasks
    /**
     * Runs one CPU-bound task per available processor. No executor is passed
     * to {@code runAsync}, so the tasks run on the default asynchronous pool
     * rather than on virtual threads.
     */
    public void cpuBoundOperations() {
        // Virtual threads don't help with CPU-bound work
        // Use platform threads or ForkJoinPool instead
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                intensiveComputation(); // CPU-bound
            }));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    // ✅ DO: Use structured concurrency for related tasks
    /**
     * Fetches user, profile and settings as subtasks of one scope and combines
     * them.
     *
     * @param userId the user to load
     * @return whatever {@code combineData} produces for the three results
     * @throws Exception if any subtask fails or the wait is interrupted
     */
    public Object structuredApproach(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var userTask = scope.fork(() -> fetchUser(userId));
            var profileTask = scope.fork(() -> fetchProfile(userId));
            var settingsTask = scope.fork(() -> fetchSettings(userId));
            scope.join();
            scope.throwIfFailed();
            return combineData(userTask.get(), profileTask.get(), settingsTask.get());
        }
    }

    // ✅ DO: Proper resource cleanup
    /**
     * Opens a connection and statement inside the task so try-with-resources
     * closes both on the task's thread. Any failure is rethrown unchecked and
     * surfaces from {@code join()}.
     *
     * @throws Exception declared for callers
     */
    public void withResources() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture.runAsync(() -> {
                try (var connection = getDatabaseConnection();
                     var statement = connection.createStatement()) {
                    // Use resources
                    processData(statement);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executor).join();
        }
    }

    /** Simulates a database query. */
    private String fetchFromDatabase(String query) {
        try {
            Thread.sleep(100);
            return "result";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates a call to an external API. */
    private String callExternalApi(String api) {
        try {
            Thread.sleep(200);
            return "response";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates reading a file. */
    private String readFromFile(String file) {
        try {
            Thread.sleep(50);
            return "content";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Burns CPU by summing a billion integers; the sum itself is discarded. */
    private void intensiveComputation() {
        // CPU-intensive work
        long result = 0;
        for (long i = 0; i < 1_000_000_000; i++) {
            result += i;
        }
    }

    // Placeholder methods

    /**
     * Placeholder: typed as a JDBC connection so it can be used in
     * try-with-resources. Replace with a real connection source.
     */
    private java.sql.Connection getDatabaseConnection() {
        throw new UnsupportedOperationException("placeholder: provide a real database connection");
    }

    private void processData(java.sql.Statement statement) {}

    private String fetchUser(String userId) { return null; }

    private String fetchProfile(String userId) { return null; }

    private String fetchSettings(String userId) { return null; }

    private Object combineData(String user, String profile, String settings) { return null; }
}

8. Migration from Traditional Threading

Before and After Examples

/**
 * Side-by-side "before" (thread pools) and "after" (virtual threads) versions
 * of two common concurrent workloads.
 */
public class MigrationExamples {

    // Before: Traditional thread pool for I/O
    /**
     * Fetches every URL on a 100-thread fixed pool and processes the results
     * in submission order.
     *
     * @param urls URLs to fetch
     * @throws Exception if a fetch fails or the wait is interrupted
     */
    public void oldStyleIoProcessing(List<String> urls) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(100);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(executor.submit(() -> {
                    // I/O operation
                    return fetchUrl(url);
                }));
            }
            for (Future<String> future : futures) {
                String result = future.get();
                processResult(result);
            }
        } finally {
            executor.shutdown();
        }
    }

    // After: Virtual threads for I/O
    /**
     * Fetches every URL on its own virtual thread and processes the results in
     * submission order. A failed fetch is reported on {@code System.err} and
     * skipped so the remaining results are still processed.
     *
     * @param urls URLs to fetch
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public void newStyleIoProcessing(List<String> urls) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() ->
                    fetchUrl(url), executor))
                .toList();
            for (CompletableFuture<String> future : futures) {
                try {
                    processResult(future.get());
                } catch (InterruptedException e) {
                    // The caller asked us to stop: restore the flag and propagate.
                    Thread.currentThread().interrupt();
                    throw e;
                } catch (Exception e) {
                    // Best effort per URL, but never silently: report and continue.
                    System.err.println("Failed to process URL result: " + e);
                }
            }
        }
    }

    // Before: Complex async handling
    /**
     * Runs ten tasks on a cached thread pool and prints each result as it
     * completes, using a {@code CompletionService}.
     *
     * @throws Exception if a task fails or the wait is interrupted
     */
    public void oldStyleAsyncOperations() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
            // Submit tasks
            for (int i = 0; i < 10; i++) {
                int taskId = i;
                completionService.submit(() -> "Result-" + taskId);
            }
            // Process results
            for (int i = 0; i < 10; i++) {
                Future<String> future = completionService.take();
                String result = future.get();
                System.out.println(result);
            }
        } finally {
            // Shut down even when a task fails, so pool threads are not leaked.
            executor.shutdown();
        }
    }

    // After: Simplified with virtual threads
    /**
     * Runs ten tasks on virtual threads and prints the results in submission
     * order.
     *
     * @throws Exception if a task fails or the wait is interrupted
     */
    public void newStyleAsyncOperations() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                int taskId = i;
                futures.add(CompletableFuture.supplyAsync(() ->
                    "Result-" + taskId, executor));
            }
            // Results are printed in submission order, not completion order.
            for (CompletableFuture<String> future : futures) {
                System.out.println(future.get());
            }
        }
    }

    /** Simulates fetching a URL. */
    private String fetchUrl(String url) {
        try {
            Thread.sleep(100);
            return "data";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void processResult(String result) {
        // Process the result
    }
}

Key Benefits Summary

| Feature | Benefit | Use Case |
| --- | --- | --- |
| Lightweight | Millions of threads possible | High-concurrency I/O |
| Simplified Code | No complex pooling needed | Web servers, microservices |
| Better Resource Usage | Efficient I/O waiting | Database clients, API calls |
| Structured Concurrency | Clean task relationships | Complex async workflows |
| Backward Compatible | Works with existing code | Gradual migration |

Virtual Threads in Java 21 represent a fundamental shift in Java concurrency, making it much easier to write, maintain, and scale concurrent applications, particularly those that are I/O-bound.

Leave a Reply

Your email address will not be published. Required fields are marked *


Macro Nepal Helper