Comprehensive guide to Virtual Threads introduced in Java 21 as part of Project Loom, revolutionizing concurrent programming.
1. Virtual Threads Overview
Traditional vs Virtual Threads
/**
 * Runs one platform thread and one virtual thread side by side and prints, from inside
 * each, the thread's description and whether it reports itself as virtual.
 */
public class VirtualThreadsOverview {
    /**
     * Starts both threads and waits for them to finish before returning.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("=== Virtual Threads vs Platform Threads ===");

        // Platform Thread (traditional)
        Thread platformThread = new Thread(() -> {
            System.out.println("Platform Thread: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
        });
        // The constructor only creates the thread; without start() the body never runs
        // and the join() below would return immediately.
        platformThread.start();

        // Virtual Thread (new in Java 21) - start(...) creates and schedules it in one call
        Thread virtualThread = Thread.ofVirtual().start(() -> {
            System.out.println("Virtual Thread: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
        });

        try {
            platformThread.join();
            virtualThread.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever invoked main.
            Thread.currentThread().interrupt();
        }
    }
}
Key Characteristics
/**
 * Prints identity details (description, virtual flag, id, daemon flag) from inside several
 * virtual threads.
 */
public class VirtualThreadCharacteristics {
    /**
     * Starts five virtual threads plus one that is created unstarted, and waits for all six
     * to finish so their output is complete when this method returns.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored
     * and the method returns without waiting for the remaining threads.
     */
    public static void demonstrateCharacteristics() {
        System.out.println("=== Virtual Threads Characteristics ===");

        // 1. Lightweight - can create millions
        Runnable task = () -> {
            System.out.println("Executing in: " + Thread.currentThread());
            System.out.println("Is Virtual: " + Thread.currentThread().isVirtual());
            System.out.println("Thread ID: " + Thread.currentThread().threadId());
            System.out.println("Is Daemon: " + Thread.currentThread().isDaemon());
        };

        // Create multiple virtual threads, keeping the handles so they can be joined below
        Thread[] workers = new Thread[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = Thread.ofVirtual().start(task);
        }

        // Virtual threads are always daemon threads
        Thread virtualThread = Thread.ofVirtual().unstarted(task);
        System.out.println("Before start - Is Daemon: " + virtualThread.isDaemon());
        virtualThread.start();

        // Daemon threads do not keep the JVM alive, so wait explicitly; otherwise the
        // program may exit before any of the tasks has printed.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
            virtualThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
2. Creating Virtual Threads
Different Creation Methods
/**
 * Shows four ways of launching a virtual thread: the builder's {@code start}, the builder's
 * {@code unstarted} followed by {@code start}, {@code Thread.startVirtualThread}, and a
 * virtual-thread-per-task executor.
 */
public class VirtualThreadCreation {

    /** Prints {@code label} followed by the name of the thread executing this call. */
    private static void announce(String label) {
        System.out.println(label + Thread.currentThread().getName());
    }

    /**
     * Launches one thread per creation method and waits for all of them.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== Virtual Thread Creation Methods ===");

        // Method 1: Thread.ofVirtual().start()
        Thread direct = Thread.ofVirtual().start(() -> announce("Method 1 - Direct start: "));

        // Method 2: Thread.ofVirtual().unstarted() + start()
        Thread.Builder namedBuilder = Thread.ofVirtual().name("custom-virtual-thread-1");
        Thread deferred = namedBuilder.unstarted(() -> announce("Method 2 - Unstarted: "));
        deferred.start();

        // Method 3: Thread.startVirtualThread() (convenience method)
        Thread convenience =
            Thread.startVirtualThread(() -> announce("Method 3 - Start virtual: "));

        // Method 4: Using Executors; leaving the try block waits for the submitted task
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.submit(() -> announce("Method 4 - Executor: "));
        }

        for (Thread launched : new Thread[] {direct, deferred, convenience}) {
            launched.join();
        }
    }
}
Builder Pattern for Configuration
/**
 * Demonstrates configuring virtual threads through {@code Thread.Builder}: a name prefix
 * with a running counter, and an explicitly named thread that is created unstarted.
 */
public class VirtualThreadBuilder {
    /**
     * Starts three sequentially named workers and one unstarted-then-started thread, and
     * waits for all four so their output is complete when this method returns.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored
     * and the method returns without waiting for the remaining threads.
     */
    public static void demonstrateBuilder() {
        System.out.println("=== Virtual Thread Builder ===");

        // Configure virtual thread with builder
        Thread.Builder builder = Thread.ofVirtual()
            .name("worker-", 0) // name pattern and start index
            .inheritInheritableThreadLocals(false); // control inheritance

        // Create multiple threads with sequential names, keeping the handles for join()
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = builder.start(() -> {
                System.out.println("Thread: " + Thread.currentThread().getName() +
                    ", Virtual: " + Thread.currentThread().isVirtual());
            });
        }

        // Factory method for unstarted threads
        Thread unstarted = Thread.ofVirtual()
            .name("unstarted-worker")
            .unstarted(() -> {
                System.out.println("This won't print until started");
            });
        System.out.println("Created unstarted thread: " + unstarted.getName());
        unstarted.start(); // Now it will execute

        // Virtual threads are daemon threads: without joining, the caller (or the JVM)
        // may finish before any of them has printed.
        try {
            for (Thread worker : workers) {
                worker.join();
            }
            unstarted.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
3. Executors with Virtual Threads
Virtual Thread Executors
/**
 * Runs tasks on two executor flavours: the virtual-thread-per-task executor and a
 * thread-per-task executor fed by a virtual-thread {@code ThreadFactory}.
 */
public class VirtualThreadExecutors {

    /** Sleeps 100 ms to simulate work, then reports which thread handled {@code taskId}. */
    private static String runTask(int taskId) throws InterruptedException {
        Thread.sleep(100); // Simulate work
        return "Task-" + taskId + " completed by " + Thread.currentThread().getName();
    }

    /**
     * Submits ten tasks, prints their results in submission order, then runs a single task
     * through the factory-based executor.
     *
     * @param args ignored
     * @throws Exception if a task fails or the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        System.out.println("=== Virtual Thread Executors ===");

        // Method 1: newVirtualThreadPerTaskExecutor (recommended)
        try (var perTask = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> pending = new ArrayList<>();
            for (int taskId = 0; taskId < 10; taskId++) {
                final int id = taskId; // lambdas may only capture effectively final locals
                Future<String> handle = perTask.submit(() -> runTask(id));
                pending.add(handle);
            }

            // Collect results
            for (Future<String> handle : pending) {
                String line = handle.get();
                System.out.println(line);
            }
        }

        // Method 2: newThreadPerTaskExecutor with virtual thread factory
        ThreadFactory factory = Thread.ofVirtual().factory();
        try (var viaFactory = Executors.newThreadPerTaskExecutor(factory)) {
            viaFactory.submit(() -> {
                System.out.println("Using custom factory: " + Thread.currentThread().getName());
            });
        }
    }
}
ExecutorService Patterns
/**
 * Fan-out/fan-in patterns that combine {@code CompletableFuture} with a
 * virtual-thread-per-task executor.
 */
public class ExecutorServicePatterns {

    /**
     * Fetches one profile per user id concurrently and prints how many were processed.
     *
     * @param userIds ids to look up
     * @throws Exception declared for callers; a failed lookup surfaces from {@code join()}
     */
    public void processUsers(List<String> userIds) throws Exception {
        System.out.println("=== Processing with Virtual Threads ===");
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submit all tasks
            List<CompletableFuture<UserProfile>> lookups = userIds.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchUserProfile(id), executor))
                .toList();

            // Wait for all completions
            CompletableFuture<?>[] lookupArray = lookups.toArray(new CompletableFuture<?>[0]);
            CompletableFuture.allOf(lookupArray).join();

            // Get results; every future is already complete here, so join() does not block
            List<UserProfile> profiles = lookups.stream()
                .map(CompletableFuture::join)
                .toList();
            System.out.println("Processed " + profiles.size() + " users");
        }
    }

    /**
     * Processes every order concurrently, reports the first one to finish, and returns once
     * all of them are done.
     *
     * @param orders orders to process
     * @throws Exception declared for callers; a failed order surfaces from {@code join()}
     */
    public void batchProcessing(List<Order> orders) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<OrderResult>> pending = orders.stream()
                .map(order -> CompletableFuture.supplyAsync(() -> processOrder(order), executor))
                .toList();
            CompletableFuture<?>[] pendingArray = pending.toArray(new CompletableFuture<?>[0]);

            // Process as they complete: report whichever order finishes first
            CompletableFuture<Object> firstDone = CompletableFuture.anyOf(pendingArray);
            firstDone.thenAccept(result ->
                System.out.println("First order completed: " + result));

            // Wait for all
            CompletableFuture<Void> allDone = CompletableFuture.allOf(pendingArray);
            allDone.join();
        }
    }

    /** Simulates a 100 ms API call that yields the profile for {@code userId}. */
    private UserProfile fetchUserProfile(String userId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new UserProfile(userId, "User-" + userId);
    }

    /** Simulates 200 ms of order processing and marks the order as processed. */
    private OrderResult processOrder(Order order) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new OrderResult(order.id(), "PROCESSED");
    }

