Structured Concurrency with Scoped Values in Java

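Structured Concurrency and Scoped Values are two features that first appeared as preview APIs in JDK 21 (JEP 453 and JEP 446). They work together to make concurrent programming safer, more readable, and more maintainable. The examples below use the preview API shape of JDK 21 through 24 (StructuredTaskScope with its ShutdownOnSuccess and ShutdownOnFailure subclasses), so they must be compiled and run with --enable-preview; later releases revised the StructuredTaskScope API.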

1. Structured Concurrency Basics

What is Structured Concurrency?

  • Structured Concurrency treats groups of related tasks as single units of work
  • A parent task does not finish until all of its child tasks have completed or been cancelled
  • Prevents thread leakage and makes error handling easier
  • Provides better observability and debugging
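
To make the thread-leakage point concrete, here is a minimal sketch of the problem in the unstructured style. It uses only the long-standing ExecutorService API, so no preview features are needed. The class and method names are illustrative assumptions, not part of the examples in this article. One task fails immediately while its sibling is still running; nothing links the two, so the sibling keeps going after the caller has already seen the failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: names are hypothetical, not from the article's examples.
class UnstructuredLeakDemo {

    // Returns true if the slow task is still running after the caller
    // has already observed the failure of its sibling.
    static boolean slowTaskOutlivesFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Callable<String> slowTask = () -> {
            slowStarted.countDown();
            release.await(); // stands in for a long-running remote call
            return "slow result";
        };
        Callable<String> failingTask = () -> {
            throw new IllegalStateException("fast failure");
        };

        try {
            Future<String> slow = executor.submit(slowTask);
            Future<String> failing = executor.submit(failingTask);
            slowStarted.await(); // make sure the slow task is really running
            try {
                failing.get();
                return false; // not reached: failingTask always throws
            } catch (ExecutionException e) {
                // The failure is visible here, but the sibling was never told about it.
                return !slow.isDone();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            release.countDown(); // unblock the slow task so the JVM can exit
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sibling still running after failure: " + slowTaskOutlivesFailure());
    }
}
```

With a StructuredTaskScope, the scope owns both subtasks: a shutdown cancels the unfinished sibling, and close() does not return until it has stopped.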

Key Components

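  • StructuredTaskScope - Owns a group of forked subtasks and waits for them in join()
  • ShutdownOnSuccess - Captures the first successful result, then shuts the scope down and cancels the remaining subtasks
  • ShutdownOnFailure - Captures the first failure, then shuts the scope down and cancels the remaining subtasks
  • Scoped Values - Immutable bindings that forked subtasks inherit; a safer alternative to thread-local variables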

2. Basic Structured Concurrency

Simple StructuredTaskScope Example

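import java.util.concurrent.*;
// Subtask is nested in StructuredTaskScope, so the wildcard import above does not cover it
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.random.RandomGenerator;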
public class BasicStructuredConcurrency {
public static void main(String[] args) throws Exception {
String result = fetchUserData();
System.out.println("Final result: " + result);
}
public static String fetchUserData() throws Exception {
try (var scope = new StructuredTaskScope<String>()) {
// Fork concurrent subtasks
Subtask<String> userTask = scope.fork(() -> fetchUserFromDB());
Subtask<String> profileTask = scope.fork(() -> fetchUserProfile());
Subtask<String> preferencesTask = scope.fork(() -> fetchUserPreferences());
// Join all subtasks - waits for all to complete or scope to shut down
scope.join();
// Handle results
String user = userTask.get();
String profile = profileTask.get();
String preferences = preferencesTask.get();
return String.format("User: %s, Profile: %s, Preferences: %s", 
user, profile, preferences);
}
}
private static String fetchUserFromDB() throws InterruptedException {
Thread.sleep(RandomGenerator.getDefault().nextInt(100, 500));
return "John Doe";
}
private static String fetchUserProfile() throws InterruptedException {
Thread.sleep(RandomGenerator.getDefault().nextInt(100, 500));
return "Premium User";
}
private static String fetchUserPreferences() throws InterruptedException {
Thread.sleep(RandomGenerator.getDefault().nextInt(100, 500));
return "Dark Mode, Notifications Enabled";
}
}

ShutdownOnSuccess Pattern

import java.util.concurrent.*;
import java.util.random.RandomGenerator;
public class ShutdownOnSuccessExample {
public static void main(String[] args) throws Exception {
String fastestResult = findFastestService();
System.out.println("Fastest service response: " + fastestResult);
}
public static String findFastestService() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// Fork multiple service calls - first one to complete wins
scope.fork(() -> callServiceA());
scope.fork(() -> callServiceB());
scope.fork(() -> callServiceC());
scope.fork(() -> callServiceD());
// Wait for any task to complete successfully
scope.join();
// Get the first successful result
return scope.result();
}
}
private static String callServiceA() throws InterruptedException {
Thread.sleep(200 + RandomGenerator.getDefault().nextInt(500));
return "Service A Response";
}
private static String callServiceB() throws InterruptedException {
Thread.sleep(100 + RandomGenerator.getDefault().nextInt(300));
return "Service B Response";
}
private static String callServiceC() throws InterruptedException {
Thread.sleep(300 + RandomGenerator.getDefault().nextInt(400));
return "Service C Response";
}
private static String callServiceD() throws InterruptedException {
Thread.sleep(150 + RandomGenerator.getDefault().nextInt(600));
// Simulate occasional failure
if (RandomGenerator.getDefault().nextDouble() < 0.3) {
throw new RuntimeException("Service D temporarily unavailable");
}
return "Service D Response";
}
}
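
For comparison, the same "first success wins" idea can be tried without preview features using ExecutorService.invokeAny, which returns the result of one task that completed successfully and cancels the tasks that have not finished. The sketch below is an assumption-laden analogue, not the article's method: the class name, the three simulated services, and their delays are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: service names and delays are made up for the demo.
class InvokeAnyComparison {

    static String firstSuccessfulResult() {
        // Fails immediately, so it can never be the winner.
        Callable<String> serviceA = () -> {
            throw new IllegalStateException("Service A unavailable");
        };
        // Succeeds quickly.
        Callable<String> serviceB = () -> {
            Thread.sleep(50);
            return "Service B Response";
        };
        // Would succeed, but far too late; it is cancelled once B has won.
        Callable<String> serviceC = () -> {
            Thread.sleep(10_000);
            return "Service C Response";
        };
        List<Callable<String>> services = List.of(serviceA, serviceB, serviceC);

        ExecutorService executor = Executors.newFixedThreadPool(services.size());
        try {
            // Blocks until one task succeeds; failed tasks are skipped.
            return executor.invokeAny(services);
        } catch (ExecutionException e) {
            throw new IllegalStateException("all services failed", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Fastest service response: " + firstSuccessfulResult());
    }
}
```

The ShutdownOnSuccess version keeps the same semantics but ties the lifetime of the subtasks to the try-with-resources block instead of to a separately managed executor.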

ShutdownOnFailure Pattern

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.random.RandomGenerator;
public class ShutdownOnFailureExample {
public static void main(String[] args) throws Exception {
try {
ProcessedData result = processDataCompletely();
System.out.println("Success: " + result);
} catch (Exception e) {
System.out.println("Processing failed: " + e.getMessage());
}
}
public static ProcessedData processDataCompletely() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork multiple processing steps
Subtask<ValidationResult> validationTask = 
scope.fork(() -> validateInputData());
Subtask<TransformedData> transformationTask = 
scope.fork(() -> transformData());
Subtask<EnrichedData> enrichmentTask = 
scope.fork(() -> enrichData());
// Wait for all tasks or shut down on first failure
scope.join();
scope.throwIfFailed(); // Throws if any subtask failed
// All tasks completed successfully
ValidationResult validation = validationTask.get();
TransformedData transformation = transformationTask.get();
EnrichedData enrichment = enrichmentTask.get();
return new ProcessedData(validation, transformation, enrichment);
}
}
private static ValidationResult validateInputData() throws InterruptedException {
Thread.sleep(100);
// Simulate validation failure
if (RandomGenerator.getDefault().nextDouble() < 0.1) {
throw new ValidationException("Invalid input data format");
}
return new ValidationResult(true, "Validation passed");
}
private static TransformedData transformData() throws InterruptedException {
Thread.sleep(200);
return new TransformedData("Transformed data");
}
private static EnrichedData enrichData() throws InterruptedException {
Thread.sleep(150);
return new EnrichedData("Enriched with additional info");
}
// Record classes for data types
record ValidationResult(boolean valid, String message) {}
record TransformedData(String data) {}
record EnrichedData(String data) {}
record ProcessedData(ValidationResult validation, 
TransformedData transformation, 
EnrichedData enrichment) {}
static class ValidationException extends RuntimeException {
public ValidationException(String message) {
super(message);
}
}
}

3. Scoped Values

Basic Scoped Values Usage

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.*;
public class ScopedValuesExample {
// Define scoped values - these are immutable and inheritable
private static final ScopedValue<String> CURRENT_USER = ScopedValue.newInstance();
private static final ScopedValue<Locale> USER_LOCALE = ScopedValue.newInstance();
private static final ScopedValue<Map<String, String>> REQUEST_CONTEXT = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
// Bind scoped values for the duration of the operation
ScopedValue.where(CURRENT_USER, "alice")
.where(USER_LOCALE, Locale.US)
.where(REQUEST_CONTEXT, Map.of("requestId", "req-123", "sessionId", "sess-456"))
.run(() -> processUserRequest());
}
private static void processUserRequest() {
System.out.println("Processing request for user: " + CURRENT_USER.get());
System.out.println("User locale: " + USER_LOCALE.get());
System.out.println("Request context: " + REQUEST_CONTEXT.get());
// The scoped values are automatically available to all called methods
performUserOperation();
auditUserAction();
}
private static void performUserOperation() {
// No need to pass context - it's automatically available
String user = CURRENT_USER.get();
Locale locale = USER_LOCALE.get();
System.out.println("Performing operation for " + user + " in " + locale);
}
private static void auditUserAction() {
String user = CURRENT_USER.get();
Map<String, String> context = REQUEST_CONTEXT.get();
System.out.println("Auditing action by " + user + " with context: " + context);
}
}

Scoped Values with Structured Concurrency

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
public class ScopedValuesWithConcurrency {
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
private static final ScopedValue<UserContext> USER_CONTEXT = ScopedValue.newInstance();
private static final ScopedValue<Logger> REQUEST_LOGGER = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
UserContext context = new UserContext("user123", "premium", "en-US");
Logger logger = new SimpleLogger();
ScopedValue.where(REQUEST_ID, "req-" + System.currentTimeMillis())
.where(USER_CONTEXT, context)
.where(REQUEST_LOGGER, logger)
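// call() rather than run(): handleComplexRequest() throws a checked exception,
// which a Runnable lambda cannot propagate
.call(() -> {
handleComplexRequest();
return null;
});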
}
private static void handleComplexRequest() throws Exception {
String requestId = REQUEST_ID.get();
UserContext userContext = USER_CONTEXT.get();
Logger logger = REQUEST_LOGGER.get();
logger.log("Starting complex request: " + requestId + " for user: " + userContext.userId());
try (var scope = new StructuredTaskScope<String>()) {
// All forked tasks inherit the scoped values automatically
Subtask<String> dataTask = scope.fork(() -> fetchUserData());
Subtask<String> permissionsTask = scope.fork(() -> checkPermissions());
Subtask<String> cacheTask = scope.fork(() -> checkCache());
scope.join();
String data = dataTask.get();
String permissions = permissionsTask.get();
String cache = cacheTask.get();
logger.log("Request completed: " + requestId);
System.out.println("Results - Data: " + data + ", Permissions: " + permissions + ", Cache: " + cache);
}
}
private static String fetchUserData() {
UserContext context = USER_CONTEXT.get();
Logger logger = REQUEST_LOGGER.get();
logger.log("Fetching data for user: " + context.userId());
simulateWork(100);
return "User data for " + context.userId();
}
private static String checkPermissions() {
UserContext context = USER_CONTEXT.get();
Logger logger = REQUEST_LOGGER.get();
logger.log("Checking permissions for user: " + context.userId() + ", tier: " + context.tier());
simulateWork(150);
return context.tier().equals("premium") ? "Full access" : "Limited access";
}
private static String checkCache() {
String requestId = REQUEST_ID.get();
Logger logger = REQUEST_LOGGER.get();
logger.log("Checking cache for request: " + requestId);
simulateWork(50);
return "Cache miss for " + requestId;
}
private static void simulateWork(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// Supporting classes
record UserContext(String userId, String tier, String locale) {}
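// Minimal logging abstraction used by REQUEST_LOGGER
interface Logger {
void log(String message);
}
static class SimpleLogger implements Logger {
@Override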
public void log(String message) {
System.out.println("[LOG][" + Thread.currentThread().getName() + "] " + message);
}
}
}

4. Real-World Use Cases

Use Case 1: Web Request Handler

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.*;
public class WebRequestHandler {
private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
private static final ScopedValue<UserSession> USER_SESSION = ScopedValue.newInstance();
private static final ScopedValue<RequestMetrics> METRICS = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
// Simulate incoming web requests
for (int i = 1; i <= 3; i++) {
String requestId = "req-" + i;
UserSession session = new UserSession("user" + i, System.currentTimeMillis());
RequestMetrics metrics = new RequestMetrics();
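try {
ScopedValue.where(REQUEST_ID, requestId)
.where(USER_SESSION, session)
.where(METRICS, metrics)
.run(() -> handleHttpRequest());
System.out.println("Request " + requestId + " completed. Metrics: " + metrics);
} catch (RuntimeException e) {
// A failed request (user2 fails authentication below) should not stop the loop
System.out.println("Request " + requestId + " failed: " + e.getMessage() + ". Metrics: " + metrics);
}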
}
}
private static void handleHttpRequest() {
try {
String requestId = REQUEST_ID.get();
UserSession session = USER_SESSION.get();
RequestMetrics metrics = METRICS.get();
metrics.startPhase("authentication");
authenticateUser();
metrics.startPhase("authorization");
checkPermissions();
metrics.startPhase("data_processing");
processRequestData();
metrics.startPhase("response_generation");
generateResponse();
metrics.markSuccess();
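} catch (RuntimeException e) {
METRICS.get().markFailure(e.getMessage());
throw e;
} catch (Exception e) {
// This method runs inside a Runnable, so checked exceptions must be wrapped
METRICS.get().markFailure(e.getMessage());
throw new RuntimeException(e);
}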
}
private static void authenticateUser() {
UserSession session = USER_SESSION.get();
RequestMetrics metrics = METRICS.get();
metrics.addCheckpoint("auth_start");
simulateWork(50);
// Simulate authentication logic
if (session.userId().endsWith("2")) {
throw new SecurityException("User not authenticated");
}
metrics.addCheckpoint("auth_complete");
System.out.println("Authenticated user: " + session.userId());
}
private static void checkPermissions() {
UserSession session = USER_SESSION.get();
simulateWork(30);
System.out.println("Checked permissions for: " + session.userId());
}
private static void processRequestData() throws Exception {
String requestId = REQUEST_ID.get();
RequestMetrics metrics = METRICS.get();
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
metrics.startPhase("parallel_processing");
Subtask<List<String>> userDataTask = scope.fork(() -> fetchUserData());
Subtask<Map<String, Object>> preferencesTask = scope.fork(() -> fetchUserPreferences());
Subtask<List<Notification>> notificationsTask = scope.fork(() -> fetchNotifications());
scope.join();
scope.throwIfFailed();
List<String> userData = userDataTask.get();
Map<String, Object> preferences = preferencesTask.get();
List<Notification> notifications = notificationsTask.get();
metrics.addCheckpoint("data_processed");
System.out.println("Processed data for request: " + requestId);
}
}
private static void generateResponse() {
simulateWork(20);
System.out.println("Generated response for: " + REQUEST_ID.get());
}
private static List<String> fetchUserData() throws InterruptedException {
String requestId = REQUEST_ID.get();
UserSession session = USER_SESSION.get();
System.out.println("Fetching user data for: " + session.userId() + " in request: " + requestId);
simulateWork(100);
return List.of("profile", "history", "settings");
}
private static Map<String, Object> fetchUserPreferences() throws InterruptedException {
UserSession session = USER_SESSION.get();
simulateWork(80);
return Map.of("theme", "dark", "language", "en", "notifications", true);
}
private static List<Notification> fetchNotifications() throws InterruptedException {
UserSession session = USER_SESSION.get();
simulateWork(120);
return List.of(
new Notification("Welcome back!", "info"),
new Notification("New feature available", "update")
);
}
private static void simulateWork(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// Supporting records
record UserSession(String userId, long loginTime) {}
record Notification(String message, String type) {}
static class RequestMetrics {
private final long startTime = System.currentTimeMillis();
private final List<String> phases = new ArrayList<>();
private final List<String> checkpoints = new ArrayList<>();
private boolean success = false;
private String error;
public void startPhase(String phase) {
phases.add(phase + ":" + System.currentTimeMillis());
}
public void addCheckpoint(String checkpoint) {
checkpoints.add(checkpoint + ":" + System.currentTimeMillis());
}
public void markSuccess() {
this.success = true;
}
public void markFailure(String error) {
this.success = false;
this.error = error;
}
@Override
public String toString() {
return String.format("Metrics{success=%s, phases=%d, checkpoints=%d, duration=%dms}",
success, phases.size(), checkpoints.size(),
System.currentTimeMillis() - startTime);
}
}
}

Use Case 2: Distributed Service Caller

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.random.RandomGenerator;
import java.util.*;
public class DistributedServiceCaller {
private static final ScopedValue<String> CORRELATION_ID = ScopedValue.newInstance();
private static final ScopedValue<CallContext> CALL_CONTEXT = ScopedValue.newInstance();
private static final ScopedValue<CircuitBreaker> CIRCUIT_BREAKER = ScopedValue.newInstance();
public static void main(String[] args) throws Exception {
CircuitBreaker circuitBreaker = new CircuitBreaker();
for (int i = 0; i < 5; i++) {
String correlationId = "corr-" + System.currentTimeMillis() + "-" + i;
CallContext context = new CallContext("service-client", "high");
try {
ScopedValue.where(CORRELATION_ID, correlationId)
.where(CALL_CONTEXT, context)
.where(CIRCUIT_BREAKER, circuitBreaker)
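// call() rather than run(): callDistributedServices() throws a checked exception
.call(() -> {
callDistributedServices();
return null;
});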
System.out.println("Request " + correlationId + " completed successfully");
} catch (Exception e) {
System.out.println("Request " + correlationId + " failed: " + e.getMessage());
}
}
}
private static void callDistributedServices() throws Exception {
String correlationId = CORRELATION_ID.get();
CallContext context = CALL_CONTEXT.get();
CircuitBreaker circuitBreaker = CIRCUIT_BREAKER.get();
System.out.println("Starting distributed call: " + correlationId);
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork calls to various microservices
Subtask<ServiceResponse> userServiceTask = 
scope.fork(() -> callUserService(circuitBreaker));
Subtask<ServiceResponse> orderServiceTask = 
scope.fork(() -> callOrderService(circuitBreaker));
Subtask<ServiceResponse> inventoryServiceTask = 
scope.fork(() -> callInventoryService(circuitBreaker));
Subtask<ServiceResponse> paymentServiceTask = 
scope.fork(() -> callPaymentService(circuitBreaker));
scope.join();
scope.throwIfFailed();
// Aggregate results
ServiceResponse userResponse = userServiceTask.get();
ServiceResponse orderResponse = orderServiceTask.get();
ServiceResponse inventoryResponse = inventoryServiceTask.get();
ServiceResponse paymentResponse = paymentServiceTask.get();
AggregateResponse aggregate = new AggregateResponse(
userResponse, orderResponse, inventoryResponse, paymentResponse
);
System.out.println("Aggregated response for " + correlationId + ": " + aggregate);
}
}
private static ServiceResponse callUserService(CircuitBreaker circuitBreaker) throws Exception {
if (!circuitBreaker.canCall("user-service")) {
throw new CircuitBreakerOpenException("user-service");
}
try {
simulateServiceCall("user-service", 100, 0.1);
return new ServiceResponse("user-service", 200, "User data");
} catch (Exception e) {
circuitBreaker.recordFailure("user-service");
throw e;
}
}
private static ServiceResponse callOrderService(CircuitBreaker circuitBreaker) throws Exception {
if (!circuitBreaker.canCall("order-service")) {
throw new CircuitBreakerOpenException("order-service");
}
try {
simulateServiceCall("order-service", 150, 0.15);
return new ServiceResponse("order-service", 200, "Order data");
} catch (Exception e) {
circuitBreaker.recordFailure("order-service");
throw e;
}
}
private static ServiceResponse callInventoryService(CircuitBreaker circuitBreaker) throws Exception {
if (!circuitBreaker.canCall("inventory-service")) {
throw new CircuitBreakerOpenException("inventory-service");
}
try {
simulateServiceCall("inventory-service", 80, 0.05);
return new ServiceResponse("inventory-service", 200, "Inventory data");
} catch (Exception e) {
circuitBreaker.recordFailure("inventory-service");
throw e;
}
}
private static ServiceResponse callPaymentService(CircuitBreaker circuitBreaker) throws Exception {
if (!circuitBreaker.canCall("payment-service")) {
throw new CircuitBreakerOpenException("payment-service");
}
try {
simulateServiceCall("payment-service", 200, 0.2);
return new ServiceResponse("payment-service", 200, "Payment processed");
} catch (Exception e) {
circuitBreaker.recordFailure("payment-service");
throw e;
}
}
private static void simulateServiceCall(String serviceName, int baseDelay, double failureRate) 
throws InterruptedException {
String correlationId = CORRELATION_ID.get();
CallContext context = CALL_CONTEXT.get();
System.out.println("Calling " + serviceName + " for " + correlationId + " with priority " + context.priority());
// Simulate network delay
int delay = baseDelay + RandomGenerator.getDefault().nextInt(50);
Thread.sleep(delay);
// Simulate occasional failure
if (RandomGenerator.getDefault().nextDouble() < failureRate) {
throw new ServiceException(serviceName + " unavailable");
}
}
// Supporting classes
record CallContext(String clientId, String priority) {}
record ServiceResponse(String service, int status, String data) {}
record AggregateResponse(ServiceResponse user, ServiceResponse order, 
ServiceResponse inventory, ServiceResponse payment) {}
static class ServiceException extends RuntimeException {
public ServiceException(String message) {
super(message);
}
}
static class CircuitBreakerOpenException extends RuntimeException {
public CircuitBreakerOpenException(String service) {
super("Circuit breaker open for: " + service);
}
}
static class CircuitBreaker {
private final Map<String, Integer> failureCount = new ConcurrentHashMap<>();
private final Map<String, Long> lastFailure = new ConcurrentHashMap<>();
private static final int MAX_FAILURES = 3;
private static final long TIMEOUT_MS = 5000;
public boolean canCall(String service) {
Integer failures = failureCount.get(service);
if (failures == null || failures < MAX_FAILURES) {
return true;
}
Long lastFail = lastFailure.get(service);
if (lastFail != null && System.currentTimeMillis() - lastFail > TIMEOUT_MS) {
// Timeout expired, reset circuit breaker
failureCount.remove(service);
lastFailure.remove(service);
return true;
}
return false;
}
public void recordFailure(String service) {
failureCount.merge(service, 1, Integer::sum);
lastFailure.put(service, System.currentTimeMillis());
}
public void recordSuccess(String service) {
failureCount.remove(service);
lastFailure.remove(service);
}
}
}
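
The state transitions of the circuit breaker are easier to follow in isolation. The sketch below is a compact copy of the CircuitBreaker class above, wrapped in a hypothetical CircuitBreakerDemo class so that it runs on its own; the service name is just a sample value. It shows the breaker opening after three recorded failures and closing again once a success is recorded.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: CircuitBreakerDemo is a hypothetical wrapper class.
class CircuitBreakerDemo {

    // Compact copy of the article's CircuitBreaker so the sketch is self-contained.
    static class CircuitBreaker {
        private static final int MAX_FAILURES = 3;
        private static final long TIMEOUT_MS = 5000;
        private final Map<String, Integer> failureCount = new ConcurrentHashMap<>();
        private final Map<String, Long> lastFailure = new ConcurrentHashMap<>();

        boolean canCall(String service) {
            Integer failures = failureCount.get(service);
            if (failures == null || failures < MAX_FAILURES) {
                return true; // circuit closed
            }
            Long lastFail = lastFailure.get(service);
            if (lastFail != null && System.currentTimeMillis() - lastFail > TIMEOUT_MS) {
                recordSuccess(service); // timeout expired: reset and allow a call
                return true;
            }
            return false; // circuit open
        }

        void recordFailure(String service) {
            failureCount.merge(service, 1, Integer::sum);
            lastFailure.put(service, System.currentTimeMillis());
        }

        void recordSuccess(String service) {
            failureCount.remove(service);
            lastFailure.remove(service);
        }
    }

    public static void main(String[] args) {
        CircuitBreaker breaker = new CircuitBreaker();
        String service = "payment-service";

        System.out.println("Initially callable: " + breaker.canCall(service));
        for (int i = 0; i < 3; i++) {
            breaker.recordFailure(service);
        }
        System.out.println("After 3 failures: " + breaker.canCall(service));
        breaker.recordSuccess(service);
        System.out.println("After a recorded success: " + breaker.canCall(service));
    }
}
```

Note that the callXxxService methods above only ever invoke recordFailure, so in that example the breaker resets through the timeout alone; calling recordSuccess after a successful response would clear the failure count earlier.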

5. Advanced Patterns

Nested Scoped Values

import java.util.concurrent.*;
public class NestedScopedValues {
private static final ScopedValue<String> OUTER_VALUE = ScopedValue.newInstance();
private static final ScopedValue<Integer> INNER_VALUE = ScopedValue.newInstance();
public static void main(String[] args) {
// Outer scope
ScopedValue.where(OUTER_VALUE, "outer-value")
.run(() -> {
System.out.println("Outer: " + OUTER_VALUE.get());
// Inner scope - can rebind values
ScopedValue.where(OUTER_VALUE, "rebound-outer")
.where(INNER_VALUE, 42)
.run(() -> {
System.out.println("Inner - Outer: " + OUTER_VALUE.get());
System.out.println("Inner - Inner: " + INNER_VALUE.get());
// The rebound value only exists in this scope
performNestedOperation();
});
// Back to outer scope - original value restored
System.out.println("Back to outer: " + OUTER_VALUE.get());
});
}
private static void performNestedOperation() {
System.out.println("Nested - Outer: " + OUTER_VALUE.get());
System.out.println("Nested - Inner: " + INNER_VALUE.get());
}
}

Error Handling and Propagation

import java.util.concurrent.*;
import java.util.random.RandomGenerator;
public class ErrorHandlingExample {
private static final ScopedValue<ErrorHandler> ERROR_HANDLER = ScopedValue.newInstance();
private static final ScopedValue<RecoveryStrategy> RECOVERY_STRATEGY = ScopedValue.newInstance();
public static void main(String[] args) {
ErrorHandler handler = new LoggingErrorHandler();
RecoveryStrategy strategy = new ExponentialBackoffStrategy();
try {
ScopedValue.where(ERROR_HANDLER, handler)
.where(RECOVERY_STRATEGY, strategy)
.run(() -> executeWithRetry());
} catch (Exception e) {
System.out.println("Final failure: " + e.getMessage());
}
}
private static void executeWithRetry() {
ErrorHandler handler = ERROR_HANDLER.get();
RecoveryStrategy strategy = RECOVERY_STRATEGY.get();
for (int attempt = 1; attempt <= strategy.maxAttempts(); attempt++) {
try {
System.out.println("Attempt " + attempt + " of " + strategy.maxAttempts());
performUnreliableOperation();
System.out.println("Operation succeeded on attempt " + attempt);
return;
} catch (OperationException e) {
handler.handleError(e, attempt);
if (attempt == strategy.maxAttempts()) {
throw new RuntimeException("All retry attempts failed", e);
}
try {
long backoff = strategy.calculateBackoff(attempt);
System.out.println("Backing off for " + backoff + "ms");
Thread.sleep(backoff);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Operation interrupted", ie);
}
}
}
}
private static void performUnreliableOperation() {
// Simulate unreliable operation that fails 70% of the time
if (RandomGenerator.getDefault().nextDouble() < 0.7) {
throw new OperationException("Temporary failure");
}
System.out.println("Operation completed successfully");
}
static class OperationException extends RuntimeException {
public OperationException(String message) {
super(message);
}
}
interface ErrorHandler {
void handleError(OperationException error, int attempt);
}
interface RecoveryStrategy {
int maxAttempts();
long calculateBackoff(int attempt);
}
static class LoggingErrorHandler implements ErrorHandler {
@Override
public void handleError(OperationException error, int attempt) {
System.err.println("Attempt " + attempt + " failed: " + error.getMessage());
}
}
static class ExponentialBackoffStrategy implements RecoveryStrategy {
@Override
public int maxAttempts() {
return 5;
}
@Override
public long calculateBackoff(int attempt) {
return (long) (Math.pow(2, attempt) * 100); // Exponential backoff
}
}
}
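
It can help to see the delays that this strategy actually produces. The sketch below reuses the formula from calculateBackoff (2^attempt * 100 ms) in a hypothetical BackoffSchedule class and adds up the worst-case waiting time; the helper names are assumptions made for illustration.

```java
// Illustrative sketch: BackoffSchedule and its method names are hypothetical.
class BackoffSchedule {

    // Same formula as ExponentialBackoffStrategy.calculateBackoff above.
    static long backoffMillis(int attempt) {
        return (long) (Math.pow(2, attempt) * 100);
    }

    // Total time spent sleeping if every attempt fails.
    // A delay follows each failed attempt except the last one.
    static long worstCaseTotalMillis(int maxAttempts) {
        long total = 0;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            total += backoffMillis(attempt);
        }
        return total;
    }

    public static void main(String[] args) {
        int maxAttempts = 5; // matches maxAttempts() in the example above
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            System.out.println("After attempt " + attempt + ": wait " + backoffMillis(attempt) + " ms");
        }
        System.out.println("Worst-case total wait: " + worstCaseTotalMillis(maxAttempts) + " ms");
    }
}
```

With five attempts the waits are 200, 400, 800, and 1600 ms, for a worst-case total of 3000 ms before the final failure is reported.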

6. Testing Structured Concurrency

Unit Testing with Structured Concurrency

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.atomic.*;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
public class StructuredConcurrencyTest {
@Test
void testSuccessfulParallelExecution() throws Exception {
AtomicInteger counter = new AtomicInteger();
try (var scope = new StructuredTaskScope<Integer>()) {
Subtask<Integer> task1 = scope.fork(() -> {
counter.incrementAndGet();
return 1;
});
Subtask<Integer> task2 = scope.fork(() -> {
counter.incrementAndGet();
return 2;
});
scope.join();
assertEquals(2, counter.get());
assertEquals(1, task1.get());
assertEquals(2, task2.get());
}
}
@Test
void testShutdownOnFailure() {
assertThrows(Exception.class, () -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> {
throw new RuntimeException("Task failed");
});
scope.fork(() -> {
Thread.sleep(100);
return "Success";
});
scope.join();
scope.throwIfFailed();
}
});
}
@Test
void testScopedValueInheritance() throws Exception {
final ScopedValue<String> TEST_VALUE = ScopedValue.newInstance();
AtomicReference<String> capturedValue = new AtomicReference<>();
ScopedValue.where(TEST_VALUE, "test-value")
.run(() -> {
try (var scope = new StructuredTaskScope<Void>()) {
scope.fork(() -> {
// Child thread inherits scoped value
capturedValue.set(TEST_VALUE.get());
return null;
});
scope.join();
} catch (Exception e) {
fail("Unexpected exception", e);
}
});
assertEquals("test-value", capturedValue.get());
}
}

7. Best Practices

Do's and Don'ts

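import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.random.RandomGenerator;
public class StructuredConcurrencyBestPractices {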
// ✅ DO: Use try-with-resources for StructuredTaskScope
public void goodPractice1() throws Exception {
try (var scope = new StructuredTaskScope<String>()) {
Subtask<String> task = scope.fork(() -> "result");
scope.join();
String result = task.get();
// Use result
}
}
// ❌ DON'T: Forget to join
public void badPractice1() {
try (var scope = new StructuredTaskScope<String>()) {
scope.fork(() -> "result");
// Missing scope.join() - close() cancels unfinished subtasks and then throws IllegalStateException
}
}
// ✅ DO: Handle exceptions properly
public void goodPractice2() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<String> task1 = scope.fork(() -> mightFail());
Subtask<String> task2 = scope.fork(() -> mightAlsoFail());
scope.join();
scope.throwIfFailed(); // Propagate any failures
// Process results
String result1 = task1.get();
String result2 = task2.get();
}
}
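// ✅ DO: Use scoped values for context propagation
// (CONTEXT and its value are illustrative; substitute your own ScopedValue)
private static final ScopedValue<String> CONTEXT = ScopedValue.newInstance();
public void goodPractice3() {
ScopedValue.where(CONTEXT, "request-context")
.run(() -> {
try (var scope = new StructuredTaskScope<String>()) {
// Forked tasks automatically inherit scoped values
scope.fork(() -> operationThatNeedsContext());
scope.join();
} catch (InterruptedException e) {
// A Runnable cannot throw InterruptedException, so restore the interrupt flag
Thread.currentThread().interrupt();
}
});
}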
// ❌ DON'T: Use thread-local variables with structured concurrency
public void badPractice2() throws InterruptedException {
ThreadLocal<String> threadLocal = new ThreadLocal<>();
threadLocal.set("value");
try (var scope = new StructuredTaskScope<String>()) {
// ❌ This won't work as expected - a plain ThreadLocal set by the parent is not visible in forked threads
scope.fork(() -> {
String value = threadLocal.get(); // Returns null!
return value;
});
scope.join();
}
}
private String mightFail() {
if (RandomGenerator.getDefault().nextBoolean()) {
throw new RuntimeException("Random failure");
}
return "success";
}
private String mightAlsoFail() {
if (RandomGenerator.getDefault().nextBoolean()) {
throw new RuntimeException("Another random failure");
}
return "success";
}
private String operationThatNeedsContext() {
return "operation result";
}
}
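
The thread-local pitfall can be reproduced without any preview API. In the sketch below, an ordinary thread stands in for a forked subtask (an assumption made so the example runs on any recent JDK), and the class and method names are hypothetical. The parent sets a plain ThreadLocal, and the child thread reads the same variable.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: an ordinary Thread stands in for a forked subtask.
class ThreadLocalVisibilityDemo {

    private static final ThreadLocal<String> PLAIN = new ThreadLocal<>();

    // Sets a value in the calling thread, then reports what a freshly
    // started child thread sees for the same ThreadLocal.
    static String readInChildThread(String parentValue) {
        PLAIN.set(parentValue);
        AtomicReference<String> seenByChild = new AtomicReference<>();
        Thread child = new Thread(() -> seenByChild.set(PLAIN.get()));
        try {
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for child", e);
        } finally {
            PLAIN.remove(); // the manual cleanup that thread-locals always require
        }
        return seenByChild.get();
    }

    public static void main(String[] args) {
        System.out.println("Child thread sees: " + readInChildThread("value"));
    }
}
```

The child sees null because each thread has its own copy of a plain ThreadLocal. A ScopedValue bound around the scope is inherited by forked subtasks and needs no remove() call.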

Summary

Structured Concurrency with Scoped Values provides:

  • StructuredTaskScope - Managed concurrent task execution
  • Shutdown Policies - Control task termination behavior
  • Scoped Values - Immutable, inheritable context propagation
  • Error Propagation - Proper exception handling across tasks
  • Resource Safety - Automatic cleanup with try-with-resources

Key benefits:

  • Prevents thread leakage and orphaned tasks
  • Makes concurrent code easier to reason about
  • Provides better observability and debugging
  • Eliminates the need for manual thread-local management
  • Ensures proper error handling and propagation

These features work together to make concurrent programming in Java safer, more maintainable, and more expressive, particularly for server applications, distributed systems, and any scenario requiring coordinated concurrent operations.
