Introduction to Phaser
The Phaser is a flexible synchronization barrier that allows dynamic registration/deregistration of parties and supports multiple phases. It's more powerful than CyclicBarrier and CountDownLatch for complex multi-phase synchronization scenarios.
1. Basic Phaser Usage
Simple Phase Synchronization
import java.util.concurrent.Phaser;
public class BasicPhaserExample {
public static void main(String[] args) {
// Create phaser with 3 registered parties
Phaser phaser = new Phaser(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
new Thread(() -> {
System.out.printf("Task-%d: Starting phase 0%n", taskId);
// Wait for all parties to arrive
int phase = phaser.arriveAndAwaitAdvance();
System.out.printf("Task-%d: Completed phase %d%n", taskId, phase - 1);
// Phase 1 work
System.out.printf("Task-%d: Starting phase 1%n", taskId);
phase = phaser.arriveAndAwaitAdvance();
System.out.printf("Task-%d: Completed phase %d%n", taskId, phase - 1);
// Phase 2 work
System.out.printf("Task-%d: Starting phase 2%n", taskId);
phaser.arriveAndDeregister(); // Complete and deregister
System.out.printf("Task-%d: Finished all phases%n", taskId);
}).start();
}
}
}
Phaser with Dynamic Registration
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
public class DynamicRegistrationPhaser {
public static void main(String[] args) throws InterruptedException {
// Create phaser with initial 0 parties
Phaser phaser = new Phaser(1); // Main thread registers as 1st party
// Phase 0: Start workers
System.out.println("=== Phase 0: Starting workers ===");
startWorkers(phaser, 3);
// Main thread arrives and waits for workers
phaser.arriveAndAwaitAdvance();
System.out.println("All workers started. Moving to phase 1");
// Phase 1: Add more workers dynamically
System.out.println("\n=== Phase 1: Adding more workers ===");
startWorkers(phaser, 2);
phaser.arriveAndAwaitAdvance();
System.out.println("Additional workers joined. Moving to phase 2");
// Phase 2: Final work
System.out.println("\n=== Phase 2: Final work ===");
phaser.arriveAndAwaitAdvance();
// Main thread deregisters
phaser.arriveAndDeregister();
System.out.println("Main thread completed");
}
private static void startWorkers(Phaser phaser, int count) {
for (int i = 0; i < count; i++) {
final int workerId = phaser.getRegisteredParties();
new Thread(() -> {
// Register this thread with the phaser
phaser.register();
System.out.printf("Worker-%d registered. Total parties: %d%n",
workerId, phaser.getRegisteredParties());
try {
// Phase 0 work
int phase = phaser.arriveAndAwaitAdvance();
System.out.printf("Worker-%d completed phase %d%n", workerId, phase - 1);
// Phase 1 work
phase = phaser.arriveAndAwaitAdvance();
System.out.printf("Worker-%d completed phase %d%n", workerId, phase - 1);
// Phase 2 work
System.out.printf("Worker-%d finishing...%n", workerId);
phaser.arriveAndDeregister();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
try {
Thread.sleep(100); // Stagger worker creation
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
2. Advanced Phaser Patterns
Multi-Stage Processing Pipeline
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.*;
public class ProcessingPipelinePhaser {
static class DataProcessor {
private final Phaser phaser;
private final String name;
private final int processingTime;
public DataProcessor(Phaser phaser, String name, int processingTime) {
this.phaser = phaser;
this.name = name;
this.processingTime = processingTime;
phaser.register(); // Register with phaser
}
public void process(List<String> data) {
try {
System.out.printf("%s: Starting phase %d with %d items%n",
name, phaser.getPhase(), data.size());
// Simulate processing
Thread.sleep(processingTime);
// Transform data
List<String> processed = new ArrayList<>();
for (String item : data) {
processed.add(item + "-processed-by-" + name);
}
System.out.printf("%s: Completed phase %d%n", name, phaser.getPhase());
// Wait for all processors to complete this phase
int nextPhase = phaser.arriveAndAwaitAdvance();
// Pass processed data to next phase
if (nextPhase < 3) { // Only if there are more phases
process(processed);
} else {
System.out.printf("%s: Final result: %s%n", name, processed);
phaser.arriveAndDeregister();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
phaser.arriveAndDeregister();
}
}
}
public static void main(String[] args) throws InterruptedException {
// Create phaser for 3-phase pipeline
Phaser phaser = new Phaser(1); // Main thread registered
System.out.println("=== Starting 3-Stage Processing Pipeline ===");
// Create processors for different stages
DataProcessor[] processors = {
new DataProcessor(phaser, "Validator", 100),
new DataProcessor(phaser, "Transformer", 150),
new DataProcessor(phaser, "Aggregator", 200)
};
// Initial data
List<String> initialData = Arrays.asList("data1", "data2", "data3", "data4");
// Start all processors
for (DataProcessor processor : processors) {
new Thread(() -> processor.process(new ArrayList<>(initialData))).start();
}
// Main thread advances to phase 1
System.out.println("Main: All processors started. Advancing to phase 1");
phaser.arriveAndAwaitAdvance();
// Wait for completion
while (!phaser.isTerminated()) {
int phase = phaser.getPhase();
int parties = phaser.getRegisteredParties();
System.out.printf("Main: Phase %d, Active parties: %d%n", phase, parties);
phaser.arriveAndAwaitAdvance();
}
System.out.println("Main: All processing completed");
}
}
Phaser with Termination
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserTerminationExample {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(1) {
// Override onAdvance to control termination
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("Phaser advancing: phase=%d, parties=%d%n",
phase, registeredParties);
// Terminate after 5 phases or when no parties remain
if (phase >= 4 || registeredParties == 0) {
System.out.println("Phaser terminating");
return true; // Terminate phaser
}
return false; // Continue
}
};
// Start worker threads
for (int i = 0; i < 3; i++) {
startWorker(phaser, "Worker-" + i);
}
// Main thread coordinates phases
while (!phaser.isTerminated()) {
int phase = phaser.getPhase();
System.out.printf("\n=== Main: Starting phase %d ===%n", phase);
// Simulate main thread work
Thread.sleep(200);
// Advance phase and wait for workers
int nextPhase = phaser.arriveAndAwaitAdvance();
System.out.printf("Main: Completed phase %d, moving to phase %d%n",
phase, nextPhase);
// Randomly add/remove workers to demonstrate dynamic behavior
if (phase == 1) {
System.out.println("Main: Adding new worker during phase transition");
startWorker(phaser, "Late-Worker");
}
if (phase == 2 && Math.random() > 0.5) {
System.out.println("Main: Worker finished early and deregistered");
phaser.arriveAndDeregister();
}
}
System.out.println("\nPhaser has terminated");
}
private static void startWorker(Phaser phaser, String name) {
new Thread(() -> {
phaser.register();
System.out.printf("%s registered. Total parties: %d%n",
name, phaser.getRegisteredParties());
try {
while (!phaser.isTerminated()) {
int phase = phaser.getPhase();
System.out.printf("%s: Working in phase %d%n", name, phase);
// Simulate work
Thread.sleep(100 + (long)(Math.random() * 200));
// Wait for all parties
int nextPhase = phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Advanced to phase %d%n", name, nextPhase);
// Randomly decide to finish
if (Math.random() > 0.7 && phaser.getRegisteredParties() > 1) {
System.out.printf("%s: Finished work, deregistering%n", name);
phaser.arriveAndDeregister();
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
phaser.arriveAndDeregister();
}
}, name).start();
}
}
3. Real-World Use Cases
Distributed Task Processing
import java.util.concurrent.Phaser;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
public class DistributedTaskProcessing {
static class TaskManager {
private final Phaser phaser;
private final Map<String, List<String>> workerResults;
private final int totalPhases;
public TaskManager(int totalPhases) {
this.phaser = new Phaser(1) { // Main thread registered
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("=== Phase %d completed by %d workers ===%n",
phase, registeredParties - 1);
// Collect and display results
displayPhaseResults(phase);
return phase >= totalPhases - 1; // Terminate after last phase
}
};
this.workerResults = new ConcurrentHashMap<>();
this.totalPhases = totalPhases;
}
public void addWorker(String workerId, List<String> tasks) {
new Thread(() -> {
phaser.register();
workerResults.put(workerId, new ArrayList<>());
System.out.printf("Worker %s joined with %d tasks%n",
workerId, tasks.size());
try {
for (int phase = 0; phase < totalPhases && !phaser.isTerminated(); phase++) {
processPhase(workerId, tasks, phase);
phaser.arriveAndAwaitAdvance();
}
// Final deregistration
phaser.arriveAndDeregister();
System.out.printf("Worker %s completed all phases%n", workerId);
} catch (Exception e) {
System.err.printf("Worker %s failed: %s%n", workerId, e.getMessage());
phaser.arriveAndDeregister();
}
}, workerId).start();
}
private void processPhase(String workerId, List<String> tasks, int phase)
throws InterruptedException {
System.out.printf("Worker %s processing phase %d%n", workerId, phase);
// Simulate phase-specific processing
List<String> phaseResults = new ArrayList<>();
for (String task : tasks) {
String result = processTask(task, phase);
phaseResults.add(result);
// Simulate variable processing time
Thread.sleep(ThreadLocalRandom.current().nextInt(50, 200));
}
// Store results
String phaseKey = String.format("Phase-%d", phase);
workerResults.computeIfAbsent(workerId, k -> new ArrayList<>())
.add(phaseKey + ": " + phaseResults);
}
private String processTask(String task, int phase) {
// Simulate different processing per phase
switch (phase) {
case 0: return task + "-validated";
case 1: return task + "-transformed";
case 2: return task + "-enriched";
case 3: return task + "-finalized";
default: return task + "-processed";
}
}
private void displayPhaseResults(int phase) {
System.out.println("\n--- Phase " + phase + " Results ---");
workerResults.forEach((workerId, results) -> {
if (results.size() > phase) {
System.out.printf("%s: %s%n", workerId, results.get(phase));
}
});
System.out.println("--- End Phase Results ---\n");
}
public void startProcessing() {
System.out.println("Starting distributed processing...");
phaser.arriveAndDeregister(); // Main thread advances to phase 0
}
public void awaitCompletion() {
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
}
}
}
public static void main(String[] args) throws InterruptedException {
TaskManager manager = new TaskManager(4); // 4 phases
// Add workers with different tasks
manager.addWorker("Worker-NYC", Arrays.asList("task1", "task2", "task3"));
manager.addWorker("Worker-LON", Arrays.asList("taskA", "taskB"));
manager.addWorker("Worker-TOK", Arrays.asList("data1", "data2", "data3", "data4"));
// Start after a delay to show dynamic registration
Thread.sleep(1000);
manager.addWorker("Worker-DEL", Arrays.asList("late-task1", "late-task2"));
// Start processing
manager.startProcessing();
// Wait for completion
manager.awaitCompletion();
System.out.println("=== All distributed processing completed ===");
}
}
Bulkhead Pattern with Phaser
import java.util.concurrent.Phaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.*;
public class BulkheadPhaserPattern {
static class ServiceBulkhead {
private final Phaser phaser;
private final ExecutorService executor;
private final String serviceName;
private final int maxConcurrent;
private final AtomicInteger activeRequests = new AtomicInteger(0);
public ServiceBulkhead(String serviceName, int maxConcurrent) {
this.serviceName = serviceName;
this.maxConcurrent = maxConcurrent;
this.phaser = new Phaser(1); // Control thread
this.executor = Executors.newFixedThreadPool(maxConcurrent);
}
public CompletableFuture<String> processRequest(String requestId) {
return CompletableFuture.supplyAsync(() -> {
// Register with phaser for this request
phaser.register();
activeRequests.incrementAndGet();
try {
System.out.printf("[%s] Processing request %s (active: %d/%d)%n",
serviceName, requestId, activeRequests.get(), maxConcurrent);
// Simulate service processing
Thread.sleep(200 + (long)(Math.random() * 800));
String result = serviceName + "-result-for-" + requestId;
// Signal completion
phaser.arriveAndDeregister();
activeRequests.decrementAndGet();
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
phaser.arriveAndDeregister();
activeRequests.decrementAndGet();
throw new RuntimeException("Request interrupted", e);
}
}, executor);
}
public void awaitCurrentRequests() {
System.out.printf("[%s] Waiting for %d active requests to complete%n",
serviceName, activeRequests.get());
phaser.arriveAndAwaitAdvance();
}
public void shutdown() {
awaitCurrentRequests();
executor.shutdown();
System.out.printf("[%s] Bulkhead shutdown completed%n", serviceName);
}
}
public static void main(String[] args) throws InterruptedException {
// Create bulkheads for different services
ServiceBulkhead userService = new ServiceBulkhead("UserService", 3);
ServiceBulkhead orderService = new ServiceBulkhead("OrderService", 2);
ServiceBulkhead inventoryService = new ServiceBulkhead("InventoryService", 4);
List<CompletableFuture<String>> allFutures = new ArrayList<>();
// Simulate incoming requests
for (int i = 1; i <= 10; i++) {
final String requestId = "REQ-" + i;
// Distribute requests to different services
ServiceBulkhead targetService;
switch (i % 3) {
case 0: targetService = userService; break;
case 1: targetService = orderService; break;
default: targetService = inventoryService; break;
}
allFutures.add(targetService.processRequest(requestId));
// Add some delay between requests
if (i % 3 == 0) {
Thread.sleep(100);
}
}
System.out.println("\n=== All requests submitted ===");
// Wait for all services to complete current requests
System.out.println("\n=== Waiting for services to complete ===");
userService.awaitCurrentRequests();
orderService.awaitCurrentRequests();
inventoryService.awaitCurrentRequests();
// Collect results
System.out.println("\n=== Collecting results ===");
CompletableFuture<Void> allDone = CompletableFuture.allOf(
allFutures.toArray(new CompletableFuture[0])
);
allDone.thenRun(() -> {
allFutures.forEach(future -> {
try {
System.out.println("Result: " + future.get());
} catch (Exception e) {
System.err.println("Failed: " + e.getMessage());
}
});
}).join();
// Shutdown bulkheads
userService.shutdown();
orderService.shutdown();
inventoryService.shutdown();
System.out.println("\n=== All bulkheads shutdown ===");
}
}
4. Advanced Phaser Techniques
Hierarchical Phasers
import java.util.concurrent.Phaser;
import java.util.*;
public class HierarchicalPhasers {
static class RegionalProcessor {
private final Phaser regionalPhaser;
private final String regionName;
private final List<LocalProcessor> localProcessors;
public RegionalProcessor(String regionName, int localUnits) {
this.regionName = regionName;
this.regionalPhaser = new Phaser(1) { // Regional coordinator
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("[%s] Regional phase %d completed by %d local units%n",
regionName, phase, registeredParties - 1);
return phase >= 2; // 3 phases total (0,1,2)
}
};
this.localProcessors = new ArrayList<>();
// Create local processors
for (int i = 1; i <= localUnits; i++) {
localProcessors.add(new LocalProcessor("Local-" + i));
}
}
public void startProcessing() {
System.out.printf("[%s] Starting regional processing with %d local units%n",
regionName, localProcessors.size());
// Start all local processors
for (LocalProcessor local : localProcessors) {
local.startPhase(0);
}
// Regional coordination
while (!regionalPhaser.isTerminated()) {
int phase = regionalPhaser.getPhase();
regionalPhaser.arriveAndAwaitAdvance();
if (!regionalPhaser.isTerminated()) {
System.out.printf("[%s] Advancing local units to phase %d%n",
regionName, phase + 1);
// Start next phase in all local units
for (LocalProcessor local : localProcessors) {
local.startPhase(phase + 1);
}
}
}
System.out.printf("[%s] Regional processing completed%n", regionName);
}
class LocalProcessor {
private final Phaser localPhaser;
private final String localName;
public LocalProcessor(String localName) {
this.localName = localName;
this.localPhaser = new Phaser(regionalPhaser, 0) { // Child of regional phaser
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("[%s] Local phase %d completed%n", localName, phase);
return super.onAdvance(phase, registeredParties);
}
};
}
public void startPhase(int phase) {
new Thread(() -> {
try {
System.out.printf("[%s] Starting phase %d%n", localName, phase);
// Simulate phase work
Thread.sleep(100 + (long)(Math.random() * 400));
System.out.printf("[%s] Completing phase %d%n", localName, phase);
localPhaser.arriveAndDeregister();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
localPhaser.arriveAndDeregister();
}
}).start();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Hierarchical Phaser Example ===\n");
// Create regional processors
RegionalProcessor[] regions = {
new RegionalProcessor("North", 3),
new RegionalProcessor("South", 2),
new RegionalProcessor("East", 4),
new RegionalProcessor("West", 3)
};
// Start all regions
List<Thread> regionThreads = new ArrayList<>();
for (RegionalProcessor region : regions) {
Thread t = new Thread(region::startProcessing);
regionThreads.add(t);
t.start();
}
// Wait for all regions to complete
for (Thread t : regionThreads) {
t.join();
}
System.out.println("\n=== All regional processing completed ===");
}
}
Phaser with Timeouts and Error Handling
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.*;
public class PhaserWithTimeouts {
static class RobustTaskProcessor {
private final Phaser phaser;
private final long phaseTimeoutMs;
private final Set<String> failedTasks;
public RobustTaskProcessor(int parties, long phaseTimeoutMs) {
this.phaser = new Phaser(parties) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("Phase %d completed. Remaining parties: %d%n",
phase, registeredParties);
return phase >= 2 || registeredParties == 0; // 3 phases or no parties
}
};
this.phaseTimeoutMs = phaseTimeoutMs;
this.failedTasks = Collections.synchronizedSet(new HashSet<>());
}
public void executeTask(String taskId) {
new Thread(() -> {
try {
System.out.printf("Task %s starting%n", taskId);
// Phase 0: Initialization
awaitPhaseAdvance(taskId, 0);
if (simulateFailure()) {
throw new RuntimeException("Phase 0 failure");
}
// Phase 1: Processing
awaitPhaseAdvance(taskId, 1);
if (simulateFailure()) {
throw new RuntimeException("Phase 1 failure");
}
// Phase 2: Finalization
awaitPhaseAdvance(taskId, 2);
System.out.printf("Task %s completed successfully%n", taskId);
phaser.arriveAndDeregister();
} catch (Exception e) {
System.err.printf("Task %s failed: %s%n", taskId, e.getMessage());
failedTasks.add(taskId);
phaser.arriveAndDeregister();
}
}, taskId).start();
}
private void awaitPhaseAdvance(String taskId, int expectedPhase)
throws InterruptedException, TimeoutException {
int currentPhase = phaser.getPhase();
if (currentPhase != expectedPhase) {
System.out.printf("Task %s: Unexpected phase %d, expected %d%n",
taskId, currentPhase, expectedPhase);
return;
}
try {
// Wait for phase advance with timeout
int nextPhase = phaser.awaitAdvanceInterruptibly(currentPhase,
phaseTimeoutMs, TimeUnit.MILLISECONDS);
System.out.printf("Task %s: Advanced from phase %d to %d%n",
taskId, currentPhase, nextPhase);
} catch (TimeoutException e) {
System.err.printf("Task %s: Phase %d timeout after %d ms%n",
taskId, currentPhase, phaseTimeoutMs);
throw e;
}
}
private boolean simulateFailure() {
return Math.random() < 0.2; // 20% failure rate
}
public void monitorProgress() {
new Thread(() -> {
while (!phaser.isTerminated()) {
try {
Thread.sleep(500);
int phase = phaser.getPhase();
int parties = phaser.getRegisteredParties();
int unarrived = phaser.getUnarrivedParties();
System.out.printf("Monitor: Phase=%d, Total=%d, Unarrived=%d, Failed=%d%n",
phase, parties, unarrived, failedTasks.size());
// Force advance if stuck (for demonstration)
if (unarrived > 0 && System.currentTimeMillis() % 10 == 0) {
System.out.println("Monitor: Forcing phase advance");
phaser.forceTermination();
}
} catch (Exception e) {
System.err.println("Monitor error: " + e.getMessage());
}
}
System.out.println("Monitor: Phaser terminated");
}).start();
}
public Set<String> getFailedTasks() {
return new HashSet<>(failedTasks);
}
}
public static void main(String[] args) throws InterruptedException {
RobustTaskProcessor processor = new RobustTaskProcessor(5, 2000); // 2s timeout
// Start monitoring
processor.monitorProgress();
// Execute tasks
for (int i = 1; i <= 5; i++) {
processor.executeTask("Task-" + i);
}
// Wait for completion
while (!processor.phaser.isTerminated()) {
Thread.sleep(1000);
}
// Report results
Set<String> failed = processor.getFailedTasks();
System.out.println("\n=== Processing Complete ===");
System.out.println("Failed tasks: " + failed);
System.out.println("Success rate: " +
((5 - failed.size()) * 100.0 / 5) + "%");
}
}
5. Performance Monitoring
Phaser Performance Metrics
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.*;
import java.util.*;
public class PhaserPerformanceMetrics {
static class MetricsCollector {
private final AtomicLong totalPhases = new AtomicLong();
private final AtomicLong totalArrivals = new AtomicLong();
private final AtomicLong timeoutCount = new AtomicLong();
private final AtomicLong errorCount = new AtomicLong();
private final Map<Integer, AtomicLong> phaseDurations = new ConcurrentHashMap<>();
public void recordPhaseCompletion(int phase, long duration) {
totalPhases.incrementAndGet();
phaseDurations.computeIfAbsent(phase, k -> new AtomicLong())
.addAndGet(duration);
}
public void recordArrival() {
totalArrivals.incrementAndGet();
}
public void recordTimeout() {
timeoutCount.incrementAndGet();
}
public void recordError() {
errorCount.incrementAndGet();
}
public void printMetrics() {
System.out.println("\n=== Phaser Performance Metrics ===");
System.out.printf("Total phases completed: %d%n", totalPhases.get());
System.out.printf("Total arrivals: %d%n", totalArrivals.get());
System.out.printf("Timeouts: %d%n", timeoutCount.get());
System.out.printf("Errors: %d%n", errorCount.get());
System.out.println("\nPhase Durations (ms):");
phaseDurations.forEach((phase, duration) -> {
long avg = duration.get() / Math.max(1, totalPhases.get());
System.out.printf(" Phase %d: avg %d ms%n", phase, avg);
});
}
}
public static void main(String[] args) throws InterruptedException {
MetricsCollector metrics = new MetricsCollector();
int numParties = 10;
int numPhases = 5;
Phaser phaser = new Phaser(numParties) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("Phase %d completed by %d parties%n",
phase, registeredParties);
return phase >= numPhases - 1;
}
};
// Start worker threads
List<Thread> workers = new ArrayList<>();
for (int i = 0; i < numParties; i++) {
final int workerId = i;
Thread worker = new Thread(() -> {
try {
for (int phase = 0; phase < numPhases && !phaser.isTerminated(); phase++) {
long startTime = System.currentTimeMillis();
// Simulate work
Thread.sleep(50 + (long)(Math.random() * 100));
metrics.recordArrival();
// Wait for phase advance
int nextPhase = phaser.arriveAndAwaitAdvance();
long duration = System.currentTimeMillis() - startTime;
metrics.recordPhaseCompletion(phase, duration);
System.out.printf("Worker %d completed phase %d in %d ms%n",
workerId, phase, duration);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
metrics.recordError();
}
});
workers.add(worker);
worker.start();
}
// Wait for all workers to complete
for (Thread worker : workers) {
worker.join();
}
// Print metrics
metrics.printMetrics();
System.out.println("\n=== Benchmark Comparison ===");
benchmarkPhaserVsAlternatives();
}
private static void benchmarkPhaserVsAlternatives() throws InterruptedException {
int numTasks = 1000;
int numPhases = 3;
System.out.println("Benchmarking Phaser vs CountDownLatch...");
// Phaser benchmark
long phaserStart = System.currentTimeMillis();
benchmarkPhaser(numTasks, numPhases);
long phaserTime = System.currentTimeMillis() - phaserStart;
// CountDownLatch benchmark (for comparison)
long latchStart = System.currentTimeMillis();
benchmarkCountDownLatch(numTasks, numPhases);
long latchTime = System.currentTimeMillis() - latchStart;
System.out.printf("Phaser time: %d ms%n", phaserTime);
System.out.printf("CountDownLatch time: %d ms%n", latchTime);
System.out.printf("Phaser is %.2fx %s%n",
(double) latchTime / phaserTime,
phaserTime < latchTime ? "faster" : "slower");
}
private static void benchmarkPhaser(int numTasks, int numPhases)
throws InterruptedException {
Phaser phaser = new Phaser(numTasks);
for (int i = 0; i < numTasks; i++) {
new Thread(() -> {
for (int phase = 0; phase < numPhases; phase++) {
phaser.arriveAndAwaitAdvance();
}
}).start();
}
while (phaser.getPhase() < numPhases) {
Thread.sleep(10);
}
}
private static void benchmarkCountDownLatch(int numTasks, int numPhases)
throws InterruptedException {
for (int phase = 0; phase < numPhases; phase++) {
CountDownLatch latch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
new Thread(() -> {
latch.countDown();
}).start();
}
latch.await();
}
}
}
Summary
Key Benefits of Phaser:
- Dynamic Registration: Parties can register/deregister at runtime
- Multiple Phases: Supports multi-stage synchronization
- Flexible Termination: Custom termination conditions via
onAdvance() - Hierarchical Support: Can create tree structures of phasers
- Timeout Support: Methods with timeout parameters available
Common Use Cases:
- Multi-phase processing pipelines
- Dynamic worker coordination
- Distributed task processing
- Bulkhead pattern implementation
- Complex synchronization scenarios
Best Practices:
- Always use try-finally for arrive/deregister calls
- Override onAdvance() for custom termination logic
- Use hierarchical phasers for large-scale systems
- Implement timeout handling for robust applications
- Monitor phaser state in production systems
Performance Characteristics:
- Low overhead for phase synchronization
- Scalable to large numbers of parties
- Efficient for multi-phase operations compared to multiple CountDownLatches
- Flexible for dynamic workloads
Phaser provides the most flexible synchronization barrier in Java's concurrency utilities, making it ideal for complex multi-phase coordination scenarios where the number of participants may change dynamically.