    record UserProfile(String id, String name) {}
    record Order(String id, double amount) {}
    record OrderResult(String orderId, String status) {}
}
4. I/O Operations with Virtual Threads
Non-blocking I/O Patterns
/**
 * Simulated I/O workloads (URL fetches and database record processing) run on a
 * virtual-thread-per-task executor.
 */
public class VirtualThreadsIO {

    /**
     * Fetches every URL concurrently and returns the responses in the order of {@code urls}.
     *
     * <p>The executor is closed before this method returns, and closing waits for the
     * submitted tasks, so the returned future is already complete when the caller gets it.
     *
     * @param urls addresses to fetch
     * @return a future holding one response per URL
     */
    public CompletableFuture<List<String>> fetchMultipleUrls(List<String> urls) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> pending = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> fetchUrl(url), executor))
                .toList();
            CompletableFuture<Void> allDone =
                CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
            return allDone.thenApply(ignored -> pending.stream()
                .map(CompletableFuture::join)
                .toList());
        }
    }

    /** Simulates an HTTP request taking between 100 and 200 ms. */
    private String fetchUrl(String url) {
        // Simulate HTTP request
        try {
            Thread.sleep(100 + (long) (Math.random() * 100));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "Response from " + url;
    }

    /**
     * Runs 1000 simulated database operations concurrently and prints a summary line once
     * all of them have finished.
     *
     * @throws Exception declared for callers; failures surface from {@code join()}
     */
    public void databaseOperations() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Void>> operations = new ArrayList<>();

            // Simulate multiple database operations
            for (int recordId = 0; recordId < 1000; recordId++) {
                final int id = recordId; // lambdas may only capture effectively final locals
                operations.add(
                    CompletableFuture.runAsync(() -> processDatabaseRecord(id), executor));
            }

            // Wait for all operations to complete
            CompletableFuture<Void> allDone =
                CompletableFuture.allOf(operations.toArray(new CompletableFuture<?>[0]));
            allDone.join();
            System.out.println("All database operations completed");
        }
    }

    /** Simulates a 50 ms database operation and logs which thread handled the record. */
    private void processDatabaseRecord(int recordId) {
        // Simulate database operation
        try {
            Thread.sleep(50);
            String worker = Thread.currentThread().getName();
            System.out.println("Processed record: " + recordId + " on " + worker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
File Processing Example
/**
 * Processes and reads files concurrently, one virtual thread per file.
 */
public class FileProcessingWithVirtualThreads {

    /**
     * Computes a {@link FileProcessResult} for every file concurrently and prints how many
     * files were processed.
     *
     * @param files files to inspect
     * @throws Exception declared for callers; per-file I/O errors are reported inside the
     *     result's status instead of being thrown
     */
    public void processFilesConcurrently(List<java.nio.file.Path> files) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<FileProcessResult>> futures = files.stream()
                .map(file -> CompletableFuture.supplyAsync(() ->
                    processFile(file), executor))
                .toList();
            List<FileProcessResult> results = futures.stream()
                .map(CompletableFuture::join)
                .toList();
            System.out.println("Processed " + results.size() + " files");
        }
    }

    /**
     * Looks up the size of {@code file} after a simulated 100 ms delay.
     *
     * @return a result with status {@code SUCCESS}, or {@code ERROR: <message>} and size 0
     *     when the lookup fails or the thread is interrupted
     */
    private FileProcessResult processFile(java.nio.file.Path file) {
        // getFileName() is null for a root path such as "/"; fall back to the full path so
        // building the result (including the error result) can never throw.
        java.nio.file.Path leaf = file.getFileName();
        String name = (leaf != null) ? leaf.toString() : file.toString();
        try {
            // Simulate file processing
            Thread.sleep(100);
            long size = Files.size(file);
            return new FileProcessResult(name, size, "SUCCESS");
        } catch (InterruptedException e) {
            // Restore the flag so the executor/caller can still observe the interruption.
            Thread.currentThread().interrupt();
            return new FileProcessResult(name, 0, "ERROR: " + e.getMessage());
        } catch (java.io.IOException | RuntimeException e) {
            return new FileProcessResult(name, 0, "ERROR: " + e.getMessage());
        }
    }

    /**
     * Reads {@code file1.txt}, {@code file2.txt} and {@code file3.txt} (relative to the
     * working directory) concurrently and prints the line count of each.
     *
     * @throws Exception if waiting for a read is interrupted or a read task fails
     */
    public void batchFileOperations() throws Exception {
        List<java.nio.file.Path> files = List.of(
            java.nio.file.Path.of("file1.txt"),
            java.nio.file.Path.of("file2.txt"),
            java.nio.file.Path.of("file3.txt")
        );
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Read all files concurrently
            List<CompletableFuture<List<String>>> readFutures = files.stream()
                .map(file -> CompletableFuture.supplyAsync(() ->
                    readFileLines(file), executor))
                .toList();

            // Report results in submission order (each get() waits for that file's read)
            for (CompletableFuture<List<String>> future : readFutures) {
                List<String> lines = future.get();
                System.out.println("Read " + lines.size() + " lines");
            }
        }
    }

    /**
     * Reads all lines of {@code file} after a simulated 50 ms delay.
     *
     * @return the file's lines, or an empty list when the file cannot be read or the thread
     *     is interrupted (deliberately best-effort)
     */
    private List<String> readFileLines(java.nio.file.Path file) {
        try {
            Thread.sleep(50); // Simulate I/O
            return Files.readAllLines(file);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        } catch (java.io.IOException | RuntimeException e) {
            // Best-effort: an unreadable file counts as having no lines.
            return List.of();
        }
    }

    record FileProcessResult(String filename, long size, String status) {}
}
5. Error Handling and Cancellation
Proper Error Handling
/**
 * Error handling, timeouts and resource cleanup for tasks running on virtual threads.
 */
