Structured Concurrency (Preview) in Java: Simplifying Concurrent Programming

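Structured Concurrency is an approach to concurrent programming that treats groups of related tasks running in different threads as a single unit of work. It was introduced as a preview feature in Java 21 (JEP 453) and re-previewed without change in Java 22 (JEP 462). It brings structured programming principles to concurrency, making concurrent code easier to write, read, and maintain. Because it is a preview API, the examples in this article must be compiled and run with the --enable-preview flag; they target the API as previewed in Java 21 and 22, which may change in later releases.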

The Problem with Traditional Concurrency

Traditional Java concurrency models using ExecutorService, Future, and manual thread management often lead to:

  • Resource leaks from forgotten threads
  • Complex error handling and propagation
  • Difficulty in reasoning about task relationships
  • Cancellation complexity
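
To make the cancellation problem concrete, here is a minimal sketch that uses only the long-standing ExecutorService API. The class name UnstructuredLeakDemo, the two tasks, and their timings are made up for illustration. It shows that when the first task fails, the second one keeps running until the caller cancels it by hand.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; task names and timings are assumptions for illustration.
class UnstructuredLeakDemo {

    /** Returns true if the slow task was still running when the failure was observed. */
    static boolean siblingOutlivesFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch slowStarted = new CountDownLatch(1);
        try {
            Callable<String> task1 = () -> {
                slowStarted.await();               // fail only once task2 is running
                throw new IllegalStateException("task1 failed");
            };
            Callable<String> task2 = () -> {
                slowStarted.countDown();
                Thread.sleep(10_000);              // simulated slow I/O call
                return "task2 result";
            };
            Future<String> future1 = executor.submit(task1);
            Future<String> future2 = executor.submit(task2);
            try {
                future1.get();
                return false;                      // not reached: task1 always fails
            } catch (ExecutionException e) {
                // task1 has failed, yet nothing has told task2 to stop
                boolean stillRunning = !future2.isDone();
                future2.cancel(true);              // the manual step that is easy to forget
                return stillRunning;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();                // without this, pool threads would linger
        }
    }

    public static void main(String[] args) {
        System.out.println("task2 still running after task1 failed: " + siblingOutlivesFailure());
    }
}
```

The cancel and shutdownNow calls are exactly the bookkeeping that a structured scope is meant to take over.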

Key Concepts of Structured Concurrency

The Basic Principle

In structured concurrency, the lifetime of concurrent subtasks is confined to a single syntactic block, just as structured programming confines control flow to blocks.

    // Traditional approach - scattered error handling
    Future<String> future1 = executor.submit(task1);
    Future<String> future2 = executor.submit(task2);
    // What if task1 fails? Need to cancel task2 manually...

    // Structured concurrency - automatic scope management
    try (var scope = new StructuredTaskScope<String>()) {
        Subtask<String> subtask1 = scope.fork(task1); // fork returns a Subtask, not a Future
        Subtask<String> subtask2 = scope.fork(task2);
        scope.join(); // Waits for both subtasks to finish
        // Leaving the block closes the scope, so no subtask outlives it
    }
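
The block-confinement idea can also be imitated with stable APIs, which may help to see what the try-with-resources block is guaranteeing. The following is only a teaching sketch under that assumption: MiniScope is a hypothetical class, not the JDK's StructuredTaskScope, and it offers none of its policies or error handling. It cancels unfinished tasks and waits for their threads when the block is left.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical teaching aid built on stable APIs; it is NOT the JDK's StructuredTaskScope.
class MiniScope implements AutoCloseable {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final List<Future<?>> forked = new ArrayList<>();

    /** Starts a task whose lifetime is tied to this scope. Call from the owning thread only. */
    <T> Future<T> fork(Callable<T> task) {
        Future<T> future = executor.submit(task);
        forked.add(future);
        return future;
    }

    /** Cancels unfinished tasks, then blocks until every worker thread has exited. */
    @Override
    public void close() {
        for (Future<?> future : forked) {
            future.cancel(true); // interrupts the task if it is still running
        }
        executor.shutdown();
        boolean interrupted = false;
        boolean terminated = false;
        while (!terminated) {
            try {
                terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Forks three slow tasks, leaves the block early, and reports how many are still running. */
    static int runningAfterBlock() {
        AtomicInteger running = new AtomicInteger();
        try (MiniScope scope = new MiniScope()) {
            for (int i = 0; i < 3; i++) {
                scope.fork(() -> {
                    running.incrementAndGet();
                    try {
                        Thread.sleep(10_000); // simulated slow work
                        return "done";
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
            // Leaving the block here triggers close()
        }
        return running.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks still running after the block: " + runningAfterBlock());
    }
}
```

Because close() does not return until the worker threads have exited, code after the block can rely on no task from the block still being active.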

Core API Components

StructuredTaskScope

The main class that enables structured concurrency:

    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.StructuredTaskScope.Subtask;
    // Policy subclasses used in the examples that follow
    import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
    import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;

    public class StructuredConcurrencyBasics {

        // Basic StructuredTaskScope usage
        public static void basicStructuredConcurrency() throws InterruptedException {
            try (var scope = new StructuredTaskScope<String>()) {
                Subtask<String> userTask = scope.fork(() -> fetchUserData());
                Subtask<String> productTask = scope.fork(() -> fetchProductData());
                scope.join(); // Wait for all subtasks, whether they succeed or fail
                // Check results
                if (userTask.state() == Subtask.State.SUCCESS) {
                    System.out.println("User: " + userTask.get());
                }
                if (productTask.state() == Subtask.State.SUCCESS) {
                    System.out.println("Product: " + productTask.get());
                }
            }
            // Scope automatically closes - all tasks are handled
        }

        private static String fetchUserData() throws InterruptedException {
            Thread.sleep(1000); // Simulated remote call
            return "John Doe";
        }

        private static String fetchProductData() throws InterruptedException {
            Thread.sleep(1500); // Simulated remote call
            return "Laptop";
        }
    }

Practical Implementation Examples

Example 1: ShutdownOnFailure Pattern

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.StructuredTaskScope.Subtask;

    public class ShutdownOnFailureDemo {

        static class UserProfile {
            final String userInfo;
            final String preferences;
            final String history;

            UserProfile(String userInfo, String preferences, String history) {
                this.userInfo = userInfo;
                this.preferences = preferences;
                this.history = history;
            }

            @Override
            public String toString() {
                return String.format("UserProfile[user=%s, pref=%s, history=%s]",
                        userInfo, preferences, history);
            }
        }

        public static UserProfile fetchUserProfile(String userId)
                throws InterruptedException, ExecutionException {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                // Fork all subtasks
                Subtask<String> userInfoTask = scope.fork(() -> fetchUserInfo(userId));
                Subtask<String> preferencesTask = scope.fork(() -> fetchUserPreferences(userId));
                Subtask<String> historyTask = scope.fork(() -> fetchUserHistory(userId));
                // Wait until all subtasks finish or the first one fails
                scope.join();
                scope.throwIfFailed(); // Throws ExecutionException if any subtask failed
                // All tasks completed successfully
                return new UserProfile(
                        userInfoTask.get(),
                        preferencesTask.get(),
                        historyTask.get()
                );
            }
            // Leaving the try block always closes the scope, on success or failure
        }

        private static String fetchUserInfo(String userId) throws InterruptedException {
            System.out.println("Fetching user info for: " + userId);
            Thread.sleep(1000);
            if ("error".equals(userId)) {
                throw new RuntimeException("User not found");
            }
            return "UserInfo for " + userId;
        }

        private static String fetchUserPreferences(String userId) throws InterruptedException {
            System.out.println("Fetching preferences for: " + userId);
            Thread.sleep(1500);
            return "Preferences for " + userId;
        }

        private static String fetchUserHistory(String userId) throws InterruptedException {
            System.out.println("Fetching history for: " + userId);
            Thread.sleep(2000);
            return "History for " + userId;
        }

        public static void main(String[] args) {
            try {
                UserProfile profile = fetchUserProfile("user123");
                System.out.println("Success: " + profile);
                // Test error case: the failing subtask cancels the other two
                UserProfile errorProfile = fetchUserProfile("error");
                System.out.println("Unexpected success: " + errorProfile);
            } catch (Exception e) {
                System.out.println("Error fetching profile: " + e.getMessage());
            }
        }
    }

Example 2: ShutdownOnSuccess Pattern

    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.ExecutionException;

    public class ShutdownOnSuccessDemo {

        public static String fetchFastestService(String request)
                throws InterruptedException, ExecutionException {
            try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
                // Fork multiple service calls - first one wins
                scope.fork(() -> callServiceA(request));
                scope.fork(() -> callServiceB(request));
                scope.fork(() -> callServiceC(request));
                scope.join();
                return scope.result(); // Returns the first successful result
            }
        }

        private static String callServiceA(String request) throws InterruptedException {
            System.out.println("Service A processing...");
            Thread.sleep(800); // Fastest
            return "Result from Service A for: " + request;
        }

        private static String callServiceB(String request) throws InterruptedException {
            System.out.println("Service B processing...");
            Thread.sleep(1200);
            return "Result from Service B for: " + request;
        }

        private static String callServiceC(String request) throws InterruptedException {
            System.out.println("Service C processing...");
            Thread.sleep(2000);
            return "Result from Service C for: " + request;
        }

        public static void main(String[] args) {
            try {
                String result = fetchFastestService("test-request");
                System.out.println("Fastest result: " + result);
            } catch (Exception e) {
                System.out.println("All services failed: " + e.getMessage());
            }
        }
    }
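
For comparison, the stable ExecutorService.invokeAny method offers similar first-success semantics without preview flags: it returns the result of one task that completed successfully and cancels the rest. The sketch below is an illustration only; the class InvokeAnyComparison and the three stand-in services are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical comparison class; the "services" are stand-ins with made-up timings.
class InvokeAnyComparison {

    /** Returns the first successful result, or a message if every task failed. */
    static String fastestOf(List<Callable<String>> services) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Blocks until one task succeeds; unfinished tasks are cancelled on return
            return executor.invokeAny(services);
        } catch (ExecutionException e) {
            return "all failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow(); // the pool lifetime is still managed by hand
        }
    }

    /** One failing service, one slow service, one fast service. */
    static String demo() {
        Callable<String> failing = () -> { throw new IllegalStateException("service down"); };
        Callable<String> slow = () -> { Thread.sleep(5_000); return "B"; };
        Callable<String> fast = () -> { Thread.sleep(50); return "A"; };
        return fastestOf(Arrays.asList(failing, slow, fast));
    }

    public static void main(String[] args) {
        System.out.println("Fastest result: " + demo());
    }
}
```

The failed task is ignored as long as another one succeeds, which matches the behaviour of ShutdownOnSuccess described above. The difference is that the executor still has to be shut down manually.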

Example 3: Error Handling and Timeouts

    import java.time.Instant;
    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.StructuredTaskScope.Subtask;
    import java.util.concurrent.TimeoutException;

    public class ErrorHandlingDemo {

        static class ServiceResponse {
            final String data;
            final String source;
            final boolean success;

            ServiceResponse(String data, String source, boolean success) {
                this.data = data;
                this.source = source;
                this.success = success;
            }
        }

        public static ServiceResponse fetchWithFallback(String request)
                throws InterruptedException {
            // Plain scope: with ShutdownOnFailure, a failing primary would cancel the fallback
            try (var scope = new StructuredTaskScope<String>()) {
                Subtask<String> primaryTask = scope.fork(() -> callPrimaryService(request));
                Subtask<String> fallbackTask = scope.fork(() -> callFallbackService(request));
                try {
                    // Wait until both finish or the 3-second deadline passes
                    scope.joinUntil(Instant.now().plusSeconds(3));
                } catch (TimeoutException e) {
                    // Cancel whatever is still running, then join again so the
                    // results of subtasks that did finish can be read
                    scope.shutdown();
                    scope.join();
                }
                // Prefer the primary result, then the fallback
                if (primaryTask.state() == Subtask.State.SUCCESS) {
                    return new ServiceResponse(primaryTask.get(), "Primary", true);
                } else if (fallbackTask.state() == Subtask.State.SUCCESS) {
                    return new ServiceResponse(fallbackTask.get(), "Fallback", true);
                } else {
                    return new ServiceResponse("All services failed", "None", false);
                }
            }
        }

        private static String callPrimaryService(String request) throws InterruptedException {
            System.out.println("Primary service called");
            Thread.sleep(4000); // Exceeds the 3-second deadline
            return "Primary result";
        }

        private static String callFallbackService(String request) throws InterruptedException {
            System.out.println("Fallback service called");
            Thread.sleep(2000);
            return "Fallback result";
        }

        public static void main(String[] args) {
            try {
                ServiceResponse response = fetchWithFallback("test");
                System.out.println("Response: " + response.data + " from " + response.source);
            } catch (Exception e) {
                System.out.println("Error: " + e.getMessage());
            }
        }
    }

Advanced Patterns

Example 4: Custom StructuredTaskScope

    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.StructuredTaskScope.Subtask;

    public class CustomStructuredScopeDemo {

        // Custom scope that collects all results and failures (assumes non-null results)
        static class CollectingScope<T> extends StructuredTaskScope<T> {
            // Thread-safe queues: handleComplete runs on the subtask threads
            private final Queue<T> results = new ConcurrentLinkedQueue<>();
            private final Queue<Throwable> failures = new ConcurrentLinkedQueue<>();

            @Override
            protected void handleComplete(Subtask<? extends T> subtask) {
                if (subtask.state() == Subtask.State.SUCCESS) {
                    results.add(subtask.get());
                } else if (subtask.state() == Subtask.State.FAILED) {
                    failures.add(subtask.exception());
                }
            }

            // Results are in completion order, not fork order
            public List<T> successfulResults() {
                ensureOwnerAndJoined(); // Only valid after join()
                return List.copyOf(results);
            }

            public List<Throwable> failures() {
                ensureOwnerAndJoined();
                return List.copyOf(failures);
            }
        }

        public static void processBatch(List<String> items) throws InterruptedException {
            try (var scope = new CollectingScope<String>()) {
                // Process all items concurrently
                for (String item : items) {
                    scope.fork(() -> processItem(item));
                }
                scope.join();
                // Analyze results
                List<String> successes = scope.successfulResults();
                List<Throwable> failures = scope.failures();
                System.out.println("Successful: " + successes.size());
                System.out.println("Failed: " + failures.size());
                failures.forEach(failure ->
                        System.out.println("Failure: " + failure.getMessage()));
            }
        }

        private static String processItem(String item) throws InterruptedException {
            Thread.sleep(500);
            if (item.contains("error")) {
                throw new RuntimeException("Processing error for: " + item);
            }
            return "Processed: " + item;
        }

        public static void main(String[] args) {
            try {
                List<String> items = List.of("item1", "item2", "error-item", "item4");
                processBatch(items);
            } catch (Exception e) {
                System.out.println("Batch processing failed: " + e.getMessage());
            }
        }
    }

Example 5: Nested Structured Concurrency

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.StructuredTaskScope;

    public class NestedStructuredConcurrency {

        static class OrderResult {
            final String orderInfo;
            final String paymentInfo;
            final String shippingInfo;

            OrderResult(String orderInfo, String paymentInfo, String shippingInfo) {
                this.orderInfo = orderInfo;
                this.paymentInfo = paymentInfo;
                this.shippingInfo = shippingInfo;
            }
        }

        public static OrderResult processOrder(String orderId)
                throws InterruptedException, ExecutionException {
            try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {
                // Main order processing tasks
                var orderTask = outerScope.fork(() -> fetchOrderDetails(orderId));
                var paymentTask = outerScope.fork(() -> processPayment(orderId));
                outerScope.join();
                outerScope.throwIfFailed();
                // Nested scope for shipping options: the first quote to arrive wins
                try (var innerScope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
                    innerScope.fork(() -> calculateStandardShipping(orderId));
                    innerScope.fork(() -> calculateExpressShipping(orderId));
                    innerScope.join();
                    String shippingInfo = innerScope.result();
                    return new OrderResult(
                            orderTask.get(),
                            paymentTask.get(),
                            shippingInfo
                    );
                }
            }
        }

        private static String fetchOrderDetails(String orderId) throws InterruptedException {
            Thread.sleep(1000);
            return "Order details for " + orderId;
        }

        private static String processPayment(String orderId) throws InterruptedException {
            Thread.sleep(800);
            return "Payment processed for " + orderId;
        }

        private static String calculateStandardShipping(String orderId) throws InterruptedException {
            Thread.sleep(1200);
            return "Standard shipping: $5.99";
        }

        private static String calculateExpressShipping(String orderId) throws InterruptedException {
            Thread.sleep(600);
            return "Express shipping: $12.99";
        }

        public static void main(String[] args) {
            try {
                OrderResult result = processOrder("order-123");
                System.out.println("Order processed successfully:");
                System.out.println("  Order: " + result.orderInfo);
                System.out.println("  Payment: " + result.paymentInfo);
                System.out.println("  Shipping: " + result.shippingInfo);
            } catch (Exception e) {
                System.out.println("Order processing failed: " + e.getMessage());
            }
        }
    }

Integration with Virtual Threads

Example 6: Structured Concurrency with Virtual Threads

    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.ThreadFactory;

    public class VirtualThreadsIntegration {

        public static void highConcurrencyWithVirtualThreads() throws InterruptedException {
            // The no-argument constructor already starts each subtask in a new virtual thread
            try (var scope = new StructuredTaskScope<String>()) {
                // Fork many tasks - virtual threads handle this efficiently
                for (int i = 0; i < 1000; i++) {
                    final int taskId = i;
                    scope.fork(() -> processWithVirtualThread(taskId));
                }
                scope.join();
                System.out.println("All 1000 tasks completed");
            }
        }

        // Named scope with an explicit virtual thread factory, e.g. to name the threads
        public static void namedScopeWithThreadFactory() throws InterruptedException {
            ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("worker-", 0).factory();
            try (var scope = new StructuredTaskScope<String>("batch", virtualThreadFactory)) {
                var subtask = scope.fork(() -> processWithVirtualThread(0));
                scope.join();
                System.out.println(subtask.get());
            }
        }

        private static String processWithVirtualThread(int taskId) throws InterruptedException {
            // Simulate I/O operation
            Thread.sleep(100);
            return "Task " + taskId + " completed by: " + Thread.currentThread();
        }

        public static void main(String[] args) {
            try {
                highConcurrencyWithVirtualThreads();
                namedScopeWithThreadFactory();
            } catch (Exception e) {
                System.out.println("Error: " + e.getMessage());
            }
        }
    }

Testing Structured Concurrency

Example 7: Testing Strategies

    import java.util.ArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.StructuredTaskScope;
    import java.util.concurrent.StructuredTaskScope.Subtask;
    import java.util.concurrent.atomic.AtomicInteger;

    public class StructuredConcurrencyTest {

        static class CounterService {
            private final AtomicInteger counter = new AtomicInteger(0);

            public int incrementAndGet() throws InterruptedException {
                Thread.sleep(100); // Simulate work
                return counter.incrementAndGet();
            }
        }

        public static int concurrentIncrement(CounterService service, int threads)
                throws InterruptedException, ExecutionException {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                var tasks = new ArrayList<Subtask<Integer>>();
                for (int i = 0; i < threads; i++) {
                    tasks.add(scope.fork(service::incrementAndGet));
                }
                scope.join();
                scope.throwIfFailed();
                // The last forked subtask is not necessarily the last to increment,
                // so the final counter value is the maximum of all results
                return tasks.stream().mapToInt(Subtask::get).max().orElse(0);
            }
        }

        // Test method
        public static void testConcurrentIncrement() {
            try {
                CounterService service = new CounterService();
                int result = concurrentIncrement(service, 10);
                // Explicit check: a plain assert is skipped unless the JVM runs with -ea
                if (result != 10) {
                    throw new AssertionError("Expected 10, got " + result);
                }
                System.out.println("Test passed: " + result);
            } catch (Exception e) {
                System.out.println("Test failed: " + e.getMessage());
            }
        }

        public static void main(String[] args) {
            testConcurrentIncrement();
        }
    }

Benefits and Best Practices

Key Benefits

  1. Automatic Resource Management: No more leaked threads
  2. Clear Error Propagation: Failures are properly handled and propagated
  3. Readable Code: Concurrency structure matches code structure
  4. Cancellation Support: Automatic cancellation of related tasks
  5. Debugging Friendly: Thread dumps can group subtask threads under their scope, making the task hierarchy visible
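
The cancellation benefit depends on subtasks responding to interruption, because shutting down a scope interrupts the threads of unfinished subtasks. Blocking calls such as Thread.sleep already do this by throwing InterruptedException, but a CPU-bound loop has to poll the interrupt flag itself. The sketch below shows that idea with stable APIs only, using Future.cancel(true) as a stand-in for a scope shutdown; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; Future.cancel(true) stands in for a scope shutdown.
class InterruptibleWork {

    /** CPU-bound loop that polls the interrupt flag so that it can be cancelled. */
    static long countUntilInterrupted() {
        long iterations = 0;
        while (!Thread.currentThread().isInterrupted()) {
            iterations++;
        }
        return iterations;
    }

    /** Starts the loop, cancels it, and reports whether the worker thread actually stopped. */
    static boolean stopsWhenCancelled() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<Long> future = executor.submit(() -> {
                started.countDown();
                return countUntilInterrupted();
            });
            started.await();        // make sure the loop is really running
            future.cancel(true);    // delivers the interrupt
            executor.shutdown();
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped after cancel: " + stopsWhenCancelled());
    }
}
```

A loop that never checked the flag would keep running after cancellation, and closing a scope that contained it would block until the loop ended on its own.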

Best Practices

  1. Always Use Try-with-Resources: Ensures proper scope cleanup
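  2. Choose the Right Policy: Use ShutdownOnFailure when every result is required, ShutdownOnSuccess when any single result is enough
  3. Handle Timeouts Appropriately: Use joinUntil() with an Instant deadline for time-bound operations
  4. Leverage Virtual Threads: A scope created without a thread factory runs each subtask in a virtual thread, which suits I/O-bound tasks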
  5. Keep Tasks Focused: Each forked task should have a single responsibility
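
Best practice 3 relies on joinUntil() taking an absolute deadline (an Instant) rather than a relative timeout. One way to work with that, sketched here with java.time only, is to convert a time budget into a deadline once and compute the remaining time from it wherever needed. DeadlineBudget and its methods are hypothetical helpers, not part of the JDK.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; not part of the JDK.
class DeadlineBudget {

    /** Converts a relative budget into the absolute deadline a scope would wait until. */
    static Instant deadlineAfter(Instant start, Duration budget) {
        return start.plus(budget);
    }

    /** Time left before the deadline, never negative. */
    static Duration remaining(Instant now, Instant deadline) {
        Duration left = Duration.between(now, deadline);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        Instant deadline = deadlineAfter(start, Duration.ofSeconds(3));
        System.out.println(deadline);                                  // 2024-01-01T00:00:03Z
        System.out.println(remaining(start.plusSeconds(1), deadline)); // PT2S
        System.out.println(remaining(start.plusSeconds(5), deadline)); // PT0S
    }
}
```

Passing the same deadline to nested operations keeps the overall time budget fixed, whereas repeating a relative timeout at each step would extend it.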

Comparison with Traditional Approaches

Aspect           | Traditional Concurrency      | Structured Concurrency
Error Handling   | Manual, complex              | Automatic propagation
Resource Cleanup | Manual cancellation required | Automatic scope management
Code Structure   | Scattered across methods     | Contained in blocks
Debugging        | Complex stack traces         | Clear task hierarchies
Cancellation     | Manual coordination          | Automatic with scope

Conclusion

Structured Concurrency represents a fundamental shift in how we approach concurrent programming in Java. By bringing structured programming principles to concurrency, it provides:

  • Simplified code that's easier to read and maintain
  • Automatic resource management that prevents leaks
  • Clear error handling that properly propagates failures
  • Natural integration with virtual threads

While still in preview as of Java 22, Structured Concurrency shows immense promise for making concurrent programming more accessible and reliable. It's particularly valuable for:

  • Microservices communication
  • Parallel data processing
  • API aggregation patterns
  • Any scenario involving multiple concurrent operations

As Java continues to evolve, Structured Concurrency combined with virtual threads should make concurrent applications more robust and easier to maintain.
