Pinning Virtual Threads to Carriers in Java

Introduction

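Virtual threads, finalized in Java 21, are lightweight threads that the JVM schedules onto a small pool of platform threads called carriers. When a virtual thread blocks, it normally unmounts from its carrier so that the carrier can run another virtual thread. Pinning occurs when a virtual thread cannot unmount while it blocks: the carrier stays occupied for the whole blocking operation, which reduces the concurrency benefits. In JDK 21 through 23 the main causes are blocking inside a synchronized block or method and blocking while a native frame is on the stack. JDK 24 (JEP 491) removes the synchronized case.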

Understanding Virtual Thread Architecture

Virtual Thread vs Platform Thread

import java.util.concurrent.ThreadFactory;

public class VirtualThreadArchitecture {
    public void demonstrateThreadTypes() {
        // Platform thread (heavyweight): a thin wrapper around an OS thread
        Thread platformThread = new Thread(() -> {
            System.out.println("Platform thread: " + Thread.currentThread());
        });
        platformThread.start(); // new Thread(...) does not start the thread by itself

        // Virtual thread (lightweight): created and started in one step
        Thread virtualThread = Thread.ofVirtual()
                .name("virtual-thread-", 0)
                .start(() -> {
                    System.out.println("Virtual thread: " + Thread.currentThread());
                    System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
                });

        // Virtual thread created through a factory; it must be started explicitly
        ThreadFactory virtualFactory = Thread.ofVirtual().factory();
        Thread virtualFromFactory = virtualFactory.newThread(() -> {
            System.out.println("From factory: " + Thread.currentThread());
        });
        virtualFromFactory.start();
    }
}
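
The practical payoff of this architecture is that blocking becomes cheap. The following is a minimal sketch, not part of the original example: the class name ManyVirtualThreads and the method runBlockingTasks are made up for illustration. It submits many short blocking tasks to a virtual-thread-per-task executor and counts how many of them ran on a virtual thread.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ManyVirtualThreads {

    // Runs taskCount blocking tasks, one virtual thread per task, and returns
    // how many of them observed that they were running on a virtual thread.
    static int runBlockingTasks(int taskCount) {
        AtomicInteger virtualCount = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // sleep parks the virtual thread and frees its carrier
                    Thread.sleep(Duration.ofMillis(10));
                    if (Thread.currentThread().isVirtual()) {
                        virtualCount.incrementAndGet();
                    }
                    return null;
                });
            }
        } // close() waits until every submitted task has finished
        return virtualCount.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks run on virtual threads: " + runBlockingTasks(10_000));
    }
}
```

Because each sleeping task releases its carrier, the number of carriers can stay close to the number of CPU cores even with thousands of tasks in flight.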

What is Carrier Pinning?

The Pinning Mechanism

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CarrierPinningExplanation {
    // Carrier threads are platform threads from a dedicated ForkJoinPool that execute virtual threads
    public void demonstratePinningScenarios() {
        // Scenario 1: Blocking inside a synchronized block - pins on JDK 21 to 23
        Thread virtualThread1 = Thread.ofVirtual().start(() -> {
            synchronized (this) {  // Holding the monitor prevents unmounting
                System.out.println("Inside synchronized - is virtual: " +
                        Thread.currentThread().isVirtual());
                try {
                    Thread.sleep(100);  // The carrier stays occupied during this sleep
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Scenario 2: Native method calls - blocking with a native frame on the stack pins.
        // Placeholder only: a real example would invoke a native (JNI) method between the prints
        Thread virtualThread2 = Thread.ofVirtual().start(() -> {
            System.out.println("Before native call");
            System.out.println("After native call - still virtual: " +
                    Thread.currentThread().isVirtual());
        });

        // Scenario 3: File I/O - blocks the carrier in the OS rather than pinning in the strict
        // sense; the scheduler may temporarily add a carrier to compensate
        Thread virtualThread3 = Thread.ofVirtual().start(() -> {
            try {
                Files.readAllLines(Path.of("test.txt"));  // Carrier is blocked during the read
                System.out.println("After file I/O: " + Thread.currentThread().isVirtual());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

Detecting and Monitoring Pinning

Pinning Detection Tools

public class PinningDetection {
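// JVM options to monitor pinning (JDK 21 to 23; the property was removed in JDK 24):
// -Djdk.tracePinnedThreads=full   prints the complete stack trace of the pinned thread
// -Djdk.tracePinnedThreads=short  prints only the problematic frames
// The JFR event jdk.VirtualThreadPinned reports pinning as well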
public void demonstratePinningDetection() {
Thread virtualThread = Thread.ofVirtual().start(() -> {
// This synchronized block will trigger pinning warning
synchronized (this) {
try {
System.out.println("Working in pinned state...");
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
try {
virtualThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// Basic error reporting for a virtual thread. This handler only reports
// uncaught exceptions; it does not detect pinning by itself
public void monitorPinningProgrammatically() {
Thread virtualThread = Thread.ofVirtual()
.unstarted(() -> performBlockingOperation());
// Add uncaught exception handler to detect issues
virtualThread.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("Virtual thread error: " + throwable.getMessage());
});
virtualThread.start();
}
private void performBlockingOperation() {
synchronized (this) {
// The printed name of a mounted virtual thread includes its current carrier
System.out.println("Current thread: " + Thread.currentThread());
System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
// Check whether blocking at this point would pin the thread
if (isPinned()) {
System.out.println("WARNING: Virtual thread holds a monitor and would be pinned if it blocked!");
}
}
}
private boolean isPinned() {
// There is no public API that reports pinning directly. As a heuristic,
// a virtual thread that holds a monitor is pinned if it blocks now
// (JDK 21 to 23). Rely on the trace flag or the JFR event for real detection.
return Thread.currentThread().isVirtual() && Thread.holdsLock(this);
}
}
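
For detection from inside the application, JDK Flight Recorder can stream the jdk.VirtualThreadPinned event. The sketch below is an illustration under assumptions: the class name PinnedEventCounter and the method countPinnedEvents are invented here, and it needs JDK 21 or later with the jdk.jfr module available. It sleeps inside a synchronized block on a virtual thread and counts the events JFR delivers.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import jdk.jfr.consumer.RecordingStream;

public class PinnedEventCounter {

    // Runs one virtual thread that sleeps while holding a monitor and returns
    // the number of jdk.VirtualThreadPinned events reported by JFR.
    static int countPinnedEvents() {
        AtomicInteger pinnedEvents = new AtomicInteger();
        Object monitor = new Object();
        try (RecordingStream stream = new RecordingStream()) {
            // Report every pinned park, however short
            stream.enable("jdk.VirtualThreadPinned").withThreshold(Duration.ZERO);
            stream.onEvent("jdk.VirtualThreadPinned", event -> pinnedEvents.incrementAndGet());
            stream.startAsync();

            Thread worker = Thread.ofVirtual().start(() -> {
                synchronized (monitor) {
                    try {
                        Thread.sleep(50); // blocks while the monitor is held
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            worker.join();
            stream.stop(); // waits until pending events have been delivered (JDK 20+)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pinnedEvents.get();
    }

    public static void main(String[] args) {
        System.out.println("Pinned events observed: " + countPinnedEvents());
    }
}
```

On JDK 21 to 23 this would typically report at least one event. On JDK 24 and later the count is likely zero, because synchronized no longer pins there.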

Avoiding Pinning with Best Practices

Replacing Synchronized with ReentrantLock

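import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class PinningAvoidance {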
private final ReentrantLock lock = new ReentrantLock();
private final AtomicInteger counter = new AtomicInteger(0);
public void demonstrateLockInsteadOfSynchronized() {
Thread virtualThread = Thread.ofVirtual().start(() -> {
// Using ReentrantLock instead of synchronized avoids pinning
lock.lock();
try {
System.out.println("Lock acquired without pinning: " + 
Thread.currentThread().isVirtual());
// Simulate work
int current = counter.incrementAndGet();
System.out.println("Counter: " + current);
// This won't pin the virtual thread
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} finally {
lock.unlock();
}
});
}
// Using Semaphore for resource control
private final Semaphore semaphore = new Semaphore(5);
public void demonstrateSemaphoreUsage() {
for (int i = 0; i < 10; i++) {
Thread virtualThread = Thread.ofVirtual().start(() -> {
try {
semaphore.acquire();
try {
System.out.println("Semaphore acquired by: " + 
Thread.currentThread().getName());
// Work without pinning
processItem();
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
private void processItem() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
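
The lock only protects state when every thread uses the same lock instance. As a minimal sketch of that point (the class LockedCounter and its methods are hypothetical names, not from the original code), the following guards a plain int with one shared ReentrantLock while many virtual threads block inside the critical section.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int value; // guarded by lock

    // Increments the counter, then blocks briefly while still holding the lock.
    // A virtual thread can unmount here because no monitor is held.
    void incrementSlowly() throws InterruptedException {
        lock.lock();
        try {
            value++;
            Thread.sleep(1);
        } finally {
            lock.unlock();
        }
    }

    // Runs taskCount increments, one virtual thread each, and returns the final value.
    static int countWithVirtualThreads(int taskCount) {
        LockedCounter counter = new LockedCounter();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    counter.incrementSlowly();
                    return null;
                });
            }
        } // close() waits for all tasks
        counter.lock.lock(); // read under the lock so the latest value is visible
        try {
            return counter.value;
        } finally {
            counter.lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + countWithVirtualThreads(200));
    }
}
```

No increment is lost because all tasks contend for the same lock; a lock created inside each task would compile but would not provide mutual exclusion.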

Async I/O Operations

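import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

public class AsyncIOOperations {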
public void demonstrateAsyncFileOperations() {
Thread virtualThread = Thread.ofVirtual().start(() -> {
try {
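// Offload the file read so this virtual thread's carrier is not blocked by it.
// supplyAsync without an executor runs on the common ForkJoinPool (platform
// threads), so the callback below is not guaranteed to run on a virtual thread
CompletableFuture<String> fileContent = readFileAsync("data.txt");
fileContent.thenAccept(content -> {
System.out.println("File content length: " + content.length());
System.out.println("Callback on virtual thread: " + 
Thread.currentThread().isVirtual());
}).join(); // join() parks this virtual thread without pinning it while it waits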
} catch (Exception e) {
e.printStackTrace();
}
});
}
private CompletableFuture<String> readFileAsync(String filename) {
return CompletableFuture.supplyAsync(() -> {
try {
return Files.readString(Path.of(filename));
} catch (IOException e) {
throw new RuntimeException("Failed to read file", e);
}
});
}
// Using HttpClient with virtual threads
public void demonstrateAsyncHttpClient() {
HttpClient client = HttpClient.newBuilder()
.executor(Executors.newVirtualThreadPerTaskExecutor())
.build();
for (int i = 0; i < 10; i++) {
Thread virtualThread = Thread.ofVirtual().start(() -> {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://httpbin.org/delay/1"))
.build();
HttpResponse<String> response = client.send(
request, HttpResponse.BodyHandlers.ofString());
System.out.println("Response status: " + response.statusCode() +
" from virtual thread: " + Thread.currentThread().isVirtual());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}

Advanced Pinning Management

Custom Executor with Pinning Awareness

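import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public class PinningAwareExecutor {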
private final ExecService executorService;
public PinningAwareExecutor() {
this.executorService = new ExecService();
}
public <T> CompletableFuture<T> submit(Callable<T> task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, executorService);
}
// Custom executor service that monitors pinning
private static class ExecService implements Executor {
private final ThreadFactory virtualThreadFactory = 
Thread.ofVirtual().factory();
@Override
public void execute(Runnable command) {
Thread virtualThread = virtualThreadFactory.newThread(() -> {
long startTime = System.nanoTime();
try {
command.run();
} finally {
long duration = System.nanoTime() - startTime;
// Log long-running tasks. Duration is only a hint: it cannot tell
// pinning apart from ordinary slow work
if (duration > 100_000_000L) { // 100ms threshold
System.out.println("Long operation detected: " + 
duration + " ns in thread: " + 
Thread.currentThread().getName());
}
}
});
virtualThread.start();
}
}
public void demonstratePinningAwareExecution() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i;
CompletableFuture<String> future = submit(() -> {
if (taskId % 2 == 0) {
// Simulate blocking operation that might cause pinning
synchronized (this) {
Thread.sleep(100);
return "Pinned task " + taskId;
}
} else {
// Non-blocking operation
Thread.sleep(50);
return "Non-pinned task " + taskId;
}
});
futures.add(future);
}
// Wait for all tasks
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("All tasks completed"))
.join();
}
}

Thread Local Management

public class ThreadLocalManagement {
    // Scoped values (preferred over ThreadLocal for virtual threads).
    // ScopedValue is a preview API in Java 21 to 24 and final in Java 25
    private static final ScopedValue<String> USER_CONTEXT = ScopedValue.newInstance();

    public void demonstrateScopedValues() {
        // A binding is not inherited by a thread started with Thread.ofVirtual(),
        // so the value is bound inside the virtual thread that reads it.
        // ScopedValue is about cheap, bounded context sharing; ThreadLocal does not cause pinning
        Thread virtualThread = Thread.ofVirtual().start(() ->
                ScopedValue.where(USER_CONTEXT, "user123").run(() -> {
                    String user = USER_CONTEXT.get();
                    System.out.println("User context: " + user +
                            " in virtual thread: " + Thread.currentThread().isVirtual());
                }));
        try {
            virtualThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // If you must use ThreadLocal, be cautious: every virtual thread gets its own copy,
    // which adds up when there are many threads
    private static final ThreadLocal<String> LEGACY_THREAD_LOCAL = new ThreadLocal<>();

    public void demonstrateThreadLocalWithVirtualThreads() {
        Thread virtualThread = Thread.ofVirtual().start(() -> {
            try {
                LEGACY_THREAD_LOCAL.set("virtual-thread-data");
                // Perform work
                System.out.println("ThreadLocal value: " + LEGACY_THREAD_LOCAL.get());
                System.out.println("Is virtual: " + Thread.currentThread().isVirtual());
            } finally {
                // Always clean up ThreadLocal to prevent memory leaks
                LEGACY_THREAD_LOCAL.remove();
            }
        });
    }
}
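
To make the per-thread nature of ThreadLocal concrete, here is a small sketch that avoids preview APIs. The class ThreadLocalIsolation, the field REQUEST_ID, and the method observe are illustrative names only. Each virtual thread stores its own value, waits briefly, and reads it back.

```java
import java.util.Arrays;
import java.util.List;

public class ThreadLocalIsolation {
    private static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Starts threadCount virtual threads; each sets its own value, pauses, and
    // records what it reads back. Returns the recorded values in thread order.
    static List<String> observe(int threadCount) {
        String[] seen = new String[threadCount];
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int index = i;
            threads[i] = Thread.ofVirtual().start(() -> {
                REQUEST_ID.set("req-" + index);
                try {
                    Thread.sleep(5); // give the other threads time to set their values
                    seen[index] = REQUEST_ID.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    REQUEST_ID.remove(); // release the per-thread copy
                }
            });
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // join makes the writes to seen visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Arrays.asList(seen);
    }

    public static void main(String[] args) {
        System.out.println(observe(3)); // prints [req-0, req-1, req-2]
    }
}
```

Every thread sees only its own value, which is also why the memory cost grows with the number of virtual threads.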

Performance Testing and Benchmarking

Pinning Impact Analysis

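import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class PinningBenchmark {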
public void benchmarkPinnedVsNonPinned() throws InterruptedException {
int taskCount = 1000;
CountDownLatch latch = new CountDownLatch(taskCount);
long startTime = System.currentTimeMillis();
// Test with synchronized (pins on JDK 21 to 23). Each task uses its own
// monitor so the measurement reflects pinning rather than lock contention
for (int i = 0; i < taskCount; i++) {
Thread.ofVirtual().start(() -> {
Object monitor = new Object();
synchronized (monitor) {  // Causes pinning while sleeping
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}
});
}
latch.await();
long pinnedDuration = System.currentTimeMillis() - startTime;
// Test with ReentrantLock (no pinning), again with one lock per task
CountDownLatch latch2 = new CountDownLatch(taskCount);
startTime = System.currentTimeMillis();
for (int i = 0; i < taskCount; i++) {
Thread.ofVirtual().start(() -> {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
latch2.countDown();
}
});
}
latch2.await();
long nonPinnedDuration = System.currentTimeMillis() - startTime;
System.out.println("Pinned execution time: " + pinnedDuration + "ms");
System.out.println("Non-pinned execution time: " + nonPinnedDuration + "ms");
System.out.println("Performance difference: " + 
((double) pinnedDuration / nonPinnedDuration) + "x");
}
// Memory usage comparison
public void memoryUsageComparison() {
Runtime runtime = Runtime.getRuntime();
// Test memory usage with many virtual threads
List<Thread> threads = new ArrayList<>();
long memoryBefore = runtime.totalMemory() - runtime.freeMemory();
for (int i = 0; i < 10_000; i++) {
Thread thread = Thread.ofVirtual().start(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threads.add(thread);
}
// Wait for threads to start
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
long memoryAfter = runtime.totalMemory() - runtime.freeMemory();
long memoryUsed = memoryAfter - memoryBefore;
System.out.println("Memory used by 10,000 virtual threads: " + 
(memoryUsed / 1024 / 1024) + " MB");
// Clean up
threads.forEach(Thread::interrupt);
}
}

Real-World Use Cases and Patterns

Web Server with Virtual Threads

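// Note: Server, HttpServerRequest and HttpServerResponse stand in for the types
// of an HTTP server library; they are not JDK classes
public class VirtualThreadWebServer {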
private final Server server;
private final ExecutorService virtualThreadExecutor;
public VirtualThreadWebServer() {
this.virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
this.server = Server.builder()
.executor(virtualThreadExecutor)
.port(8080)
.handler(this::handleRequest)
.build();
}
public void start() throws IOException {
server.start();
System.out.println("Server started on port 8080 with virtual threads");
}
private void handleRequest(HttpServerRequest request, HttpServerResponse response) {
// Each request handled by a virtual thread
try {
String path = request.getPath();
switch (path) {
case "/fast" -> handleFastEndpoint(response);
case "/slow" -> handleSlowEndpoint(response);
case "/blocking" -> handleBlockingIO(response);
default -> handleNotFound(response);
}
} catch (Exception e) {
response.setStatus(500);
response.send("Internal Server Error");
}
}
private void handleFastEndpoint(HttpServerResponse response) {
response.send("Fast response from virtual thread: " + 
Thread.currentThread().isVirtual());
}
private void handleSlowEndpoint(HttpServerResponse response) {
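// Simulate slow work under a ReentrantLock, which does not pin when the thread blocks
// (a real handler would share one lock across requests instead of creating it per call)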
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// Intensive computation
Thread.sleep(100);
response.send("Slow response completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
response.setStatus(500);
response.send("Request interrupted");
} finally {
lock.unlock();
}
}
private void handleBlockingIO(HttpServerResponse response) {
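// Offload the blocking work to another virtual thread; since the handler already
// runs on a virtual thread, blocking directly here would work as well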
CompletableFuture.supplyAsync(() -> {
try {
// Simulate blocking I/O
Thread.sleep(200);
return "Blocking I/O completed";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, virtualThreadExecutor).thenAccept(result -> {
response.send(result);
});
}
private void handleNotFound(HttpServerResponse response) {
response.setStatus(404);
response.send("Not Found");
}
public void stop() {
server.stop(0);
virtualThreadExecutor.close();
}
}
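
For a version that runs without a third-party library, the JDK's built-in com.sun.net.httpserver.HttpServer accepts any Executor. The following is a sketch under assumptions rather than a production setup: the class JdkVirtualThreadServer, the method fetchFastOnce, and the /fast path are chosen for illustration. It starts a server on an ephemeral loopback port, handles one request on a virtual thread, and returns the response body.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JdkVirtualThreadServer {

    // Starts the server, sends one GET /fast request to it, and returns the body.
    static String fetchFastOnce() {
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        HttpServer server = null;
        try {
            // Port 0 asks the OS for any free port
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/fast", exchange -> {
                byte[] body = ("virtual=" + Thread.currentThread().isVirtual())
                        .getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.setExecutor(executor); // each exchange is handled on a new virtual thread
            server.start();

            int port = server.getAddress().getPort();
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create("http://127.0.0.1:" + port + "/fast"))
                    .build();
            return HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString())
                    .body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
            executor.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchFastOnce());
    }
}
```

Handlers written this way can use plain blocking calls; the pinning rules discussed earlier still apply to any synchronized or native code they reach.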

Database Access Patterns

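import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class DatabaseAccessWithVirtualThreads {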
private final ExecutorService virtualThreadExecutor;
public DatabaseAccessWithVirtualThreads() {
this.virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
public CompletableFuture<List<User>> getUsersConcurrently(List<String> userIds) {
List<CompletableFuture<User>> futures = userIds.stream()
.map(userId -> CompletableFuture.supplyAsync(() -> 
fetchUserFromDatabase(userId), virtualThreadExecutor))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
private User fetchUserFromDatabase(String userId) {
// Simulate database access
try {
// Simulated connection-pool access; a pool bounds concurrent connections
// but does not by itself prevent pinning
return useConnectionPool(userId);
} catch (Exception e) {
throw new RuntimeException("Failed to fetch user: " + userId, e);
}
}
private User useConnectionPool(String userId) throws InterruptedException {
// Simulate getting connection from pool
Thread.sleep(50);
// Use ReentrantLock instead of synchronized for database operations
// (this local lock only shows the pattern; a real lock would be a shared field)
ReentrantLock dbLock = new ReentrantLock();
dbLock.lock();
try {
// Database operation
return new User(userId, "User " + userId);
} finally {
dbLock.unlock();
}
}
// Batch processing with virtual threads
public CompletableFuture<Void> processBatch(List<DataItem> items) {
List<CompletableFuture<Void>> futures = items.stream()
.map(item -> CompletableFuture.runAsync(() -> 
processItem(item), virtualThreadExecutor))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private void processItem(DataItem item) {
// Avoid synchronized blocks in processing
// (this local lock only shows the pattern; a real lock would be a shared field)
ReentrantLock processingLock = new ReentrantLock();
processingLock.lock();
try {
// Process item
System.out.println("Processing item: " + item.id() + 
" on virtual thread: " + Thread.currentThread().isVirtual());
Thread.sleep(10); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
processingLock.unlock();
}
}
record User(String id, String name) {}
record DataItem(String id, String content) {}
}
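
Because virtual threads are cheap, the scarce resource in database code is usually the connection, not the thread. One common way to cap concurrent use is a Semaphore, sketched below with made-up names (BoundedConnections, maxConcurrent) and a sleep standing in for the query. The method returns the highest number of tasks that were inside the guarded section at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConnections {

    // Runs taskCount simulated queries with at most `permits` in flight and
    // returns the peak number of concurrently running queries that was observed.
    static int maxConcurrent(int taskCount, int permits) {
        Semaphore connections = new Semaphore(permits);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    connections.acquire(); // parks the virtual thread without pinning
                    try {
                        int now = inUse.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // stand-in for the database call
                    } finally {
                        inUse.decrementAndGet();
                        connections.release();
                    }
                    return null;
                });
            }
        } // close() waits for all tasks
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent queries: " + maxConcurrent(50, 3));
    }
}
```

The peak never exceeds the number of permits, however many virtual threads are started.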

Best Practices Summary

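  1. Avoid blocking inside synchronized - On JDK 21 to 23, guard blocking sections with ReentrantLock instead; JDK 24 and later no longer pin on synchronized
  2. Use async I/O where blocking holds the carrier - Network I/O unmounts the virtual thread, but file I/O blocks the carrier, so offload long file operations (for example with CompletableFuture)
  3. Monitor pinning - Use -Djdk.tracePinnedThreads (JDK 21 to 23) or the JFR event jdk.VirtualThreadPinned, plus custom monitoring
  4. Clean ThreadLocals - Always remove ThreadLocal values or use ScopedValue
  5. Limit native calls - Be cautious with JNI and native method invocations
  6. Use proper executors - Leverage Executors.newVirtualThreadPerTaskExecutor()
  7. Test performance - Benchmark with and without potential pinning scenarios
  8. Profile applications - Use JDK Flight Recorder or a profiler to identify pinning bottlenecks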

Conclusion

Understanding and managing virtual thread pinning is essential for achieving optimal performance with Java's virtual threads. By following these patterns and best practices, developers can maximize the benefits of virtual threads while minimizing the performance impacts of carrier pinning. The key is to identify pinning scenarios early, use appropriate synchronization mechanisms, and leverage async patterns for I/O operations.