public class VirtualThreadsErrorHandling {

    /**
     * Runs five tasks, one of which fails, and prints either the task's result or a
     * {@code Task failed: <message>} line for each, in submission order.
     *
     * @throws Exception if waiting for a result is interrupted
     */
    public void demonstrateErrorHandling() throws Exception {
        System.out.println("=== Error Handling in Virtual Threads ===");
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                int taskId = i;
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    if (taskId == 2) {
                        throw new RuntimeException("Simulated failure in task " + taskId);
                    }
                    return "Task-" + taskId + " succeeded";
                }, executor)
                    .exceptionally(ex -> "Task failed: " + failureMessage(ex));
                futures.add(future);
            }

            // Collect results including failures
            for (CompletableFuture<String> future : futures) {
                System.out.println(future.get());
            }
        }
    }

    /**
     * Returns the message of the exception the task actually threw. An exception thrown
     * inside {@code supplyAsync} reaches {@code exceptionally} wrapped in a
     * {@code CompletionException}, whose own message is prefixed with the cause's class name.
     */
    private static String failureMessage(Throwable ex) {
        Throwable root = ex;
        if (ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null) {
            root = ex.getCause();
        }
        return root.getMessage();
    }

    /**
     * Starts a 5-second task, waits at most one second for it, and cancels it on timeout.
     *
     * <p>The task is submitted through {@code executor.submit} rather than
     * {@code CompletableFuture.supplyAsync}: {@code CompletableFuture.cancel(true)} does not
     * interrupt the running task, so closing the executor would still wait out the full five
     * seconds. Cancelling the {@code Future} returned by a thread-per-task executor
     * interrupts the task's thread, which lets the method return right after the timeout.
     *
     * @throws Exception if the task fails or the calling thread is interrupted while waiting
     */
    public void handleTimeouts() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            java.util.concurrent.Future<String> future = executor.submit(() -> {
                try {
                    Thread.sleep(5000); // Long operation
                    return "Result";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "Interrupted";
                }
            });
            try {
                String result = future.get(1, TimeUnit.SECONDS); // Timeout after 1 second
                System.out.println("Got result: " + result);
            } catch (TimeoutException e) {
                System.out.println("Task timed out");
                future.cancel(true); // Cancel the task and interrupt its thread
            }
        }
    }

    /**
     * Uses an {@link AutoCloseableResource} inside a task and waits for the task, showing
     * that the resource is closed on the same virtual thread that used it.
     *
     * @throws Exception if the task fails or the calling thread is interrupted while waiting
     */
    public void resourceCleanup() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
                try (var resource = new AutoCloseableResource()) {
                    resource.use();
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Only an interruption should set the interrupt flag; other failures
                    // now propagate through the future instead of being swallowed.
                    Thread.currentThread().interrupt();
                }
            }, executor);
            task.get(); // Wait for completion with proper resource cleanup
        }
    }

    /** Demo resource that logs the thread on which it is used and closed. */
    static class AutoCloseableResource implements AutoCloseable {
        public void use() {
            System.out.println("Using resource in: " + Thread.currentThread().getName());
        }

        @Override
        public void close() {
            System.out.println("Closing resource from: " + Thread.currentThread().getName());
        }
    }
}
6. Synchronization and Coordination
Structured Concurrency (preview API in Java 21 — compile and run with --enable-preview)
/**
 * Fetches the parts of a user profile as sibling subtasks of a
 * {@code StructuredTaskScope.ShutdownOnFailure}, so a failure in one subtask cancels the
 * others. {@code StructuredTaskScope} is a preview API in Java 21.
 */
