Structured Concurrency is an approach to concurrent programming that treats a group of related tasks running in different threads as a single unit of work. It became a preview API in Java 21 (JEP 453) and was previewed again in Java 22 (JEP 462), so the examples below must be compiled and run with the --enable-preview flag. It brings structured programming principles to concurrency, making concurrent code easier to write, read, and maintain.
The Problem with Traditional Concurrency
Traditional Java concurrency models using ExecutorService, Future, and manual thread management often lead to:
- Resource leaks from forgotten threads
- Complex error handling and propagation
- Difficulty in reasoning about task relationships
- Cancellation complexity
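The first and last points can be reproduced with stable APIs only. The sketch below is an illustration under assumptions, not code from any particular project: the class name UnstructuredFailureDemo, the method siblingOutlivesFailure, and the 500 ms delay are all made up for the demo. It submits two tasks to an ExecutorService, lets one fail, and checks whether the other is still running when the failure is observed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and timings are illustrative assumptions
final class UnstructuredFailureDemo {

    /** Returns true if the slow task was still running when the failure was observed. */
    static boolean siblingOutlivesFailure() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch slowStarted = new CountDownLatch(1);

        Callable<String> slowTask = () -> {
            slowStarted.countDown();
            Thread.sleep(500); // simulated long-running work
            return "slow result";
        };
        Callable<String> failingTask = () -> {
            slowStarted.await(); // fail only once the sibling is running
            throw new IllegalStateException("lookup failed");
        };

        try {
            Future<String> slow = executor.submit(slowTask);
            Future<String> failing = executor.submit(failingTask);
            try {
                failing.get();
                return false; // not reached in this sketch
            } catch (ExecutionException e) {
                // Nothing links the two futures, so the sibling keeps running
                boolean stillRunning = !slow.isDone();
                slow.cancel(true); // cancellation has to be done by hand
                return stillRunning;
            }
        } finally {
            executor.shutdownNow(); // forgetting this leaves the pool threads alive
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sibling still running after failure: " + siblingOutlivesFailure());
    }
}
```

Every cleanup step here (the cancel call and the shutdownNow in the finally block) is the caller's responsibility, which is the gap structured concurrency aims to close.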
Key Concepts of Structured Concurrency
The Basic Principle
In structured concurrency, the lifetime of concurrent subtasks is confined to a single syntactic block, just like structured programming confines control flow to blocks.
// Traditional approach - scattered error handling
Future<String> future1 = executor.submit(task1);
Future<String> future2 = executor.submit(task2);
// What if task1 fails? Need to cancel task2 manually...
// Structured concurrency - automatic scope management
try (var scope = new StructuredTaskScope<String>()) {
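// fork() returns a Subtask (not a Future) in the Java 21 preview API
Subtask<String> subtask1 = scope.fork(task1);
Subtask<String> subtask2 = scope.fork(task2);
scope.join(); // Waits until both subtasks have finished
// Leaving the block closes the scope, so no subtask outlives it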
}
Core API Components
StructuredTaskScope
The main class that enables structured concurrency:
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
public class StructuredConcurrencyBasics {
// Basic StructuredTaskScope usage
public static void basicStructuredConcurrency() throws InterruptedException {
try (var scope = new StructuredTaskScope<String>()) {
Subtask<String> userTask = scope.fork(() -> fetchUserData());
Subtask<String> productTask = scope.fork(() -> fetchProductData());
scope.join(); // Wait for all tasks
// Check results
if (userTask.state() == Subtask.State.SUCCESS) {
System.out.println("User: " + userTask.get());
}
if (productTask.state() == Subtask.State.SUCCESS) {
System.out.println("Product: " + productTask.get());
}
}
// Scope automatically closes - all tasks are handled
}
private static String fetchUserData() throws InterruptedException {
Thread.sleep(1000);
return "John Doe"; // the caller already prints the "User: " label
}
private static String fetchProductData() throws InterruptedException {
Thread.sleep(1500);
return "Laptop"; // the caller already prints the "Product: " label
}
}
Practical Implementation Examples
Example 1: ShutdownOnFailure Pattern
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class ShutdownOnFailureDemo {
static class UserProfile {
final String userInfo;
final String preferences;
final String history;
UserProfile(String userInfo, String preferences, String history) {
this.userInfo = userInfo;
this.preferences = preferences;
this.history = history;
}
@Override
public String toString() {
return String.format("UserProfile[user=%s, pref=%s, history=%s]",
userInfo, preferences, history);
}
}
public static UserProfile fetchUserProfile(String userId)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork all subtasks
Subtask<String> userInfoTask = scope.fork(() -> fetchUserInfo(userId));
Subtask<String> preferencesTask = scope.fork(() -> fetchUserPreferences(userId));
Subtask<String> historyTask = scope.fork(() -> fetchUserHistory(userId));
// Wait for all tasks or any failure
scope.join();
scope.throwIfFailed(); // Throws if any task failed
// All tasks completed successfully
return new UserProfile(
userInfoTask.get(),
preferencesTask.get(),
historyTask.get()
);
}
// Automatic shutdown guaranteed
}
private static String fetchUserInfo(String userId) throws InterruptedException {
System.out.println("Fetching user info for: " + userId);
Thread.sleep(1000);
if ("error".equals(userId)) {
throw new RuntimeException("User not found");
}
return "UserInfo for " + userId;
}
private static String fetchUserPreferences(String userId) throws InterruptedException {
System.out.println("Fetching preferences for: " + userId);
Thread.sleep(1500);
return "Preferences for " + userId;
}
private static String fetchUserHistory(String userId) throws InterruptedException {
System.out.println("Fetching history for: " + userId);
Thread.sleep(2000);
return "History for " + userId;
}
public static void main(String[] args) {
try {
UserProfile profile = fetchUserProfile("user123");
System.out.println("Success: " + profile);
// Test error case
UserProfile errorProfile = fetchUserProfile("error");
} catch (Exception e) {
System.out.println("Error fetching profile: " + e.getMessage());
}
}
}
Example 2: ShutdownOnSuccess Pattern
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;
public class ShutdownOnSuccessDemo {
public static String fetchFastestService(String request)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// Fork multiple service calls - first one wins
scope.fork(() -> callServiceA(request));
scope.fork(() -> callServiceB(request));
scope.fork(() -> callServiceC(request));
scope.join();
return scope.result(); // Returns the first successful result
}
}
private static String callServiceA(String request) throws InterruptedException {
System.out.println("Service A processing...");
Thread.sleep(800); // Fastest
return "Result from Service A for: " + request;
}
private static String callServiceB(String request) throws InterruptedException {
System.out.println("Service B processing...");
Thread.sleep(1200);
return "Result from Service B for: " + request;
}
private static String callServiceC(String request) throws InterruptedException {
System.out.println("Service C processing...");
Thread.sleep(2000);
return "Result from Service C for: " + request;
}
public static void main(String[] args) {
try {
String result = fetchFastestService("test-request");
System.out.println("Fastest result: " + result);
} catch (Exception e) {
System.out.println("All services failed: " + e.getMessage());
}
}
}
Example 3: Error Handling and Timeouts
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.TimeoutException;
public class ErrorHandlingDemo {
static class ServiceResponse {
final String data;
final String source;
final boolean success;
ServiceResponse(String data, String source, boolean success) {
this.data = data;
this.source = source;
this.success = success;
}
}
public static ServiceResponse fetchWithFallback(String request)
throws InterruptedException {
// A plain scope lets one subtask fail without cancelling the other;
// ShutdownOnFailure would cancel the fallback as soon as the primary failed
try (var scope = new StructuredTaskScope<String>()) {
Subtask<String> primaryTask = scope.fork(() -> callPrimaryService(request));
Subtask<String> fallbackTask = scope.fork(() -> callFallbackService(request));
try {
// Wait for both subtasks, but for at most 3 seconds
scope.joinUntil(java.time.Instant.now().plusSeconds(3));
} catch (TimeoutException e) {
// Cancel whatever is still running; no result is returned in this case
scope.shutdown();
return new ServiceResponse("Timeout", "None", false);
}
// Prefer the primary result, then the fallback
if (primaryTask.state() == Subtask.State.SUCCESS) {
return new ServiceResponse(primaryTask.get(), "Primary", true);
} else if (fallbackTask.state() == Subtask.State.SUCCESS) {
return new ServiceResponse(fallbackTask.get(), "Fallback", true);
} else {
return new ServiceResponse("All services failed", "None", false);
}
}
}
private static String callPrimaryService(String request) throws InterruptedException {
System.out.println("Primary service called");
Thread.sleep(4000); // Will timeout
return "Primary result";
}
private static String callFallbackService(String request) throws InterruptedException {
System.out.println("Fallback service called");
Thread.sleep(2000);
return "Fallback result";
}
public static void main(String[] args) {
try {
ServiceResponse response = fetchWithFallback("test");
System.out.println("Response: " + response.data + " from " + response.source);
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
}
Advanced Patterns
Example 4: Custom StructuredTaskScope
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class CustomStructuredScopeDemo {
// Custom scope that collects all results
static class CollectingScope<T> extends StructuredTaskScope<T> {
private final List<Subtask<? extends T>> subtasks = new ArrayList<>();
@Override
public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
Subtask<U> subtask = super.fork(task);
subtasks.add(subtask);
return subtask;
}
public List<T> successfulResults() {
return subtasks.stream()
.filter(subtask -> subtask.state() == Subtask.State.SUCCESS)
// Explicit <T> keeps the stream typed as T instead of a wildcard capture
.<T>map(Subtask::get)
.toList();
}
public List<Throwable> failures() {
return subtasks.stream()
.filter(subtask -> subtask.state() == Subtask.State.FAILED)
.map(Subtask::exception)
.toList();
}
}
public static void processBatch(List<String> items) throws InterruptedException {
try (var scope = new CollectingScope<String>()) {
// Process all items concurrently
for (String item : items) {
scope.fork(() -> processItem(item));
}
scope.join();
// Analyze results
List<String> successes = scope.successfulResults();
List<Throwable> failures = scope.failures();
System.out.println("Successful: " + successes.size());
System.out.println("Failed: " + failures.size());
failures.forEach(failure ->
System.out.println("Failure: " + failure.getMessage()));
}
}
private static String processItem(String item) throws InterruptedException {
Thread.sleep(500);
if (item.contains("error")) {
throw new RuntimeException("Processing error for: " + item);
}
return "Processed: " + item;
}
public static void main(String[] args) {
try {
List<String> items = List.of("item1", "item2", "error-item", "item4");
processBatch(items);
} catch (Exception e) {
System.out.println("Batch processing failed: " + e.getMessage());
}
}
}
Example 5: Nested Structured Concurrency
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
public class NestedStructuredConcurrency {
static class OrderResult {
final String orderInfo;
final String paymentInfo;
final String shippingInfo;
OrderResult(String orderInfo, String paymentInfo, String shippingInfo) {
this.orderInfo = orderInfo;
this.paymentInfo = paymentInfo;
this.shippingInfo = shippingInfo;
}
}
public static OrderResult processOrder(String orderId)
throws InterruptedException, ExecutionException {
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {
// Main order processing tasks
var orderTask = outerScope.fork(() -> fetchOrderDetails(orderId));
var paymentTask = outerScope.fork(() -> processPayment(orderId));
outerScope.join();
outerScope.throwIfFailed();
// Nested scope for shipping options
try (var innerScope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
innerScope.fork(() -> calculateStandardShipping(orderId));
innerScope.fork(() -> calculateExpressShipping(orderId));
innerScope.join();
String shippingInfo = innerScope.result();
return new OrderResult(
orderTask.get(),
paymentTask.get(),
shippingInfo
);
}
}
}
private static String fetchOrderDetails(String orderId) throws InterruptedException {
Thread.sleep(1000);
return "Order details for " + orderId;
}
private static String processPayment(String orderId) throws InterruptedException {
Thread.sleep(800);
return "Payment processed for " + orderId;
}
private static String calculateStandardShipping(String orderId) throws InterruptedException {
Thread.sleep(1200);
return "Standard shipping: $5.99";
}
private static String calculateExpressShipping(String orderId) throws InterruptedException {
Thread.sleep(600);
return "Express shipping: $12.99";
}
public static void main(String[] args) {
try {
OrderResult result = processOrder("order-123");
System.out.println("Order processed successfully:");
System.out.println(" Order: " + result.orderInfo);
System.out.println(" Payment: " + result.paymentInfo);
System.out.println(" Shipping: " + result.shippingInfo);
} catch (Exception e) {
System.out.println("Order processing failed: " + e.getMessage());
}
}
}
Integration with Virtual Threads
Example 6: Structured Concurrency with Virtual Threads
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ThreadFactory;
public class VirtualThreadsIntegration {
public static void highConcurrencyWithVirtualThreads() throws InterruptedException {
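// Create a virtual thread factory (the no-arg scope constructor also defaults to virtual threads)
ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();
// Pass the factory explicitly; the first argument is the scope name and may be null
try (var scope = new StructuredTaskScope<String>("batch", virtualThreadFactory)) {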
// Fork many tasks - virtual threads handle this efficiently
for (int i = 0; i < 1000; i++) {
final int taskId = i;
scope.fork(() -> processWithVirtualThread(taskId));
}
scope.join();
System.out.println("All 1000 tasks completed");
}
}
private static String processWithVirtualThread(int taskId) throws InterruptedException {
// Simulate I/O operation
Thread.sleep(100);
return "Task " + taskId + " completed by: " + Thread.currentThread();
}
// Custom scope that configures its own virtual thread factory
static class VirtualThreadScope<T> extends StructuredTaskScope<T> {
VirtualThreadScope() {
// StructuredTaskScope has no newSubtask hook to override; the threads used by
// fork() are chosen through the (name, factory) constructor instead
super("virtual-thread-scope", Thread.ofVirtual().name("subtask-", 0).factory());
}
}
public static void main(String[] args) {
try {
highConcurrencyWithVirtualThreads();
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
}
Testing Structured Concurrency
Example 7: Testing Strategies
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.atomic.AtomicInteger;
public class StructuredConcurrencyTest {
static class CounterService {
private final AtomicInteger counter = new AtomicInteger(0);
public int incrementAndGet() throws InterruptedException {
Thread.sleep(100); // Simulate work
return counter.incrementAndGet();
}
}
public static int concurrentIncrement(CounterService service, int threads)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var tasks = new ArrayList<Subtask<Integer>>();
for (int i = 0; i < threads; i++) {
tasks.add(scope.fork(service::incrementAndGet));
}
scope.join();
scope.throwIfFailed(); // Throws ExecutionException if any subtask failed
// Subtasks finish in no fixed order, so the last forked one need not
// hold the highest count; take the maximum over all results
return tasks.stream().mapToInt(Subtask::get).max().orElse(0);
}
}
// Test method
public static void testConcurrentIncrement() {
try {
CounterService service = new CounterService();
int result = concurrentIncrement(service, 10);
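// Explicit check: a plain assert is skipped unless the JVM runs with -ea
if (result != 10) {
throw new AssertionError("Expected 10, got " + result);
}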
System.out.println("Test passed: " + result);
} catch (Exception e) {
System.out.println("Test failed: " + e.getMessage());
}
}
public static void main(String[] args) {
testConcurrentIncrement();
}
}
Benefits and Best Practices
Key Benefits
- Automatic Resource Management: No more leaked threads
- Clear Error Propagation: Failures are properly handled and propagated
- Readable Code: Concurrency structure matches code structure
- Cancellation Support: Automatic cancellation of related tasks
- Debugging Friendly: Thread dumps can show the task hierarchy, with subtask threads grouped under the scope that forked them
Best Practices
- Always Use Try-with-Resources: Ensures proper scope cleanup
- Choose the Right Policy: ShutdownOnFailure vs ShutdownOnSuccess
- Handle Timeouts Appropriately: Use joinUntil() for time-bound operations
- Leverage Virtual Threads: Perfect combination for I/O-bound tasks
- Keep Tasks Focused: Each forked task should have a single responsibility
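The virtual-thread advice can be tried without any preview API, since virtual threads are a final feature in Java 21. The following is a minimal sketch under stated assumptions: the class VirtualThreadFanOut, the method runBlockingTasks, the thread-name prefix, and the 50 ms sleep that stands in for blocking I/O are all hypothetical choices for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; requires Java 21 or later
final class VirtualThreadFanOut {

    /** Starts one virtual thread per task and returns how many tasks ran on a virtual thread. */
    static int runBlockingTasks(int taskCount) throws InterruptedException {
        ThreadFactory factory = Thread.ofVirtual().name("io-task-", 0).factory();
        AtomicInteger ranOnVirtual = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            Thread thread = factory.newThread(() -> {
                try {
                    Thread.sleep(50); // stands in for a blocking I/O call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (Thread.currentThread().isVirtual()) {
                    ranOnVirtual.incrementAndGet();
                }
            });
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join(); // wait for every task before returning
        }
        return ranOnVirtual.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks run on virtual threads: " + runBlockingTasks(1000));
    }
}
```

The explicit start and join bookkeeping in this sketch is the part a StructuredTaskScope takes over; the point here is only that a thousand blocking tasks each get their own cheap thread.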
Comparison with Traditional Approaches
| Aspect | Traditional Concurrency | Structured Concurrency |
|---|---|---|
| Error Handling | Manual, complex | Automatic propagation |
| Resource Cleanup | Manual cancellation required | Automatic scope management |
| Code Structure | Scattered across methods | Contained in blocks |
| Debugging | Flat thread dumps with no task relationships | Thread dumps show the task hierarchy |
| Cancellation | Manual coordination | Automatic with scope |
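To make the "manual" column concrete, here is one possible way to get fail-fast behavior from the stable java.util.concurrent classes. It is a sketch, not a canonical recipe: ManualFailFast and runAllOrCancel are hypothetical names, and the approach (an ExecutorCompletionService plus a finally block that cancels every future) is just one of several ways to coordinate this by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not a JDK class
final class ManualFailFast {

    /** Runs all tasks; on the first failure the remaining tasks are cancelled and the failure is rethrown. */
    static <T> List<T> runAllOrCancel(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completed = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completed.submit(task));
            }
            // Consume futures in completion order so a failure surfaces early
            for (int i = 0; i < futures.size(); i++) {
                completed.take().get(); // throws ExecutionException if that task failed
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // already done; preserves submission order
            }
            return results;
        } finally {
            // Manual coordination: cancel whatever is unfinished (no effect on completed tasks)
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<String>> tasks = List.of(() -> "order", () -> "payment");
            System.out.println(runAllOrCancel(executor, tasks));
        } finally {
            executor.shutdown();
        }
    }
}
```

The bookkeeping in this helper (tracking futures, draining in completion order, cancelling in a finally block) is roughly what ShutdownOnFailure with join() and throwIfFailed() provides in the structured version.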
Conclusion
Structured Concurrency represents a fundamental shift in how we approach concurrent programming in Java. By bringing structured programming principles to concurrency, it provides:
- Simplified code that's easier to read and maintain
- Automatic resource management that prevents leaks
- Clear error handling that properly propagates failures
- Natural integration with virtual threads
Structured Concurrency is still a preview API as of Java 22, so its classes and method signatures may change in later releases. Even so, it already makes concurrent programming more accessible and reliable, and it is particularly valuable for:
- Microservices communication
- Parallel data processing
- API aggregation patterns
- Any scenario involving multiple concurrent operations
As Java continues to evolve, Structured Concurrency, combined with Virtual Threads, is set to revolutionize how we write concurrent applications, making them more robust, maintainable, and performant.