public class StructuredConcurrencyExample {

    /**
     * Fetches details, orders and preferences concurrently and combines them.
     *
     * @param userId the user to look up
     * @return the combined profile
     * @throws Exception if the caller is interrupted while waiting, or (as an
     *     {@code ExecutionException}) if any subtask failed
     */
    public UserProfile fetchUserData(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork subtasks
            StructuredTaskScope.Subtask<String> userSubtask = scope.fork(() ->
                fetchUserDetails(userId));
            StructuredTaskScope.Subtask<List<String>> ordersSubtask = scope.fork(() ->
                fetchUserOrders(userId));
            StructuredTaskScope.Subtask<List<String>> preferencesSubtask = scope.fork(() ->
                fetchUserPreferences(userId));

            // Wait for all subtasks to complete or any to fail
            scope.join();
            scope.throwIfFailed(); // Propagate any exception

            // Combine results
            return new UserProfile(
                userSubtask.get(),
                ordersSubtask.get(),
                preferencesSubtask.get()
            );
        }
    }

    /**
     * Fetches details and orders concurrently with a five-second deadline; preferences are
     * left empty.
     *
     * @param userId the user to look up
     * @return the profile with an empty preference list
     * @throws Exception a {@code TimeoutException} (raised by {@code joinUntil}) when the
     *     deadline passes, an {@code ExecutionException} carrying the cause when a subtask
     *     fails, or an {@code InterruptedException} when the caller is interrupted
     */
    public UserProfile fetchUserDataWithTimeout(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            StructuredTaskScope.Subtask<String> userSubtask = scope.fork(() ->
                fetchUserDetails(userId));
            StructuredTaskScope.Subtask<List<String>> ordersSubtask = scope.fork(() ->
                fetchUserOrders(userId));

            // Wait with timeout; joinUntil itself throws TimeoutException at the deadline
            scope.joinUntil(Instant.now().plusSeconds(5));

            // Surface a subtask failure with its real cause instead of mislabelling it as a
            // timeout. Once this passes, both subtasks are known to have succeeded.
            scope.throwIfFailed();

            return new UserProfile(userSubtask.get(), ordersSubtask.get(), List.of());
        }
    }

    /** Simulates a 100 ms lookup of the user's details. */
    private String fetchUserDetails(String userId) {
        try {
            Thread.sleep(100);
            return "User details for " + userId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates a 200 ms lookup of the user's orders. */
    private List<String> fetchUserOrders(String userId) {
        try {
            Thread.sleep(200);
            return List.of("Order1", "Order2");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates a 150 ms lookup of the user's preferences. */
    private List<String> fetchUserPreferences(String userId) {
        try {
            Thread.sleep(150);
            return List.of("Pref1", "Pref2");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    record UserProfile(String details, List<String> orders, List<String> preferences) {}
}
7. Performance and Best Practices
Performance Comparison
/**
 * Times the same batch of short sleeping tasks on a 200-thread platform pool and on a
 * virtual-thread-per-task executor, then prints both durations and their ratio.
 */
public class VirtualThreadsPerformance {

    /**
     * Runs both measurements with 10,000 tasks each.
     *
     * @param args ignored
     * @throws Exception if a measurement fails
     */
    public static void main(String[] args) throws Exception {
        final int taskCount = 10_000;
        System.out.println("=== Performance Comparison ===");

        // Platform threads
        long platformTime = measurePlatformThreads(taskCount);
        System.out.printf("Platform threads: %d ms%n", platformTime);

        // Virtual threads
        long virtualTime = measureVirtualThreads(taskCount);
        System.out.printf("Virtual threads: %d ms%n", virtualTime);

        double speedup = (double) platformTime / virtualTime;
        System.out.printf("Improvement: %.2fx faster%n", speedup);
    }

    /** Task body shared by both measurements: a 10 ms sleep standing in for I/O. */
    private static void simulateIo() {
        try {
            Thread.sleep(10); // Simulate I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the milliseconds needed to run {@code taskCount} tasks on a 200-thread pool. */
    private static long measurePlatformThreads(int taskCount) throws Exception {
        long begin = System.currentTimeMillis();
        try (var pool = Executors.newFixedThreadPool(200)) {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int remaining = taskCount; remaining > 0; remaining--) {
                pending.add(
                    CompletableFuture.runAsync(VirtualThreadsPerformance::simulateIo, pool));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        }
        // Taken after the try block, so the executor's close() is part of the measurement.
        long end = System.currentTimeMillis();
        return end - begin;
    }

    /** Returns the milliseconds needed to run {@code taskCount} tasks on virtual threads. */
    private static long measureVirtualThreads(int taskCount) throws Exception {
        long begin = System.currentTimeMillis();
        try (var perTask = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int remaining = taskCount; remaining > 0; remaining--) {
                pending.add(
                    CompletableFuture.runAsync(VirtualThreadsPerformance::simulateIo, perTask));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        }
        long end = System.currentTimeMillis();
        return end - begin;
    }
}
Best Practices
/**
 * Do/don't examples for virtual threads. The data-access helpers at the bottom are
 * placeholders that stand in for real application code.
 */
public class VirtualThreadsBestPractices {

    // ✅ DO: Use for I/O-bound tasks
    /**
     * Runs three simulated blocking calls concurrently and waits for all of them.
     *
     * @throws Exception declared for callers; task failures surface from {@code join()}
     */
    public void ioBoundOperations() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> fetchFromDatabase("query1"), executor),
                CompletableFuture.supplyAsync(() -> callExternalApi("api1"), executor),
                CompletableFuture.supplyAsync(() -> readFromFile("file1"), executor)
            );
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }

    // ❌ DON'T: Use for CPU-bound tasks
    /**
     * Runs one CPU-heavy task per available processor. No executor is passed to
     * {@code runAsync}, so the tasks run on {@code CompletableFuture}'s default executor
     * rather than on virtual threads.
     */
    public void cpuBoundOperations() {
        // Virtual threads don't help with CPU-bound work
        // Use platform threads or ForkJoinPool instead
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                intensiveComputation(); // CPU-bound
            }));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    // ✅ DO: Use structured concurrency for related tasks
    /**
     * Fetches user, profile and settings as sibling subtasks and combines them.
     *
     * <p>The return type is {@code Object} (the type of {@code combineData}); the method was
     * previously declared {@code void} while returning a value, which does not compile.
     *
     * @param userId the user to look up
     * @return the combined data
     * @throws Exception if the caller is interrupted or (as an {@code ExecutionException})
     *     any subtask failed
     */
    public Object structuredApproach(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var userTask = scope.fork(() -> fetchUser(userId));
            var profileTask = scope.fork(() -> fetchProfile(userId));
            var settingsTask = scope.fork(() -> fetchSettings(userId));
            scope.join();
            scope.throwIfFailed();
            return combineData(userTask.get(), profileTask.get(), settingsTask.get());
        }
    }

    // ✅ DO: Proper resource cleanup
    /**
     * Opens a connection and a statement inside the task with try-with-resources, so both
     * are closed by the same virtual thread that opened them.
     *
     * @throws Exception declared for callers; a failure inside the task surfaces from
     *     {@code join()} as a {@code CompletionException}
     */
    public void withResources() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture.runAsync(() -> {
                try (var connection = getDatabaseConnection();
                     var statement = connection.createStatement()) {
                    // Use resources
                    processData(statement);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executor).join();
        }
    }

    /** Simulates a 100 ms database query. */
    private String fetchFromDatabase(String query) {
        try {
            Thread.sleep(100);
            return "result";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new RuntimeException(e);
        }
    }

    /** Simulates a 200 ms call to an external API. */
    private String callExternalApi(String api) {
        try {
            Thread.sleep(200);
            return "response";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Simulates a 50 ms file read. */
    private String readFromFile(String file) {
        try {
            Thread.sleep(50);
            return "content";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Burns CPU by summing one billion integers; the sum itself is discarded. */
    private void intensiveComputation() {
        // CPU-intensive work
        long result = 0;
        for (long i = 0; i < 1_000_000_000; i++) {
            result += i;
        }
    }

    // Placeholder methods
    /**
     * Placeholder: replace with code that obtains a real connection. It is typed as
     * {@code java.sql.Connection} so it can be used as a try-with-resources resource, and it
     * fails loudly instead of returning {@code null} (which would surface as an NPE).
     */
    private java.sql.Connection getDatabaseConnection() {
        throw new UnsupportedOperationException(
            "getDatabaseConnection is a placeholder; supply a real java.sql.Connection");
    }

    private void processData(java.sql.Statement statement) {}
    private String fetchUser(String userId) { return null; }
    private String fetchProfile(String userId) { return null; }
    private String fetchSettings(String userId) { return null; }
    private Object combineData(String user, String profile, String settings) { return null; }
}
8. Migration from Traditional Threading
Before and After Examples
/**
 * Before/after pairs showing the same work written with classic thread pools and with a
 * virtual-thread-per-task executor.
 */
public class MigrationExamples {

    // Before: Traditional thread pool for I/O
    /**
     * Fetches every URL on a fixed pool of 100 platform threads and processes the results
     * in submission order.
     *
     * @param urls addresses to fetch
     * @throws Exception if a fetch fails or the caller is interrupted while waiting
     */
    public void oldStyleIoProcessing(List<String> urls) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(100);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(executor.submit(() -> {
                    // I/O operation
                    return fetchUrl(url);
                }));
            }
            for (Future<String> future : futures) {
                String result = future.get();
                processResult(result);
            }
        } finally {
            executor.shutdown();
        }
    }

    // After: Virtual threads for I/O
    /**
     * Fetches every URL on its own virtual thread and processes the results in submission
     * order. A failed fetch is reported on {@code System.err} and skipped so the remaining
     * URLs are still processed.
     *
     * @param urls addresses to fetch
     * @throws Exception if the caller is interrupted while waiting for a result
     */
    public void newStyleIoProcessing(List<String> urls) throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() ->
                    fetchUrl(url), executor))
                .toList();
            for (CompletableFuture<String> future : futures) {
                try {
                    processResult(future.get());
                } catch (java.util.concurrent.ExecutionException e) {
                    // Keep the best-effort behaviour, but make the failure visible instead
                    // of silently dropping it. InterruptedException is left to propagate.
                    System.err.println("Failed to fetch URL: " + e.getCause());
                }
            }
        }
    }

    // Before: Complex async handling
    /**
     * Runs ten tasks on a cached pool and prints their results in completion order using an
     * {@code ExecutorCompletionService}.
     *
     * @throws Exception if a task fails or the caller is interrupted while waiting
     */
    public void oldStyleAsyncOperations() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<String> completionService =
                new ExecutorCompletionService<>(executor);

            // Submit tasks
            for (int i = 0; i < 10; i++) {
                int taskId = i;
                completionService.submit(() -> "Result-" + taskId);
            }

            // Process results
            for (int i = 0; i < 10; i++) {
                Future<String> future = completionService.take();
                String result = future.get();
                System.out.println(result);
            }
        } finally {
            // Shut down even when a task fails, so the pool's threads are not leaked.
            executor.shutdown();
        }
    }

    // After: Simplified with virtual threads
    /**
     * Runs ten tasks on virtual threads and prints their results in submission order.
     *
     * @throws Exception if a task fails or the caller is interrupted while waiting
     */
    public void newStyleAsyncOperations() throws Exception {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                int taskId = i;
                futures.add(CompletableFuture.supplyAsync(() ->
                    "Result-" + taskId, executor));
            }

            // Print in submission order (each get() waits for that particular task)
            for (CompletableFuture<String> future : futures) {
                System.out.println(future.get());
            }
        }
    }

    /** Simulates a 100 ms fetch of {@code url}. */
    private String fetchUrl(String url) {
        try {
            Thread.sleep(100);
            return "data";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new RuntimeException(e);
        }
    }

    private void processResult(String result) {
        // Process the result
    }
}
Key Benefits Summary
| Feature | Benefit | Use Case |
|---|---|---|
| Lightweight | Millions of threads possible | High-concurrency I/O |
| Simplified Code | No complex pooling needed | Web servers, microservices |
| Better Resource Usage | Efficient I/O waiting | Database clients, API calls |
| Structured Concurrency (preview in Java 21) | Clean task relationships | Complex async workflows |
| Backward Compatible | Works with existing code | Gradual migration |
Virtual Threads in Java 21 represent a fundamental shift in Java concurrency, making it much easier to write, maintain, and scale concurrent applications, particularly those that are I/O-bound.